This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 547c31e48e951a625144309880bd22fee01ac7db
Author: prashantdev88 <[email protected]>
AuthorDate: Mon Sep 1 13:04:51 2025 +0530

    ATLAS-5069: Improve Unit Test Coverage for Impala-bridge Module (#422)
    
    (cherry picked from commit 18d7f9dccf5658988d32e387339948286810f0a8)
---
 .../apache/atlas/impala/ImpalaLineageToolTest.java | 436 +++++++++++++++
 .../impala/hook/AtlasImpalaHookContextTest.java    | 262 +++++++++
 .../impala/hook/ImpalaOperationParserTest.java     |  92 ++++
 .../impala/hook/events/BaseImpalaEventTest.java    | 601 +++++++++++++++++++++
 .../hook/events/CreateImpalaProcessTest.java       | 457 ++++++++++++++++
 5 files changed, 1848 insertions(+)

diff --git 
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolTest.java
 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolTest.java
new file mode 100644
index 000000000..51ae1b002
--- /dev/null
+++ 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolTest.java
@@ -0,0 +1,436 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+**/
+package org.apache.atlas.impala;
+
+import org.apache.atlas.impala.hook.ImpalaLineageHook;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+        import static org.testng.Assert.*;
+
+public class ImpalaLineageToolTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ImpalaLineageToolTest.class);
+
+    @Mock
+    private ImpalaLineageHook mockImpalaLineageHook;
+
+    private Path tempDir;
+    private final PrintStream originalOut = System.out;
+    private ByteArrayOutputStream testOut;
+
+    @BeforeMethod
+    public void setUp() throws IOException {
+        MockitoAnnotations.initMocks(this);
+
+        tempDir = Files.createTempDirectory("impala-lineage-test");
+
+        testOut = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(testOut));
+    }
+
+    /**
+     * Removes the per-test temporary directory and restores the original
+     * {@code System.out}.
+     *
+     * @throws IOException if walking the temporary directory fails
+     */
+    @AfterMethod
+    public void tearDown() throws IOException {
+        try {
+            if (tempDir != null && Files.exists(tempDir)) {
+                // Files.walk keeps directory handles open until the stream is closed,
+                // so it must be used inside try-with-resources.
+                try (java.util.stream.Stream<Path> paths = Files.walk(tempDir)) {
+                    // Reverse order visits children before their parent directory.
+                    paths.sorted(Comparator.reverseOrder())
+                            .map(Path::toFile)
+                            .forEach(File::delete);
+                }
+            }
+        } finally {
+            // Restore stdout even when cleanup throws, so later tests are unaffected.
+            System.setOut(originalOut);
+        }
+    }
+
+    /** Constructing the tool with both {@code -d} and {@code -p} options yields an instance. */
+    @Test
+    public void testConstructorWithValidArguments() {
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", "/test/directory", "-p", "test_prefix"});
+
+        assertNotNull(tool);
+    }
+
+    /** An unrecognized option is expected to make the constructor throw a {@link RuntimeException}. */
+    @Test(expectedExceptions = RuntimeException.class)
+    public void testConstructorWithInvalidArguments() {
+        String[] unsupportedArgs = new String[] {"-invalid", "argument"};
+
+        new ImpalaLineageTool(unsupportedArgs);
+    }
+
+    /**
+     * With an empty directory and the prefix {@code nonexistent},
+     * {@code getCurrentFiles()} is expected to return an empty, non-null array.
+     */
+    @Test
+    public void testGetCurrentFilesWithNoFiles() throws IOException {
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", tempDir.toString(), "-p", "nonexistent"});
+
+        File[] matched = tool.getCurrentFiles();
+
+        assertNotNull(matched);
+        assertEquals(matched.length, 0);
+    }
+
+    /**
+     * With exactly one file whose name starts with the prefix,
+     * {@code getCurrentFiles()} is expected to return just that file.
+     *
+     * @throws IOException if the fixture file cannot be created
+     */
+    @Test
+    public void testGetCurrentFilesWithSingleFile() throws IOException {
+        File testFile = new File(tempDir.toFile(), "test_lineage.log");
+        // Fail here, with a clear cause, if the fixture could not be created
+        // instead of surfacing later as a confusing length mismatch.
+        assertTrue(testFile.createNewFile());
+
+        String[] args = {"-d", tempDir.toString(), "-p", "test"};
+        ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+        File[] files = tool.getCurrentFiles();
+
+        assertNotNull(files);
+        assertEquals(files.length, 1);
+        assertEquals(files[0].getName(), "test_lineage.log");
+    }
+
+    /**
+     * Creates three matching files with staggered modification times and checks that
+     * {@code getCurrentFiles()} returns all three in non-decreasing modification order.
+     */
+    @Test
+    public void testGetCurrentFilesWithMultipleFilesSorted() throws IOException {
+        String[] names = {"test_file1.log", "test_file2.log", "test_file3.log"};
+        File[] created = new File[names.length];
+        for (int i = 0; i < names.length; i++) {
+            created[i] = new File(tempDir.toFile(), names[i]);
+            created[i].createNewFile();
+        }
+
+        // Stagger modification times: 3, 2 and 1 seconds before the base time.
+        long baseTime = System.currentTimeMillis();
+        for (int i = 0; i < created.length; i++) {
+            created[i].setLastModified(baseTime - (3 - i) * 1000L);
+        }
+
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", tempDir.toString(), "-p", "test"});
+
+        File[] listed = tool.getCurrentFiles();
+
+        assertNotNull(listed);
+        assertEquals(listed.length, 3);
+
+        // Each file must be no newer than its successor.
+        for (int i = 1; i < listed.length; i++) {
+            assertTrue(listed[i - 1].lastModified() <= listed[i].lastModified());
+        }
+    }
+
+    /**
+     * Pointing the tool at a directory that does not exist is expected to give an
+     * empty, non-null array from {@code getCurrentFiles()}.
+     */
+    @Test
+    public void testGetCurrentFilesWithNonExistentDirectory() {
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", "/nonexistent/directory", "-p", "test"});
+
+        File[] listed = tool.getCurrentFiles();
+
+        assertNotNull(listed);
+        assertEquals(listed.length, 0);
+    }
+
+    /**
+     * Creates a lineage file and a WAL file, then checks that
+     * {@code deleteLineageAndWal} removes both of them.
+     */
+    @Test
+    public void testDeleteLineageAndWalSuccess() throws IOException {
+        File lineage = new File(tempDir.toFile(), "test_lineage.log");
+        File wal = new File(tempDir.toFile(), "test_wal.wal");
+        lineage.createNewFile();
+        wal.createNewFile();
+
+        // Precondition: both fixtures are on disk.
+        assertTrue(lineage.exists());
+        assertTrue(wal.exists());
+
+        String walPath = wal.getAbsolutePath();
+        ImpalaLineageTool.deleteLineageAndWal(lineage, walPath);
+
+        assertFalse(lineage.exists());
+        assertFalse(wal.exists());
+    }
+
+    /**
+     * Calls {@code deleteLineageAndWal} with paths that were never created.
+     * The test has no assertions; it passes when the call does not throw.
+     */
+    @Test
+    public void testDeleteLineageAndWalNonExistentFiles() {
+        String missingWalPath = tempDir.resolve("nonexistent.wal").toString();
+        File missingLineage = new File(tempDir.toFile(), "nonexistent.log");
+
+        ImpalaLineageTool.deleteLineageAndWal(missingLineage, missingWalPath);
+    }
+
+    /**
+     * Imports a one-line lineage file when no WAL file exists yet and checks that
+     * the hook received the line once and that the WAL file exists afterwards.
+     */
+    @Test
+    public void testImportHImpalaEntitiesWithNewWalFile() throws Exception {
+        String testContent = "test lineage content";
+        File lineageFile = new File(tempDir.toFile(), "test_lineage.log");
+        try (FileWriter out = new FileWriter(lineageFile)) {
+            out.write(testContent);
+        }
+
+        String walPath = tempDir.resolve("test.wal").toString();
+
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", tempDir.toString(), "-p", "test"});
+
+        doNothing().when(mockImpalaLineageHook).process(anyString());
+
+        String lineagePath = lineageFile.getAbsolutePath();
+        tool.importHImpalaEntities(mockImpalaLineageHook, lineagePath, walPath);
+
+        // verify(mock) defaults to times(1).
+        verify(mockImpalaLineageHook).process(testContent);
+
+        assertTrue(new File(walPath).exists());
+    }
+
+    /**
+     * Imports a one-line lineage file when a WAL file already exists containing
+     * {@code "0, <lineage path>"} and checks that the hook received the line once.
+     */
+    @Test
+    public void testImportHImpalaEntitiesWithExistingWalFile() throws Exception {
+        String testContent = "test lineage content";
+        File lineageFile = new File(tempDir.toFile(), "test_lineage.log");
+        try (FileWriter out = new FileWriter(lineageFile)) {
+            out.write(testContent);
+        }
+
+        // Pre-populate the WAL with offset 0 for the lineage file.
+        String walPath = tempDir.resolve("test.wal").toString();
+        try (FileWriter walOut = new FileWriter(new File(walPath))) {
+            walOut.write("0, " + lineageFile.getAbsolutePath());
+        }
+
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", tempDir.toString(), "-p", "test"});
+
+        doNothing().when(mockImpalaLineageHook).process(anyString());
+
+        String lineagePath = lineageFile.getAbsolutePath();
+        tool.importHImpalaEntities(mockImpalaLineageHook, lineagePath, walPath);
+
+        // verify(mock) defaults to times(1).
+        verify(mockImpalaLineageHook).process(testContent);
+    }
+
+    /**
+     * Makes the hook throw on every call and checks that the import still calls it
+     * once, leaves a WAL file behind, and that the WAL content (trimmed) is
+     * {@code "0, <lineage path>"}.
+     */
+    @Test
+    public void testImportHImpalaEntitiesWithProcessingFailure() throws Exception {
+        String testContent = "test lineage content";
+        File lineageFile = new File(tempDir.toFile(), "test_lineage.log");
+        try (FileWriter out = new FileWriter(lineageFile)) {
+            out.write(testContent);
+        }
+
+        String walPath = tempDir.resolve("test.wal").toString();
+
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", tempDir.toString(), "-p", "test"});
+
+        RuntimeException failure = new RuntimeException("Processing failed");
+        doThrow(failure).when(mockImpalaLineageHook).process(anyString());
+
+        String lineagePath = lineageFile.getAbsolutePath();
+        tool.importHImpalaEntities(mockImpalaLineageHook, lineagePath, walPath);
+
+        // verify(mock) defaults to times(1).
+        verify(mockImpalaLineageHook).process(testContent);
+
+        File walFile = new File(walPath);
+        assertTrue(walFile.exists());
+
+        byte[] walBytes = Files.readAllBytes(walFile.toPath());
+        String walContent = new String(walBytes);
+        assertEquals(walContent.trim(), "0, " + lineageFile.getAbsolutePath());
+    }
+
+
+    /**
+     * Runs {@code main} with only the {@code -d} option and checks that the captured
+     * standard output contains both the error text and the usage line.
+     */
+    @Test
+    public void testMainWithIncorrectNumberOfArguments() {
+        ImpalaLineageTool.main(new String[] {"-d", tempDir.toString()});
+
+        String printed = testOut.toString();
+        assertTrue(printed.contains("wrong number of arguments"));
+        assertTrue(printed.contains("Usage: import-impala.sh"));
+    }
+
+
+    /**
+     * Creates three matching files with staggered modification times, runs the tool,
+     * and asserts that the two older files are gone while the newest one remains.
+     * Any exception from {@code run()} is tolerated and logged with its cause.
+     *
+     * @throws IOException if a fixture file cannot be written
+     */
+    @Test
+    public void testRunWithMultipleFiles() throws IOException {
+        // Create multiple test files
+        File file1 = new File(tempDir.toFile(), "test_file1.log");
+        File file2 = new File(tempDir.toFile(), "test_file2.log");
+        File file3 = new File(tempDir.toFile(), "test_file3.log");
+
+        file1.createNewFile();
+        try (FileWriter writer = new FileWriter(file1)) {
+            writer.write("content1");
+        }
+
+        file2.createNewFile();
+        try (FileWriter writer = new FileWriter(file2)) {
+            writer.write("content2");
+        }
+
+        file3.createNewFile();
+        try (FileWriter writer = new FileWriter(file3)) {
+            writer.write("content3");
+        }
+
+        // Oldest first: file1 (3s ago), file2 (2s ago), file3 (1s ago).
+        long baseTime = System.currentTimeMillis();
+        file1.setLastModified(baseTime - 3000);
+        file2.setLastModified(baseTime - 2000);
+        file3.setLastModified(baseTime - 1000);
+
+        String[] args = {"-d", tempDir.toString(), "-p", "test"};
+        ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+        try {
+            tool.run();
+        } catch (Exception e) {
+            // Deliberately best effort, but pass the exception to the logger so the
+            // cause is not lost when the assertions below fail.
+            LOG.info("Error running test case ImpalaLineageToolTest.testRunWithMultipleFiles()", e);
+        }
+
+        assertFalse(file1.exists());
+        assertFalse(file2.exists());
+        assertTrue(file3.exists());
+    }
+
+    /**
+     * Runs the tool against an empty directory with a prefix that matches nothing.
+     * The test has no assertions; it passes when {@code run()} does not throw.
+     */
+    @Test
+    public void testRunWithNoFiles() {
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", tempDir.toString(), "-p", "nonexistent"});
+
+        tool.run();
+    }
+
+    /**
+     * Creates a single matching file, runs the tool, and asserts that the file still
+     * exists afterwards. Any exception from {@code run()} is tolerated and logged
+     * with its cause.
+     *
+     * @throws IOException if the fixture file cannot be written
+     */
+    @Test
+    public void testRunWithSingleFile() throws IOException {
+        File testFile = new File(tempDir.toFile(), "test_single.log");
+        testFile.createNewFile();
+        try (FileWriter writer = new FileWriter(testFile)) {
+            writer.write("single file content");
+        }
+
+        String[] args = {"-d", tempDir.toString(), "-p", "test"};
+        ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+        try {
+            tool.run();
+        } catch (Exception e) {
+            // Deliberately best effort, but pass the exception to the logger so the
+            // cause is not lost when the assertion below fails.
+            LOG.info("Error running test case ImpalaLineageToolTest.testRunWithSingleFile()", e);
+        }
+
+        assertTrue(testFile.exists());
+    }
+
+    /**
+     * Invokes the private {@code processImpalaLineageHook} reflectively with three
+     * entries and a hook that accepts everything; expects {@code true} and three
+     * {@code process} calls.
+     */
+    @Test
+    public void testProcessImpalaLineageHookSuccess() throws Exception {
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", tempDir.toString(), "-p", "test"});
+
+        // The method is private, so it is reached through reflection.
+        java.lang.reflect.Method processMethod = ImpalaLineageTool.class.getDeclaredMethod(
+                "processImpalaLineageHook", ImpalaLineageHook.class, java.util.List.class);
+        processMethod.setAccessible(true);
+
+        doNothing().when(mockImpalaLineageHook).process(anyString());
+
+        java.util.List<String> queries = java.util.Arrays.asList("query1", "query2", "query3");
+        boolean allSucceeded = (Boolean) processMethod.invoke(tool, mockImpalaLineageHook, queries);
+
+        assertTrue(allSucceeded);
+        verify(mockImpalaLineageHook, times(3)).process(anyString());
+    }
+
+    /**
+     * Invokes the private {@code processImpalaLineageHook} reflectively with three
+     * entries where the hook throws for the second one; expects {@code false} and
+     * still three {@code process} calls.
+     */
+    @Test
+    public void testProcessImpalaLineageHookWithFailures() throws Exception {
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", tempDir.toString(), "-p", "test"});
+
+        // The method is private, so it is reached through reflection.
+        java.lang.reflect.Method processMethod = ImpalaLineageTool.class.getDeclaredMethod(
+                "processImpalaLineageHook", ImpalaLineageHook.class, java.util.List.class);
+        processMethod.setAccessible(true);
+
+        // Only the middle entry fails.
+        RuntimeException failure = new RuntimeException("Processing failed");
+        doNothing().when(mockImpalaLineageHook).process("query1");
+        doThrow(failure).when(mockImpalaLineageHook).process("query2");
+        doNothing().when(mockImpalaLineageHook).process("query3");
+
+        java.util.List<String> queries = java.util.Arrays.asList("query1", "query2", "query3");
+        boolean allSucceeded = (Boolean) processMethod.invoke(tool, mockImpalaLineageHook, queries);
+
+        assertFalse(allSucceeded);
+        verify(mockImpalaLineageHook, times(3)).process(anyString());
+    }
+
+    /**
+     * Invokes the private {@code processImpalaLineageHook} reflectively with an empty
+     * list; expects {@code true} and no {@code process} calls on the hook.
+     */
+    @Test
+    public void testProcessImpalaLineageHookWithEmptyList() throws Exception {
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", tempDir.toString(), "-p", "test"});
+
+        // The method is private, so it is reached through reflection.
+        java.lang.reflect.Method processMethod = ImpalaLineageTool.class.getDeclaredMethod(
+                "processImpalaLineageHook", ImpalaLineageHook.class, java.util.List.class);
+        processMethod.setAccessible(true);
+
+        java.util.List<String> noQueries = java.util.Collections.emptyList();
+        boolean allSucceeded = (Boolean) processMethod.invoke(tool, mockImpalaLineageHook, noQueries);
+
+        assertTrue(allSucceeded);
+        verify(mockImpalaLineageHook, never()).process(anyString());
+    }
+
+    /**
+     * Creates three matching files aged 3, 2 and 1 seconds and checks that
+     * {@code getCurrentFiles()} returns them oldest first, by name.
+     */
+    @Test
+    public void testGetCurrentFilesWithMultipleFilesSortedByModificationTime()
+            throws IOException, InterruptedException {
+        // Names are listed oldest first; ageMillis[i] is how far back file i is dated.
+        String[] names = {"test_old.log", "test_medium.log", "test_new.log"};
+        long[] ageMillis = {3000, 2000, 1000};
+        for (int i = 0; i < names.length; i++) {
+            File logFile = new File(tempDir.toFile(), names[i]);
+            logFile.createNewFile();
+            logFile.setLastModified(System.currentTimeMillis() - ageMillis[i]);
+        }
+
+        ImpalaLineageTool tool = new ImpalaLineageTool(
+                new String[] {"-d", tempDir.toString(), "-p", "test"});
+
+        File[] listed = tool.getCurrentFiles();
+
+        assertNotNull(listed);
+        assertEquals(listed.length, 3);
+        for (int i = 0; i < names.length; i++) {
+            assertEquals(listed[i].getName(), names[i]);
+        }
+    }
+
+}
diff --git 
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/AtlasImpalaHookContextTest.java
 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/AtlasImpalaHookContextTest.java
new file mode 100644
index 000000000..386ff0108
--- /dev/null
+++ 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/AtlasImpalaHookContextTest.java
@@ -0,0 +1,262 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.atlas.impala.hook;
+
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.impala.model.LineageVertexMetadata;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertThrows;
+
+public class AtlasImpalaHookContextTest {
+
+
+    @Mock
+    private ImpalaLineageHook impalaLineageHook;
+
+    @Mock
+    private ImpalaOperationType impalaOperationType;
+
+    @Mock
+    private ImpalaQuery impalaQuery;
+
+
+    /** Initializes the {@code @Mock}-annotated fields of this class before every test method. */
+    @BeforeMethod
+    public void initializeMocks() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+
+    /**
+     * For a {@code db<sep>table} name, the qualified name is expected to be the
+     * lower-cased name plus the namespace separator, followed by the namespace as-is.
+     */
+    @Test
+    public void testGetQualifiedNameForTableWithFullTableName() throws Exception {
+        String database = "testDatabase";
+        String table = "testTable";
+        String metadataNamespace = "testNamespace";
+
+        String fullTableName = database + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table;
+        // Only the part before the namespace is lower-cased.
+        String expectedTableQualifiedName =
+                (fullTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase()
+                        + metadataNamespace;
+
+        when(impalaLineageHook.getMetadataNamespace()).thenReturn(metadataNamespace);
+
+        AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        String receivedTableQualifiedName = context.getQualifiedNameForTable(fullTableName);
+        assertEquals(expectedTableQualifiedName, receivedTableQualifiedName);
+    }
+
+    /** A {@code null} table name is expected to raise {@link IllegalArgumentException}. */
+    @Test
+    public void testGetQualifiedNameForTableWithNullTableName() throws Exception {
+        AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        String missingTableName = null;
+        assertThrows(IllegalArgumentException.class,
+                () -> context.getQualifiedNameForTable(missingTableName));
+    }
+
+    /**
+     * A table name without a database part is expected to raise
+     * {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testGetQualifiedNameForTableWithPartialTableName() throws Exception {
+        AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        String unqualifiedTable = "testTableName";
+        assertThrows(IllegalArgumentException.class,
+                () -> context.getQualifiedNameForTable(unqualifiedTable));
+    }
+
+    /**
+     * For a vertex whose id is {@code db<sep>table<sep>column} and whose metadata
+     * carries the table name, the column qualified name is expected to be the
+     * lower-cased id plus the namespace separator, followed by the namespace as-is.
+     */
+    @Test
+    public void testGetQualifiedNameForColumnUsingLineageVertexAndLineageVertexMetadata()
+            throws Exception {
+        String database = "testDatabase";
+        String table = "testTable";
+        String column = "testColumn";
+        String metadataNamespace = "testNamespace";
+
+        String fullTableName = database + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table;
+        String fullColumnName = fullTableName + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + column;
+        // Only the part before the namespace is lower-cased.
+        String expectedColumnQualifiedName =
+                (fullColumnName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase()
+                        + metadataNamespace;
+
+        LineageVertexMetadata metadata = new LineageVertexMetadata();
+        metadata.setTableName(fullTableName);
+
+        LineageVertex vertex = new LineageVertex();
+        vertex.setMetadata(metadata);
+        vertex.setVertexId(fullColumnName);
+
+        when(impalaLineageHook.getMetadataNamespace()).thenReturn(metadataNamespace);
+
+        AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        String receivedColumnQualifiedName = context.getQualifiedNameForColumn(vertex);
+        assertEquals(expectedColumnQualifiedName, receivedColumnQualifiedName);
+    }
+
+
+
+    /**
+     * For a vertex with {@code null} metadata and an id of
+     * {@code db<sep>table<sep>column}, the column qualified name is expected to be the
+     * lower-cased id plus the namespace separator, followed by the namespace as-is.
+     */
+    @Test
+    public void testGetQualifiedNameForColumnUsingLineageVertexAndLineageVertexMetadataAsNull()
+            throws Exception {
+        String database = "testDatabase";
+        String table = "testTable";
+        String column = "testColumn";
+        String metadataNamespace = "testNamespace";
+
+        String fullColumnName = database + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table
+                + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + column;
+        // Only the part before the namespace is lower-cased.
+        String expectedColumnQualifiedName =
+                (fullColumnName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase()
+                        + metadataNamespace;
+
+        LineageVertex vertex = new LineageVertex();
+        vertex.setMetadata(null);
+        vertex.setVertexId(fullColumnName);
+
+        when(impalaLineageHook.getMetadataNamespace()).thenReturn(metadataNamespace);
+
+        AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        String receivedColumnQualifiedName = context.getQualifiedNameForColumn(vertex);
+        assertEquals(expectedColumnQualifiedName, receivedColumnQualifiedName);
+    }
+
+    /**
+     * A vertex whose metadata has no table name set (and which has no vertex id set)
+     * is expected to raise {@link IllegalArgumentException}.
+     *
+     * @throws Exception if the hook context cannot be constructed
+     */
+    @Test
+    public void testGetQualifiedNameForColumnUsingLineageVertexAndLineageVertexMetadataTableAsNull()
+            throws Exception {
+        // Freshly constructed metadata: setTableName is never called on it.
+        LineageVertex vertex = new LineageVertex();
+        LineageVertexMetadata metadata = new LineageVertexMetadata();
+
+        vertex.setMetadata(metadata);
+
+        AtlasImpalaHookContext impalaHook =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        assertThrows(IllegalArgumentException.class,
+                () -> impalaHook.getQualifiedNameForColumn(vertex));
+    }
+
+    /**
+     * For a {@code db<sep>table<sep>column} string, the column qualified name is
+     * expected to be the lower-cased string plus the namespace separator, followed by
+     * the namespace as-is.
+     */
+    @Test
+    public void testGetQualifiedNameForColumn() throws Exception {
+        String database = "testDatabase";
+        String table = "testTable";
+        String column = "testColumn";
+        String metadataNamespace = "testNamespace";
+
+        String fullColumnName = database + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table
+                + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + column;
+        // Only the part before the namespace is lower-cased.
+        String expectedColumnQualifiedName =
+                (fullColumnName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase()
+                        + metadataNamespace;
+
+        when(impalaLineageHook.getMetadataNamespace()).thenReturn(metadataNamespace);
+
+        AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        String receivedColumnQualifiedName = context.getQualifiedNameForColumn(fullColumnName);
+        assertEquals(expectedColumnQualifiedName, receivedColumnQualifiedName);
+    }
+
+
+    /** From {@code table<sep>column}, the table part is expected to be returned. */
+    @Test
+    public void testGetTableNameFromColumn() throws Exception {
+        String table = "testTable";
+        String column = "testColumn";
+        String qualifiedColumn = table + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + column;
+
+        AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        String receivedTableName = context.getTableNameFromColumn(qualifiedColumn);
+        assertEquals(table, receivedTableName);
+    }
+
+    /** From {@code database<sep>table}, the database part is expected to be returned. */
+    @Test
+    public void testGetDatabaseNameFromTable() throws Exception {
+        String database = "testDatabase";
+        String table = "testTable";
+        String qualifiedTable = database + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table;
+
+        AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        String receivedDatabaseName = context.getDatabaseNameFromTable(qualifiedTable);
+        assertEquals(database, receivedDatabaseName);
+    }
+
+
+    /** From {@code table<sep>column}, only the column part is expected to be returned. */
+    @Test
+    public void testGetColumnNameOnlyWithFullCOlumnName() throws Exception {
+        String table = "testTable";
+        String column = "testColumn";
+        String qualifiedColumn = table + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + column;
+
+        AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        String receivedColumnName = context.getColumnNameOnly(qualifiedColumn);
+        assertEquals(column, receivedColumnName);
+    }
+
+
+    /** A {@code null} column name is expected to raise {@link IllegalArgumentException}. */
+    @Test
+    public void testGetColumnNameOnlyWithNullValue() throws Exception {
+        AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        String missingColumnName = null;
+        assertThrows(IllegalArgumentException.class,
+                () -> context.getColumnNameOnly(missingColumnName));
+    }
+
+    /**
+     * A column name that carries no table prefix is expected to be returned unchanged.
+     *
+     * @throws Exception if the hook context cannot be constructed
+     */
+    @Test
+    public void testGetColumnNameOnlyWithPartialColumnName() throws Exception {
+        // The same bare name is both the input and the expected output.
+        String column = "testColumn";
+
+        AtlasImpalaHookContext impalaHook =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        String receivedColumnName = impalaHook.getColumnNameOnly(column);
+        assertEquals(column, receivedColumnName);
+    }
+
+    /**
+     * For a database name, the qualified name is expected to be the lower-cased name
+     * plus the namespace separator, followed by the namespace as-is.
+     */
+    @Test
+    public void testGetQualifiedNameForDb() throws Exception {
+        String database = "testDatabase";
+        String metadataNamespace = "testNamespace";
+        // Only the part before the namespace is lower-cased.
+        String expectedDatabaseName =
+                (database + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase()
+                        + metadataNamespace;
+
+        AtlasImpalaHookContext context =
+                new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+        // Stubbed after the context is built, as in the original ordering.
+        when(impalaLineageHook.getMetadataNamespace()).thenReturn(metadataNamespace);
+
+        String receivedDatabaseName = context.getQualifiedNameForDb(database);
+        assertEquals(expectedDatabaseName, receivedDatabaseName);
+    }
+
+
+}
diff --git 
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaOperationParserTest.java
 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaOperationParserTest.java
new file mode 100644
index 000000000..f1662f7c8
--- /dev/null
+++ 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaOperationParserTest.java
@@ -0,0 +1,92 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.atlas.impala.hook;
+
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.testng.annotations.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the static parsing helpers of {@link ImpalaOperationParser}: each test
+ * feeds one SQL string and asserts the {@link ImpalaOperationType} that comes back.
+ */
+public class ImpalaOperationParserTest {
+
+    /** A {@code create view ... as select} statement is expected to parse as CREATEVIEW. */
+    @Test
+    public void testGetImpalaOperationTypeCREATEVIEW() {
+        ImpalaOperationType parsed = ImpalaOperationParser.getImpalaOperationType(
+                "create view test_view as select * from test_table");
+
+        assertEquals(ImpalaOperationType.CREATEVIEW, parsed);
+    }
+
+    /** A {@code create table ... as select} statement is expected to parse as CREATETABLE_AS_SELECT. */
+    @Test
+    public void testGetImpalaOperationTypeCREATETABLE_AS_SELECT() {
+        ImpalaOperationType parsed = ImpalaOperationParser.getImpalaOperationType(
+                "create table test_table as select * from test_table_1");
+
+        assertEquals(ImpalaOperationType.CREATETABLE_AS_SELECT, parsed);
+    }
+
+    /** An {@code ALTER VIEW ... AS SELECT} statement is expected to parse as ALTERVIEW_AS. */
+    @Test
+    public void testGetImpalaOperationTypeALTERVIEW_AS() {
+        ImpalaOperationType parsed = ImpalaOperationParser.getImpalaOperationType(
+                "ALTER VIEW test_view AS SELECT * FROM test_table_1;");
+
+        assertEquals(ImpalaOperationType.ALTERVIEW_AS, parsed);
+    }
+
+    /** An {@code INSERT INTO ... SELECT} statement is expected to parse as QUERY. */
+    @Test
+    public void testGetImpalaOperationTypeQUERY() {
+        ImpalaOperationType parsed = ImpalaOperationParser.getImpalaOperationType(
+                "INSERT INTO test_table SELECT * FROM test_source_table;");
+
+        assertEquals(ImpalaOperationType.QUERY, parsed);
+    }
+
+    /** A {@code WITH ... INSERT INTO ... SELECT} statement is expected to parse as QUERY_WITH_CLAUSE. */
+    @Test
+    public void testGetImpalaOperationTypeQUERY_WITH_CLAUSE() {
+        ImpalaOperationType parsed = ImpalaOperationParser.getImpalaOperationType(
+                "WITH test_table_2 AS (SELECT id FROM test_table)INSERT INTO test_table_1 SELECT id FROM test_table_2");
+
+        assertEquals(ImpalaOperationType.QUERY_WITH_CLAUSE, parsed);
+    }
+
+    /** A plain {@code SELECT} is expected to parse as UNKNOWN. */
+    @Test
+    public void testGetImpalaOperationTypeUNKNOWN() {
+        String unknownQuery = "SELECT * from test_table";
+
+        ImpalaOperationType parsed = ImpalaOperationParser.getImpalaOperationType(unknownQuery);
+
+        assertEquals(ImpalaOperationType.UNKNOWN, parsed);
+    }
+
+    /** With base type QUERY_WITH_CLAUSE and a plain {@code SELECT}, the sub-type is expected to be UNKNOWN. */
+    @Test
+    public void testGetImpalaOperationSubTypeWithInvalidQuery() {
+        String unknownQuery = "SELECT * from test_table";
+
+        ImpalaOperationType subType = ImpalaOperationParser.getImpalaOperationSubType(
+                ImpalaOperationType.QUERY_WITH_CLAUSE, unknownQuery);
+
+        assertEquals(ImpalaOperationType.UNKNOWN, subType);
+    }
+
+    /** With base type QUERY and an {@code INSERT INTO} statement, the sub-type is expected to be INSERT. */
+    @Test
+    public void testGetImpalaOperationSubTypeINSERT() {
+        ImpalaOperationType subType = ImpalaOperationParser.getImpalaOperationSubType(
+                ImpalaOperationType.QUERY,
+                "INSERT INTO test_table SELECT * FROM test_source_table;");
+
+        assertEquals(ImpalaOperationType.INSERT, subType);
+    }
+
+    /** With base type QUERY and a multi-line {@code INSERT OVERWRITE}, the sub-type is expected to be INSERT_OVERWRITE. */
+    @Test
+    public void testGetImpalaOperationSubTypeINSERT_OVERWRITE() {
+        String overwriteQuery = "INSERT OVERWRITE TABLE test_table\n"
+                + "SELECT region, SUM(amount) AS test_table_1\n"
+                + "FROM test_table_2\n"
+                + "GROUP BY region;\n";
+
+        ImpalaOperationType subType = ImpalaOperationParser.getImpalaOperationSubType(
+                ImpalaOperationType.QUERY, overwriteQuery);
+
+        assertEquals(ImpalaOperationType.INSERT_OVERWRITE, subType);
+    }
+}
diff --git 
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/BaseImpalaEventTest.java
 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/BaseImpalaEventTest.java
new file mode 100644
index 000000000..0d5c4fc91
--- /dev/null
+++ 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/BaseImpalaEventTest.java
@@ -0,0 +1,601 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.atlas.impala.hook.events;
+
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.model.ImpalaDataType;
+import org.apache.atlas.impala.model.ImpalaNode;
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.ImpalaVertexType;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.impala.model.LineageVertexMetadata;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.notification.HookNotification;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+public class BaseImpalaEventTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BaseImpalaEventTest.class);
+
+    @Mock
+    private AtlasImpalaHookContext mockContext; // stubbed in setUp() and setupQualifiedNameMocks()
+
+    @Mock
+    private ImpalaQuery mockQuery; // handed out by mockContext.getLineageQuery() (see setUp())
+
+    private static final String CLUSTER_NAME = "test_cluster"; // stubbed as the metadata namespace
+    private static final String DB_NAME = "test_database";
+    private static final String TABLE_NAME = "test_table";
+    private static final String COLUMN_NAME = "test_column";
+    private static final String USER_NAME = "test_user";
+    private static final String HOST_NAME = "test_host";
+    private static final String QUERY_TEXT = "SELECT * FROM test_table";
+    private static final String QUERY_ID = "test_query_123";
+    private static final long TIMESTAMP = 1554750072L; // query timestamp; assertions scale it by MILLIS_CONVERT_FACTOR
+    private static final long END_TIME = 1554750554L; // query end time; assertions scale it by MILLIS_CONVERT_FACTOR
+
+    /**
+     * Bare-bones {@link BaseImpalaEvent} subclass that lets the tests call the
+     * base-class helpers directly; it never produces notification messages.
+     */
+    private static class TestBaseImpalaEvent extends BaseImpalaEvent {
+        public TestBaseImpalaEvent(AtlasImpalaHookContext context) {
+            super(context);
+        }
+
+        /** Always returns an empty list. */
+        @Override
+        public List<HookNotification> getNotificationMessages() throws Exception {
+            return Collections.emptyList();
+        }
+    }
+
+    private TestBaseImpalaEvent baseImpalaEvent;
+
+    /**
+     * Initialises the Mockito mocks before every test, stubs the lineage query and the
+     * hook context with the shared fixture values, and builds the event under test.
+     */
+    @BeforeMethod
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        // Lineage query fixture
+        when(mockQuery.getQueryId()).thenReturn(QUERY_ID);
+        when(mockQuery.getTimestamp()).thenReturn(TIMESTAMP);
+        when(mockQuery.getEndTime()).thenReturn(END_TIME);
+
+        // Hook context fixture
+        when(mockContext.getLineageQuery()).thenReturn(mockQuery);
+        when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY);
+        when(mockContext.getQueryStr()).thenReturn(QUERY_TEXT);
+        when(mockContext.getMetadataNamespace()).thenReturn(CLUSTER_NAME);
+        when(mockContext.getHostName()).thenReturn(HOST_NAME);
+        when(mockContext.getUserName()).thenReturn(USER_NAME);
+
+        setupQualifiedNameMocks();
+
+        baseImpalaEvent = new TestBaseImpalaEvent(mockContext);
+    }
+
+    private void setupQualifiedNameMocks() {
+        
when(mockContext.getQualifiedNameForDb(anyString())).thenAnswer(invocation -> {
+            String dbName = invocation.getArgument(0);
+            return dbName.toLowerCase()  +"@"+  CLUSTER_NAME;
+        });
+
+        
when(mockContext.getQualifiedNameForTable(anyString())).thenAnswer(invocation 
-> {
+            String tableQualifiedName = invocation.getArgument(0);
+            return tableQualifiedName.toLowerCase()  +"@"+  CLUSTER_NAME;
+        });
+
+        
when(mockContext.getQualifiedNameForColumn(any(LineageVertex.class))).thenAnswer(invocation
 -> {
+            LineageVertex vertex = invocation.getArgument(0);
+            return vertex.getVertexId().toLowerCase()  +"@"+  CLUSTER_NAME;
+        });
+
+        
when(mockContext.getDatabaseNameFromTable(anyString())).thenAnswer(invocation 
-> {
+            String tableName = invocation.getArgument(0);
+            if (tableName.contains(".")) {
+                return tableName.split("\\.")[0];
+            }
+            return DB_NAME;
+        });
+
+        when(mockContext.getColumnNameOnly(anyString())).thenAnswer(invocation 
-> {
+            String columnName = invocation.getArgument(0);
+            if (columnName.contains(".")) {
+                String[] parts = columnName.split("\\.");
+                return parts[parts.length - 1];
+            }
+            return columnName;
+        });
+
+        
when(mockContext.getTableNameFromColumn(anyString())).thenAnswer(invocation -> {
+            String columnName = invocation.getArgument(0);
+            if (columnName.contains(".") && columnName.split("\\.").length >= 
2) {
+                String[] parts = columnName.split("\\.");
+                return parts[0]  +"."+  parts[1];
+            }
+            return DB_NAME  +"."+  TABLE_NAME;
+        });
+    }
+
+    @Test
+    public void testGetUserName() {
+        assertEquals(baseImpalaEvent.getUserName(), USER_NAME);
+    }
+
+    /**
+     * Resolving the qualified name of a node wrapping a database vertex is expected
+     * to give the lower-cased database name followed by "@" and the cluster name.
+     */
+    @Test
+    public void testGetQualifiedNameWithImpalaNode() throws Exception {
+        ImpalaNode databaseNode = new ImpalaNode(createDatabaseVertex(DB_NAME));
+
+        String resolvedName = baseImpalaEvent.getQualifiedName(databaseNode);
+
+        assertNotNull(resolvedName);
+        assertEquals(resolvedName, DB_NAME.toLowerCase() + "@" + CLUSTER_NAME);
+    }
+
+    /**
+     * Checks qualified-name resolution for database, table and column vertices.
+     * Each expected value is the complete vertex id, lower-cased, followed by "@"
+     * and the cluster name, mirroring what the stubbed hook context returns.
+     */
+    @Test
+    public void testGetQualifiedNameWithLineageVertex() throws Exception {
+        // Test database vertex
+        LineageVertex dbVertex = createDatabaseVertex(DB_NAME);
+        String dbQualifiedName = baseImpalaEvent.getQualifiedName(dbVertex);
+        assertEquals(dbQualifiedName, DB_NAME.toLowerCase() + "@" + CLUSTER_NAME);
+
+        // Test table vertex. The id is built first so that toLowerCase() applies to the
+        // whole "db.table" string rather than only to the last operand of the concatenation.
+        String tableId = DB_NAME + "." + TABLE_NAME;
+        LineageVertex tableVertex = createTableVertex(tableId);
+        String tableQualifiedName = baseImpalaEvent.getQualifiedName(tableVertex);
+        assertEquals(tableQualifiedName, tableId.toLowerCase() + "@" + CLUSTER_NAME);
+
+        // Test column vertex (same precedence consideration as for the table id)
+        String columnId = DB_NAME + "." + TABLE_NAME + "." + COLUMN_NAME;
+        LineageVertex columnVertex = createColumnVertex(COLUMN_NAME, TABLE_NAME);
+        String columnQualifiedName = baseImpalaEvent.getQualifiedName(columnVertex);
+        // Use the class logger instead of writing to stdout from a unit test.
+        LOG.debug("columnQualifiedName : {}", columnQualifiedName);
+        assertEquals(columnQualifiedName, columnId.toLowerCase() + "@" + CLUSTER_NAME);
+    }
+
+    @Test
+    public void testGetQualifiedNameWithNullVertex() {
+        try {
+            baseImpalaEvent.getQualifiedName((LineageVertex) null);
+            fail("Expected IllegalArgumentException for null vertex");
+        } catch (IllegalArgumentException e) {
+            assertEquals(e.getMessage(), "node is null");
+        }
+    }
+
+    /** A vertex with an id but no vertex type is expected to resolve to {@code null}. */
+    @Test
+    public void testGetQualifiedNameWithNullVertexType() {
+        LineageVertex untypedVertex = new LineageVertex();
+        untypedVertex.setVertexType(null);
+        untypedVertex.setVertexId("test");
+
+        String resolvedName = baseImpalaEvent.getQualifiedName(untypedVertex);
+
+        assertNull(resolvedName);
+    }
+
+    /** A database vertex without a vertex id is expected to resolve to {@code null}. */
+    @Test
+    public void testGetQualifiedNameWithNullVertexId() {
+        LineageVertex anonymousVertex = new LineageVertex();
+        anonymousVertex.setVertexId(null);
+        anonymousVertex.setVertexType(ImpalaVertexType.DATABASE);
+
+        String resolvedName = baseImpalaEvent.getQualifiedName(anonymousVertex);
+
+        assertNull(resolvedName);
+    }
+
+    /**
+     * For a column vertex the table name is expected to come from its metadata;
+     * for a database vertex the expected result is {@code DB_NAME.TABLE_NAME}.
+     */
+    @Test
+    public void testGetTableNameFromVertex() {
+        // Column vertex: replace the helper-provided metadata with one naming TABLE_NAME
+        LineageVertex columnVertex = createColumnVertex(COLUMN_NAME, TABLE_NAME);
+        LineageVertexMetadata columnMetadata = new LineageVertexMetadata();
+        columnMetadata.setTableName(TABLE_NAME);
+        columnVertex.setMetadata(columnMetadata);
+
+        String nameFromColumn = baseImpalaEvent.getTableNameFromVertex(columnVertex);
+        assertEquals(nameFromColumn, TABLE_NAME);
+
+        // Non-column vertex
+        LineageVertex databaseVertex = createDatabaseVertex(DB_NAME);
+
+        String nameFromDatabase = baseImpalaEvent.getTableNameFromVertex(databaseVertex);
+        assertEquals(nameFromDatabase, DB_NAME + "." + TABLE_NAME);
+    }
+
+    /** A fully qualified column name is expected to map to its {@code db.table} prefix. */
+    @Test
+    public void testGetTableNameFromColumn() {
+        final String qualifiedTable = DB_NAME + "." + TABLE_NAME;
+        final String qualifiedColumn = qualifiedTable + "." + COLUMN_NAME;
+
+        String resolvedTable = baseImpalaEvent.getTableNameFromColumn(qualifiedColumn);
+
+        assertEquals(resolvedTable, qualifiedTable);
+    }
+
+    /**
+     * With no entity available from the context, a database entity built from a name
+     * is expected to carry the Hive db type, the lower-cased name, the qualified name,
+     * the cluster name and no GUID.
+     */
+    @Test
+    public void testToDbEntityWithString() throws Exception {
+        when(mockContext.getEntity(anyString())).thenReturn(null);
+
+        AtlasEntity created = baseImpalaEvent.toDbEntity(DB_NAME);
+
+        assertNotNull(created);
+        assertEquals(created.getTypeName(), HIVE_TYPE_DB);
+        assertEquals(created.getAttribute(ATTRIBUTE_NAME), DB_NAME.toLowerCase());
+        assertEquals(created.getAttribute(ATTRIBUTE_QUALIFIED_NAME),
+                DB_NAME.toLowerCase() + "@" + CLUSTER_NAME);
+        assertEquals(created.getAttribute(ATTRIBUTE_CLUSTER_NAME), CLUSTER_NAME);
+        assertNull(created.getGuid());
+    }
+
+
+    /**
+     * Column entities built for a table node are expected to be Hive columns that
+     * reference the supplied table object id.
+     */
+    @Test
+    public void testGetColumnEntities() throws Exception {
+        // Table node whose single child is the table vertex itself
+        LineageVertex tableVertex = createTableVertex(DB_NAME + "." + TABLE_NAME);
+        tableVertex.setId(1L);
+        tableVertex.setCreateTime(TIMESTAMP);
+        ImpalaNode tableNode = new ImpalaNode(tableVertex);
+        tableNode.addChild(tableVertex);
+
+        // Object id handed to getColumnEntities as the owning table reference
+        AtlasEntity idSource = new AtlasEntity(HIVE_TYPE_COLUMN);
+        idSource.setGuid("test-guid");
+        idSource.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.qualified.name");
+        AtlasObjectId tableId = BaseImpalaEvent.getObjectId(idSource);
+
+        when(mockContext.getEntity(anyString())).thenReturn(null);
+        when(mockContext.getUserName()).thenReturn(USER_NAME);
+
+        List<AtlasEntity> columnEntities = baseImpalaEvent.getColumnEntities(tableId, tableNode);
+        AtlasEntity firstColumn = columnEntities.get(0);
+
+        assertNotNull(firstColumn);
+        assertEquals(firstColumn.getTypeName(), HIVE_TYPE_COLUMN);
+        assertEquals(firstColumn.getAttribute(ATTRIBUTE_TABLE), tableId);
+    }
+
+    /**
+     * A table entity built from an object id and a table node is expected to have the
+     * Hive table type and the qualified name {@code db.table@cluster}.
+     */
+    @Test
+    public void testToTableEntity() throws Exception {
+        // Table node whose single child is the table vertex itself
+        LineageVertex tableVertex = createTableVertex(DB_NAME + "." + TABLE_NAME);
+        tableVertex.setId(1L);
+        tableVertex.setCreateTime(TIMESTAMP);
+        ImpalaNode tableNode = new ImpalaNode(tableVertex);
+        tableNode.addChild(tableVertex);
+
+        // Object id passed as the first argument of toTableEntity
+        AtlasEntity idSource = new AtlasEntity(HIVE_TYPE_TABLE);
+        idSource.setGuid("test-guid");
+        AtlasObjectId objectId = BaseImpalaEvent.getObjectId(idSource);
+
+        when(mockContext.getEntity(anyString())).thenReturn(null);
+        when(mockContext.getUserName()).thenReturn(USER_NAME);
+
+        AtlasEntity tableEntity = baseImpalaEvent.toTableEntity(objectId, tableNode, null);
+
+        assertEquals(tableEntity.getTypeName(), HIVE_TYPE_TABLE);
+        assertEquals(tableEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME),
+                DB_NAME + "." + TABLE_NAME + "@" + CLUSTER_NAME);
+    }
+
+
+
+    /**
+     * When the context already returns an entity, {@code toDbEntity} is expected
+     * to return an entity equal to it.
+     */
+    @Test
+    public void testToDbEntityWithCachedEntity() throws Exception {
+        // Make the context hand back a pre-built entity for any lookup
+        AtlasEntity existing = new AtlasEntity(HIVE_TYPE_DB);
+        existing.setAttribute(ATTRIBUTE_NAME, DB_NAME);
+        when(mockContext.getEntity(anyString())).thenReturn(existing);
+
+        AtlasEntity resolved = baseImpalaEvent.toDbEntity(DB_NAME);
+
+        assertNotNull(resolved);
+        assertEquals(resolved, existing);
+    }
+
+
+    @Test
+    public void testToTableEntityWithNullNode() {
+        try {
+            baseImpalaEvent.toTableEntity((ImpalaNode) null, 
(AtlasEntitiesWithExtInfo) null);
+            fail("Expected IllegalArgumentException for null table node");
+        } catch (Exception e) {
+            assertTrue(e instanceof IllegalArgumentException);
+            assertTrue(e.getMessage().contains("table is null"));
+        }
+    }
+
+    /**
+     * The object id derived from an entity is expected to carry the entity's GUID,
+     * type name and qualified name (as a unique attribute).
+     */
+    @Test
+    public void testGetObjectId() {
+        AtlasEntity source = new AtlasEntity(HIVE_TYPE_TABLE);
+        source.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.qualified.name");
+        source.setGuid("test-guid");
+
+        AtlasObjectId derivedId = BaseImpalaEvent.getObjectId(source);
+
+        assertNotNull(derivedId);
+        assertEquals(derivedId.getGuid(), "test-guid");
+        assertEquals(derivedId.getTypeName(), HIVE_TYPE_TABLE);
+        assertEquals(derivedId.getUniqueAttributes().get(ATTRIBUTE_QUALIFIED_NAME),
+                "test.qualified.name");
+    }
+
+    /**
+     * Converting a list of two entities is expected to give two object ids,
+     * in the same order, each carrying its entity's GUID.
+     */
+    @Test
+    public void testGetObjectIds() {
+        AtlasEntity tableEntity = new AtlasEntity(HIVE_TYPE_TABLE);
+        tableEntity.setGuid("test-guid-1");
+        tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.table1");
+
+        AtlasEntity columnEntity = new AtlasEntity(HIVE_TYPE_COLUMN);
+        columnEntity.setGuid("test-guid-2");
+        columnEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.column1");
+
+        List<AtlasEntity> sources = new ArrayList<>();
+        sources.add(tableEntity);
+        sources.add(columnEntity);
+
+        List<AtlasObjectId> derivedIds = BaseImpalaEvent.getObjectIds(sources);
+
+        assertNotNull(derivedIds);
+        assertEquals(derivedIds.size(), 2);
+        assertEquals(derivedIds.get(0).getGuid(), "test-guid-1");
+        assertEquals(derivedIds.get(1).getGuid(), "test-guid-2");
+    }
+
+    /** An empty entity list is expected to give an empty, non-null list of object ids. */
+    @Test
+    public void testGetObjectIdsWithEmptyList() {
+        List<AtlasObjectId> derivedIds = BaseImpalaEvent.getObjectIds(Collections.emptyList());
+
+        assertNotNull(derivedIds);
+        assertTrue(derivedIds.isEmpty());
+    }
+
+    /**
+     * The create time read through a table node is expected to be the vertex
+     * create time multiplied by {@code MILLIS_CONVERT_FACTOR}.
+     */
+    @Test
+    public void testGetTableCreateTimeWithNode() {
+        LineageVertex vertexWithTime = createTableVertex(DB_NAME + "." + TABLE_NAME);
+        vertexWithTime.setCreateTime(TIMESTAMP);
+        ImpalaNode nodeWithTime = new ImpalaNode(vertexWithTime);
+
+        long actualCreateTime = BaseImpalaEvent.getTableCreateTime(nodeWithTime);
+
+        assertEquals(actualCreateTime, TIMESTAMP * MILLIS_CONVERT_FACTOR);
+    }
+
+    /**
+     * The create time read directly from a table vertex is expected to be the vertex
+     * create time multiplied by {@code MILLIS_CONVERT_FACTOR}.
+     */
+    @Test
+    public void testGetTableCreateTimeWithVertex() {
+        LineageVertex vertexWithTime = createTableVertex(DB_NAME + "." + TABLE_NAME);
+        vertexWithTime.setCreateTime(TIMESTAMP);
+
+        long actualCreateTime = BaseImpalaEvent.getTableCreateTime(vertexWithTime);
+
+        assertEquals(actualCreateTime, TIMESTAMP * MILLIS_CONVERT_FACTOR);
+    }
+
+    /** A table vertex without a create time is still expected to give a positive value. */
+    @Test
+    public void testGetTableCreateTimeWithNullTime() {
+        LineageVertex vertexWithoutTime = createTableVertex(DB_NAME + "." + TABLE_NAME);
+        vertexWithoutTime.setCreateTime(null);
+
+        long actualCreateTime = BaseImpalaEvent.getTableCreateTime(vertexWithoutTime);
+
+        assertTrue(actualCreateTime > 0);
+    }
+
+    /**
+     * Verifies the attributes of the process entity built from one input and one output:
+     * type, qualified name, inputs/outputs, operation type, scaled start/end times, and
+     * the empty placeholder value for user name and query text.
+     */
+    @Test
+    public void testGetImpalaProcessEntity() throws Exception {
+        List<AtlasEntity> inputEntities = createMockEntities("input_table");
+        List<AtlasEntity> outputEntities = createMockEntities("output_table");
+
+        AtlasEntity process = baseImpalaEvent.getImpalaProcessEntity(inputEntities, outputEntities);
+
+        assertNotNull(process);
+        assertEquals(process.getTypeName(), ImpalaDataType.IMPALA_PROCESS.getName());
+        assertNotNull(process.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+        assertNotNull(process.getAttribute(ATTRIBUTE_INPUTS));
+        assertNotNull(process.getAttribute(ATTRIBUTE_OUTPUTS));
+        assertEquals(process.getAttribute(ATTRIBUTE_OPERATION_TYPE), ImpalaOperationType.QUERY);
+        assertEquals(process.getAttribute(ATTRIBUTE_START_TIME),
+                TIMESTAMP * MILLIS_CONVERT_FACTOR);
+        assertEquals(process.getAttribute(ATTRIBUTE_END_TIME),
+                END_TIME * MILLIS_CONVERT_FACTOR);
+        assertEquals(process.getAttribute(ATTRIBUTE_USER_NAME), EMPTY_ATTRIBUTE_VALUE);
+        assertEquals(process.getAttribute(ATTRIBUTE_QUERY_TEXT), EMPTY_ATTRIBUTE_VALUE);
+    }
+
+    /**
+     * Verifies the attributes of the process-execution entity derived from a process
+     * entity: type, qualified name, scaled start/end times, user name, normalised
+     * (lower-cased, trimmed) query text, query id and host name.
+     */
+    @Test
+    public void testGetImpalaProcessExecutionEntity() throws Exception {
+        List<AtlasEntity> inputEntities = createMockEntities("input_table");
+        List<AtlasEntity> outputEntities = createMockEntities("output_table");
+        AtlasEntity process = baseImpalaEvent.getImpalaProcessEntity(inputEntities, outputEntities);
+
+        AtlasEntity execution = baseImpalaEvent.getImpalaProcessExecutionEntity(process);
+
+        assertNotNull(execution);
+        assertEquals(execution.getTypeName(), ImpalaDataType.IMPALA_PROCESS_EXECUTION.getName());
+        assertNotNull(execution.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+        assertEquals(execution.getAttribute(ATTRIBUTE_START_TIME),
+                TIMESTAMP * MILLIS_CONVERT_FACTOR);
+        assertEquals(execution.getAttribute(ATTRIBUTE_END_TIME),
+                END_TIME * MILLIS_CONVERT_FACTOR);
+        assertEquals(execution.getAttribute(ATTRIBUTE_USER_NAME), USER_NAME);
+        assertEquals(execution.getAttribute(ATTRIBUTE_QUERY_TEXT),
+                QUERY_TEXT.toLowerCase().trim());
+        assertEquals(execution.getAttribute(ATTRIBUTE_QUERY_ID), QUERY_ID);
+        assertEquals(execution.getAttribute(ATTRIBUTE_HOSTNAME), HOST_NAME);
+    }
+
+    /**
+     * A node created from a table name and create time is expected to own a TABLE
+     * vertex carrying exactly that id and create time.
+     */
+    @Test
+    public void testCreateTableNode() {
+        Long expectedCreateTime = TIMESTAMP;
+
+        ImpalaNode createdNode = baseImpalaEvent.createTableNode(TABLE_NAME, expectedCreateTime);
+
+        assertNotNull(createdNode);
+        assertEquals(createdNode.getOwnVertex().getVertexType(), ImpalaVertexType.TABLE);
+        assertEquals(createdNode.getOwnVertex().getVertexId(), TABLE_NAME);
+        assertEquals(createdNode.getOwnVertex().getCreateTime(), expectedCreateTime);
+    }
+
+    /**
+     * The DDL entity created for a Hive database entity is expected to be a db-DDL
+     * entity with service type "impala", the scaled query timestamp as execution time,
+     * the query text, the user name and a db reference.
+     */
+    @Test
+    public void testCreateHiveDDLEntityWithDb() {
+        AtlasEntity database = new AtlasEntity(HIVE_TYPE_DB);
+        database.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.db.qualified.name");
+
+        AtlasEntity ddl = baseImpalaEvent.createHiveDDLEntity(database);
+
+        assertNotNull(ddl);
+        assertEquals(ddl.getTypeName(), ImpalaDataType.HIVE_DB_DDL.getName());
+        assertEquals(ddl.getAttribute(ATTRIBUTE_SERVICE_TYPE), "impala");
+        assertEquals(ddl.getAttribute(ATTRIBUTE_EXEC_TIME), TIMESTAMP * MILLIS_CONVERT_FACTOR);
+        assertEquals(ddl.getAttribute(ATTRIBUTE_QUERY_TEXT), QUERY_TEXT);
+        assertEquals(ddl.getAttribute(ATTRIBUTE_USER_NAME), USER_NAME);
+        assertNotNull(ddl.getAttribute(ATTRIBUTE_DB));
+    }
+
+    /**
+     * The DDL entity created for a Hive table entity is expected to be a table-DDL
+     * entity with service type "impala", the scaled query timestamp as execution time,
+     * the query text, the user name and a table reference.
+     */
+    @Test
+    public void testCreateHiveDDLEntityWithTable() {
+        AtlasEntity table = new AtlasEntity(HIVE_TYPE_TABLE);
+        table.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.table.qualified.name");
+
+        AtlasEntity ddl = baseImpalaEvent.createHiveDDLEntity(table);
+
+        assertNotNull(ddl);
+        assertEquals(ddl.getTypeName(), ImpalaDataType.HIVE_TABLE_DDL.getName());
+        assertEquals(ddl.getAttribute(ATTRIBUTE_SERVICE_TYPE), "impala");
+        assertEquals(ddl.getAttribute(ATTRIBUTE_EXEC_TIME), TIMESTAMP * MILLIS_CONVERT_FACTOR);
+        assertEquals(ddl.getAttribute(ATTRIBUTE_QUERY_TEXT), QUERY_TEXT);
+        assertEquals(ddl.getAttribute(ATTRIBUTE_USER_NAME), USER_NAME);
+        assertNotNull(ddl.getAttribute(ATTRIBUTE_TABLE));
+    }
+
+    @Test
+    public void testIsDdlOperation() {
+
+        
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.CREATEVIEW);
+        assertTrue(baseImpalaEvent.isDdlOperation());
+
+        
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.ALTERVIEW_AS);
+        assertTrue(baseImpalaEvent.isDdlOperation());
+
+        
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.CREATETABLE_AS_SELECT);
+        assertTrue(baseImpalaEvent.isDdlOperation());
+
+        
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY);
+        assertFalse(baseImpalaEvent.isDdlOperation());
+    }
+
+    /**
+     * Covers three inputs: a vertex with its own create time, a {@code null} vertex
+     * (a positive value is expected), and a column vertex whose create time is only
+     * present in its metadata.
+     */
+    @Test
+    public void testGetCreateTimeInVertex() {
+        // Vertex carrying its own create time
+        LineageVertex databaseVertex = createDatabaseVertex(DB_NAME);
+        databaseVertex.setCreateTime(TIMESTAMP);
+
+        Long fromVertex = baseImpalaEvent.getCreateTimeInVertex(databaseVertex);
+        assertEquals(fromVertex, Long.valueOf(TIMESTAMP));
+
+        // Null vertex: only a positive result is asserted
+        Long fromNull = baseImpalaEvent.getCreateTimeInVertex(null);
+        assertTrue(fromNull > 0);
+
+        // Column vertex without its own create time but with table create time in metadata
+        LineageVertexMetadata columnMetadata = new LineageVertexMetadata();
+        columnMetadata.setTableCreateTime(TIMESTAMP);
+        LineageVertex columnVertex = createColumnVertex(COLUMN_NAME, TABLE_NAME);
+        columnVertex.setCreateTime(null);
+        columnVertex.setMetadata(columnMetadata);
+
+        Long fromMetadata = baseImpalaEvent.getCreateTimeInVertex(columnVertex);
+        assertEquals(fromMetadata, Long.valueOf(TIMESTAMP));
+    }
+
+    /**
+     * Data-driven check of the process qualified name for each operation type.
+     *
+     * @param operationType        operation type the mocked context reports
+     * @param inputs               process input entities
+     * @param outputs              process output entities
+     * @param shouldThrowException whether an {@link IllegalArgumentException} mentioning
+     *                             "unexpected operation type" is expected
+     */
+    @Test(dataProvider = "qualifiedNameDataProvider")
+    public void testGetQualifiedNameForProcess(ImpalaOperationType operationType,
+                                               List<AtlasEntity> inputs,
+                                               List<AtlasEntity> outputs,
+                                               boolean shouldThrowException) throws Exception {
+        when(mockContext.getImpalaOperationType()).thenReturn(operationType);
+
+        if (shouldThrowException) {
+            try {
+                baseImpalaEvent.getQualifiedName(inputs, outputs);
+                fail("Expected IllegalArgumentException");
+            } catch (IllegalArgumentException e) {
+                // The literal is kept on one line so the expected fragment stays intact.
+                assertTrue(e.getMessage().contains("unexpected operation type"));
+            }
+            return;
+        }
+
+        String qualifiedName = baseImpalaEvent.getQualifiedName(inputs, outputs);
+        assertNotNull(qualifiedName);
+
+        boolean isDdlType = operationType == ImpalaOperationType.CREATEVIEW
+                || operationType == ImpalaOperationType.CREATETABLE_AS_SELECT
+                || operationType == ImpalaOperationType.ALTERVIEW_AS;
+
+        // DDL types are expected to give a name containing "@"; the others one containing "->"
+        if (isDdlType) {
+            assertTrue(qualifiedName.contains("@"));
+        } else {
+            assertTrue(qualifiedName.contains("->"));
+        }
+    }
+
+    @DataProvider(name = "qualifiedNameDataProvider")
+    public Object[][] qualifiedNameDataProvider() {
+        List<AtlasEntity> inputs = createMockEntities("input_table");
+        List<AtlasEntity> outputs = createMockEntities("output_table");
+
+        return new Object[][] {
+                {ImpalaOperationType.CREATEVIEW, inputs, outputs, false},
+                {ImpalaOperationType.CREATETABLE_AS_SELECT, inputs, outputs, 
false},
+                {ImpalaOperationType.ALTERVIEW_AS, inputs, outputs, false},
+                {ImpalaOperationType.QUERY, inputs, outputs, false},
+                {ImpalaOperationType.QUERY_WITH_CLAUSE, inputs, outputs, 
false},
+                {ImpalaOperationType.INSERT, inputs, outputs, true}  // Should 
throw exception
+        };
+    }
+
+    // Helper methods for creating test data
+
+    /** Builds a DATABASE vertex with the given id and the fixture create time. */
+    private LineageVertex createDatabaseVertex(String dbName) {
+        LineageVertex databaseVertex = new LineageVertex();
+
+        databaseVertex.setVertexId(dbName);
+        databaseVertex.setVertexType(ImpalaVertexType.DATABASE);
+        databaseVertex.setCreateTime(TIMESTAMP);
+
+        return databaseVertex;
+    }
+
+    /** Builds a TABLE vertex with the given id and the fixture create time. */
+    private LineageVertex createTableVertex(String tableName) {
+        LineageVertex tableVertex = new LineageVertex();
+
+        tableVertex.setVertexId(tableName);
+        tableVertex.setVertexType(ImpalaVertexType.TABLE);
+        tableVertex.setCreateTime(TIMESTAMP);
+
+        return tableVertex;
+    }
+
+    /**
+     * Builds a COLUMN vertex whose id is {@code DB_NAME.tableName.columnName} and whose
+     * metadata records the table name and the fixture table create time.
+     */
+    private LineageVertex createColumnVertex(String columnName, String tableName) {
+        LineageVertexMetadata tableInfo = new LineageVertexMetadata();
+        tableInfo.setTableName(tableName);
+        tableInfo.setTableCreateTime(TIMESTAMP);
+
+        LineageVertex columnVertex = new LineageVertex();
+        columnVertex.setVertexType(ImpalaVertexType.COLUMN);
+        columnVertex.setVertexId(DB_NAME + "." + tableName + "." + columnName);
+        columnVertex.setCreateTime(TIMESTAMP);
+        columnVertex.setMetadata(tableInfo);
+
+        return columnVertex;
+    }
+
+    /**
+     * Returns a one-element mutable list holding a Hive table entity named after
+     * {@code tableBaseName}, with GUID, qualified name, name and create time set.
+     */
+    private List<AtlasEntity> createMockEntities(String tableBaseName) {
+        AtlasEntity tableEntity = new AtlasEntity(HIVE_TYPE_TABLE);
+
+        tableEntity.setGuid("test-guid-" + tableBaseName);
+        tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableBaseName + "@" + CLUSTER_NAME);
+        tableEntity.setAttribute(ATTRIBUTE_NAME, tableBaseName);
+        tableEntity.setAttribute(ATTRIBUTE_CREATE_TIME, TIMESTAMP * MILLIS_CONVERT_FACTOR);
+
+        return new ArrayList<>(Collections.singletonList(tableEntity));
+    }
+}
diff --git 
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/CreateImpalaProcessTest.java
 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/CreateImpalaProcessTest.java
new file mode 100644
index 000000000..3d776fb7f
--- /dev/null
+++ 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/CreateImpalaProcessTest.java
@@ -0,0 +1,457 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.atlas.impala.hook.events;
+
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.hook.ImpalaLineageHook;
+import org.apache.atlas.impala.model.ImpalaDependencyType;
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.ImpalaVertexType;
+import org.apache.atlas.impala.model.LineageEdge;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.impala.model.LineageVertexMetadata;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_NAME;
+import static 
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUALIFIED_NAME;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class CreateImpalaProcessTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CreateImpalaProcessTest.class);
+
+    // Fixture values used by setUp() and by the lineage query built in createTestQuery().
+    private static final String CLUSTER_NAME = "testcluster";
+    private static final String DB_NAME = "testdb";
+    private static final String TABLE_NAME_SOURCE = "source_table";
+    private static final String TABLE_NAME_TARGET = "target_table";
+    private static final String COLUMN_NAME_ID = "id";
+    private static final String COLUMN_NAME_NAME = "name";
+    private static final String USER_NAME = "testuser";
+    private static final String HOST_NAME = "testhost";
+    private static final String QUERY_TEXT = "CREATE VIEW target_table AS SELECT id, name FROM source_db.source_table";
+    private static final String QUERY_ID = "test_query_id_123";
+    private static final long TIMESTAMP = 1554750072L;
+    private static final long END_TIME = 1554750554L;
+
+    // Values used for stubbed qualified names and by the reflection-based column-lineage test.
+    private static final String TEST_CLUSTER_NAME = "test_cluster";
+    private static final String TEST_DB_NAME = "test_db";
+    private static final String TEST_TABLE_NAME = "test_table";
+    private static final String TEST_COLUMN_NAME = "test_column";
+    private static final String TEST_QUALIFIED_NAME = "test_db.test_table@test_cluster";
+    private static final String TEST_QUERY_TEXT = "SELECT * FROM test_table";
+    private static final String TEST_USER_NAME = "test_user";
+    private static final long TEST_TIMESTAMP = 1234567890L;
+    private static final long TEST_END_TIME = 1234567900L;
+    private static final long TEST_VERTEX_ID = 123L;
+
+    // Mocks are (re)initialised in setUp(). Mock fields that no test referenced
+    // (impalaLineageHook, impalaOperationType, impalaQuery) were removed.
+    @Mock
+    private ImpalaLineageHook mockHook;
+
+    @Mock
+    private AtlasImpalaHookContext mockContext;
+
+    @Mock
+    private AtlasEntity mockOutputEntity;
+
+    @Mock
+    private AtlasEntity mockInputEntity;
+
+    @Mock
+    private LineageVertexMetadata mockMetadata;
+
+    @Mock
+    private LineageEdge mockLineageEdge;
+
+    @Mock
+    private ImpalaQuery mockLineageQuery;
+
+    // Instance under test, built in setUp() on top of mockContext.
+    private CreateImpalaProcess createImpalaProcess;
+
+
+
+    /**
+     * Initialises the {@code @Mock} fields before each test, installs the default stubs
+     * shared by the tests, and builds the {@link CreateImpalaProcess} instance under test.
+     */
+    @BeforeMethod
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        // Defaults on the hook mock.
+        when(mockHook.isConvertHdfsPathToLowerCase()).thenReturn(false);
+        when(mockHook.getHostName()).thenReturn(HOST_NAME);
+        when(mockHook.getMetadataNamespace()).thenReturn(CLUSTER_NAME);
+
+        // Defaults on the context mock; several tests re-stub the operation type.
+        when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.CREATEVIEW);
+        when(mockContext.getQueryStr()).thenReturn(QUERY_TEXT);
+        when(mockContext.getMetadataNamespace()).thenReturn(CLUSTER_NAME);
+        when(mockContext.getHostName()).thenReturn(HOST_NAME);
+        when(mockContext.getUserName()).thenReturn(USER_NAME);
+
+        createImpalaProcess = new CreateImpalaProcess(mockContext);
+    }
+
+
+    /**
+     * A lineage query without vertices or edges is expected to produce no
+     * notification messages ({@code null}).
+     */
+    @Test
+    public void testGetNotificationMessagesWithNoEntities() throws Exception {
+        ImpalaQuery queryWithoutLineage = new ImpalaQuery();
+        queryWithoutLineage.setQueryId(QUERY_ID);
+        queryWithoutLineage.setQueryText(QUERY_TEXT);
+        queryWithoutLineage.setUser(USER_NAME);
+        queryWithoutLineage.setTimestamp(TIMESTAMP);
+        queryWithoutLineage.setEndTime(END_TIME);
+        queryWithoutLineage.setVertices(new ArrayList<>());
+        queryWithoutLineage.setEdges(new ArrayList<>());
+
+        when(mockContext.getLineageQuery()).thenReturn(queryWithoutLineage);
+
+        List<HookNotification> messages = new CreateImpalaProcess(mockContext).getNotificationMessages();
+
+        assertNull(messages);
+    }
+
+    /**
+     * Stubs the context for a source-table to target-table lineage query and verifies the
+     * entities produced for it: an {@code impala_process} followed by an
+     * {@code impala_process_execution}, plus the qualified name of the process.
+     *
+     * Despite the method name, the call under test here is {@code getEntities()}.
+     */
+    @Test
+    public void testGetNotificationMessagesWithEntities() throws Exception {
+        final String sourceTable = "testdb.source_table";
+        final String targetTable = "testdb.target_table";
+        final String sourceQualifiedName = TEST_DB_NAME + "." + TABLE_NAME_SOURCE + "@" + TEST_CLUSTER_NAME;
+        final String targetQualifiedName = TEST_DB_NAME + "." + TABLE_NAME_TARGET + "@" + TEST_CLUSTER_NAME;
+
+        ImpalaQuery lineageQuery = createTestQuery();
+        AtlasEntity sourceEntity = createMockTableEntity(TABLE_NAME_SOURCE);
+        AtlasEntity targetEntity = createMockTableEntity(TABLE_NAME_TARGET);
+
+        when(mockContext.getLineageQuery()).thenReturn(lineageQuery);
+        when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY);
+
+        // Source table lookups.
+        when(mockContext.getQualifiedNameForTable(sourceTable)).thenReturn(sourceQualifiedName);
+        when(mockContext.getDatabaseNameFromTable(sourceTable)).thenReturn(TEST_DB_NAME);
+        when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_SOURCE))).thenReturn(sourceEntity);
+
+        // Target table lookups.
+        when(mockContext.getQualifiedNameForTable(targetTable)).thenReturn(targetQualifiedName);
+        when(mockContext.getDatabaseNameFromTable(targetTable)).thenReturn(TEST_DB_NAME);
+        when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_TARGET))).thenReturn(targetEntity);
+
+        AtlasEntitiesWithExtInfo result = new CreateImpalaProcess(mockContext).getEntities();
+
+        String expectedProcessName = "QUERY:" + sourceQualifiedName + ":" + "1554750072000->:"
+                + targetQualifiedName + ":1554750072000";
+
+        assertEquals(result.getEntities().size(), 2);
+        assertEquals(result.getEntities().get(0).getTypeName(), "impala_process");
+        assertEquals(result.getEntities().get(1).getTypeName(), "impala_process_execution");
+        assertEquals(result.getEntities().get(0).getAttribute(ATTRIBUTE_QUALIFIED_NAME), expectedProcessName);
+    }
+
+
+    /**
+     * Stubs the context for a source-table to target-table lineage query and verifies that
+     * the first notification returned is of type {@code ENTITY_CREATE_V2}.
+     *
+     * Despite the method name, the call under test here is {@code getNotificationMessages()}.
+     */
+    @Test
+    public void testGetEntitiesWithValidData() throws Exception {
+
+        String sourceTableFullName = "testdb.source_table";
+        String targetTableFullName = "testdb.target_table";
+        ImpalaQuery query = createTestQuery();
+        when(mockContext.getLineageQuery()).thenReturn(query);
+
+        AtlasEntity mockInputEntity = createMockTableEntity(TABLE_NAME_SOURCE);
+        AtlasEntity mockOutputEntity = createMockTableEntity(TABLE_NAME_TARGET);
+
+        when(mockContext.getQualifiedNameForTable(sourceTableFullName)).thenReturn(TEST_DB_NAME+"."+TABLE_NAME_SOURCE+"@"+TEST_CLUSTER_NAME);
+        when(mockContext.getDatabaseNameFromTable(sourceTableFullName)).thenReturn(TEST_DB_NAME);
+        when(mockContext.getQualifiedNameForTable(targetTableFullName)).thenReturn(TEST_DB_NAME+"."+TABLE_NAME_TARGET+"@"+TEST_CLUSTER_NAME);
+        when(mockContext.getDatabaseNameFromTable(targetTableFullName)).thenReturn(TEST_DB_NAME);
+        when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY);
+        when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_SOURCE)))
+                .thenReturn(mockInputEntity);
+        when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_TARGET)))
+                .thenReturn(mockOutputEntity);
+
+        CreateImpalaProcess process = new CreateImpalaProcess(mockContext);
+        List<HookNotification> hookMsgList = process.getNotificationMessages();
+
+        // Fail with a readable assertion instead of an NPE / IndexOutOfBoundsException
+        // when no notification is produced.
+        assertNotNull(hookMsgList);
+        assertFalse(hookMsgList.isEmpty());
+
+        // Compare the enum constants directly: ordinals depend on declaration order and
+        // produce an unreadable "expected [n] but found [m]" message on failure.
+        assertEquals(hookMsgList.get(0).getType(), HookNotification.HookNotificationType.ENTITY_CREATE_V2);
+    }
+
+    /**
+     * A lineage query without vertices or edges is expected to make
+     * {@code getEntities()} return {@code null}.
+     */
+    @Test
+    public void testGetEntitiesWithEmptyInputsAndOutputs() throws Exception {
+        ImpalaQuery queryWithoutLineage = new ImpalaQuery();
+        queryWithoutLineage.setQueryId(QUERY_ID);
+        queryWithoutLineage.setQueryText(QUERY_TEXT);
+        queryWithoutLineage.setUser(USER_NAME);
+        queryWithoutLineage.setTimestamp(TIMESTAMP);
+        queryWithoutLineage.setEndTime(END_TIME);
+        queryWithoutLineage.setVertices(new ArrayList<>());
+        queryWithoutLineage.setEdges(new ArrayList<>());
+
+        when(mockContext.getLineageQuery()).thenReturn(queryWithoutLineage);
+
+        AtlasEntitiesWithExtInfo result = new CreateImpalaProcess(mockContext).getEntities();
+
+        assertNull(result);
+    }
+
+    @Test
+    public void testUserName() {
+        assertEquals(createImpalaProcess.getUserName(), USER_NAME);
+    }
+
+    /**
+     * Builds the lineage query fixture: four column vertices (ids 0 and 1 on the source
+     * table, ids 2 and 3 on the target table) connected by two PROJECTION edges
+     * (0 -> 2 and 1 -> 3).
+     *
+     * @return a query whose vertex and edge lists are mutable {@link ArrayList}s
+     */
+    private ImpalaQuery createTestQuery() {
+        String sourcePrefix = DB_NAME + "." + TABLE_NAME_SOURCE + ".";
+        String targetPrefix = DB_NAME + "." + TABLE_NAME_TARGET + ".";
+
+        List<LineageVertex> columnVertices = new ArrayList<>();
+        columnVertices.add(createColumnVertex(0L, sourcePrefix + COLUMN_NAME_ID, TABLE_NAME_SOURCE, COLUMN_NAME_ID));
+        columnVertices.add(createColumnVertex(1L, sourcePrefix + COLUMN_NAME_NAME, TABLE_NAME_SOURCE, COLUMN_NAME_NAME));
+        columnVertices.add(createColumnVertex(2L, targetPrefix + COLUMN_NAME_ID, TABLE_NAME_TARGET, COLUMN_NAME_ID));
+        columnVertices.add(createColumnVertex(3L, targetPrefix + COLUMN_NAME_NAME, TABLE_NAME_TARGET, COLUMN_NAME_NAME));
+
+        // Each pair is {source vertex id, target vertex id}.
+        long[][] projections = {{0L, 2L}, {1L, 3L}};
+
+        List<LineageEdge> projectionEdges = new ArrayList<>();
+        for (long[] projection : projections) {
+            LineageEdge edge = new LineageEdge();
+            edge.setSources(Arrays.asList(projection[0]));
+            edge.setTargets(Arrays.asList(projection[1]));
+            edge.setEdgeType(ImpalaDependencyType.PROJECTION);
+            projectionEdges.add(edge);
+        }
+
+        ImpalaQuery query = new ImpalaQuery();
+        query.setQueryText(QUERY_TEXT);
+        query.setQueryId(QUERY_ID);
+        query.setUser(USER_NAME);
+        query.setTimestamp(TIMESTAMP);
+        query.setEndTime(END_TIME);
+        query.setVertices(columnVertices);
+        query.setEdges(projectionEdges);
+
+        return query;
+    }
+
+
+    /**
+     * Builds a COLUMN vertex with the given numeric id and vertex id, carrying metadata
+     * that names its table as {@code DB_NAME.tableName}.
+     *
+     * @param id         numeric vertex id referenced by lineage edges
+     * @param vertexId   string id stored on the vertex
+     * @param tableName  unqualified table name; prefixed with DB_NAME in the metadata
+     * @param columnName not used by this method
+     * @return the populated column vertex
+     */
+    private LineageVertex createColumnVertex(Long id, String vertexId, String tableName, String columnName) {
+        LineageVertexMetadata tableMetadata = new LineageVertexMetadata();
+        tableMetadata.setTableName(DB_NAME + "." + tableName);
+        tableMetadata.setTableCreateTime(TIMESTAMP);
+
+        LineageVertex columnVertex = new LineageVertex();
+        columnVertex.setId(id);
+        columnVertex.setVertexId(vertexId);
+        columnVertex.setVertexType(ImpalaVertexType.COLUMN);
+        columnVertex.setMetadata(tableMetadata);
+
+        return columnVertex;
+    }
+
+
+    /**
+     * Creates a table entity whose name is {@code tableName} and whose qualified name is
+     * the value of {@code getQualifiedTableName(tableName)}.
+     *
+     * @param tableName unqualified table name
+     * @return a new entity of type {@code BaseImpalaEvent.HIVE_TYPE_TABLE}
+     */
+    private AtlasEntity createMockTableEntity(String tableName) {
+        String qualifiedName = getQualifiedTableName(tableName);
+
+        AtlasEntity tableEntity = new AtlasEntity(BaseImpalaEvent.HIVE_TYPE_TABLE);
+        tableEntity.setAttribute(ATTRIBUTE_NAME, tableName);
+        tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+
+        return tableEntity;
+    }
+
+
+    /**
+     * Returns the lower-cased string {@code DB_NAME.tableName@CLUSTER_NAME}.
+     *
+     * @param tableName unqualified table name
+     * @return the lower-cased qualified table name
+     */
+    private String getQualifiedTableName(String tableName) {
+        String qualifiedName = DB_NAME + "." + tableName + "@" + CLUSTER_NAME;
+
+        return qualifiedName.toLowerCase();
+    }
+
+    /**
+     * Smoke test: runs {@code getEntities()} against the column-lineage fixture with column
+     * entities stubbed for every vertex id. No assertion is made on the result; the test
+     * passes as long as no exception is thrown.
+     */
+    @Test
+    public void testCreateImpalaProcessWithColumnLineage() throws Exception {
+        when(mockContext.getLineageQuery()).thenReturn(createTestQuery());
+
+        String sourcePrefix = DB_NAME + "." + TABLE_NAME_SOURCE + ".";
+        String targetPrefix = DB_NAME + "." + TABLE_NAME_TARGET + ".";
+
+        AtlasEntity sourceIdColumn = new AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN);
+        sourceIdColumn.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_ID);
+        when(mockContext.getEntity(sourcePrefix + COLUMN_NAME_ID)).thenReturn(sourceIdColumn);
+
+        AtlasEntity sourceNameColumn = new AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN);
+        sourceNameColumn.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_NAME);
+        when(mockContext.getEntity(sourcePrefix + COLUMN_NAME_NAME)).thenReturn(sourceNameColumn);
+
+        AtlasEntity targetIdColumn = new AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN);
+        targetIdColumn.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_ID);
+        when(mockContext.getEntity(targetPrefix + COLUMN_NAME_ID)).thenReturn(targetIdColumn);
+
+        AtlasEntity targetNameColumn = new AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN);
+        targetNameColumn.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_NAME);
+        when(mockContext.getEntity(targetPrefix + COLUMN_NAME_NAME)).thenReturn(targetNameColumn);
+
+        // Result intentionally ignored: only successful completion is checked.
+        new CreateImpalaProcess(mockContext).getEntities();
+    }
+
+    /**
+     * Smoke test: switches every edge of the fixture query to PREDICATE and runs
+     * {@code getEntities()}. No assertion is made on the result; the test passes as long
+     * as no exception is thrown.
+     */
+    @Test
+    public void testProcessWithPredicateEdgeType() throws Exception {
+        ImpalaQuery predicateOnlyQuery = createTestQuery();
+        predicateOnlyQuery.getEdges().forEach(edge -> edge.setEdgeType(ImpalaDependencyType.PREDICATE));
+
+        when(mockContext.getLineageQuery()).thenReturn(predicateOnlyQuery);
+
+        // Result intentionally ignored: only successful completion is checked.
+        new CreateImpalaProcess(mockContext).getEntities();
+    }
+
+    /**
+     * Smoke test: adds a PREDICATE edge (0 -> 2) next to the fixture's PROJECTION edges and
+     * runs {@code getEntities()}. No assertion is made on the result; the test passes as
+     * long as no exception is thrown.
+     */
+    @Test
+    public void testProcessWithMixedEdgeTypes() throws Exception {
+        LineageEdge extraPredicateEdge = new LineageEdge();
+        extraPredicateEdge.setEdgeType(ImpalaDependencyType.PREDICATE);
+        extraPredicateEdge.setSources(Arrays.asList(0L));
+        extraPredicateEdge.setTargets(Arrays.asList(2L));
+
+        ImpalaQuery mixedQuery = createTestQuery();
+        mixedQuery.getEdges().add(extraPredicateEdge);
+
+        when(mockContext.getLineageQuery()).thenReturn(mixedQuery);
+
+        // Result intentionally ignored: only successful completion is checked.
+        new CreateImpalaProcess(mockContext).getEntities();
+
+        LOG.info("Mixed edge types test completed");
+    }
+
+    /**
+     * Smoke test: clears the metadata of the first fixture vertex and runs
+     * {@code getEntities()}. No assertion is made on the result; the test passes as long
+     * as no exception is thrown.
+     */
+    @Test
+    public void testProcessWithNullVertexMetadata() throws Exception {
+        ImpalaQuery queryWithMissingMetadata = createTestQuery();
+        LineageVertex firstVertex = queryWithMissingMetadata.getVertices().get(0);
+        firstVertex.setMetadata(null);
+
+        when(mockContext.getLineageQuery()).thenReturn(queryWithMissingMetadata);
+
+        // Result intentionally ignored: only successful completion is checked.
+        new CreateImpalaProcess(mockContext).getEntities();
+    }
+
+    /**
+     * Points the context at the mocked lineage query and stubs the TEST_* user name,
+     * QUERY operation type, query text, timestamp and end time.
+     */
+    private void setupBasicMocks() {
+        // Values served by the mocked lineage query.
+        when(mockLineageQuery.getQueryText()).thenReturn(TEST_QUERY_TEXT);
+        when(mockLineageQuery.getTimestamp()).thenReturn(TEST_TIMESTAMP);
+        when(mockLineageQuery.getEndTime()).thenReturn(TEST_END_TIME);
+
+        // Values served by the context, overriding the defaults from setUp().
+        when(mockContext.getLineageQuery()).thenReturn(mockLineageQuery);
+        when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY);
+        when(mockContext.getUserName()).thenReturn(TEST_USER_NAME);
+    }
+
+    /**
+     * Registers two mocked COLUMN vertices in the {@code verticesMap} field of the process
+     * under test, reached via reflection: an output vertex under {@code TEST_VERTEX_ID} and
+     * an input vertex under {@code TEST_VERTEX_ID + 2}. Both share {@code mockMetadata},
+     * which is stubbed to report {@code TEST_TABLE_NAME}.
+     *
+     * @throws Exception if the {@code verticesMap} field cannot be found or read
+     */
+    private void setupOutputVertices() throws Exception {
+        java.lang.reflect.Field verticesMapField = BaseImpalaEvent.class.getDeclaredField("verticesMap");
+        verticesMapField.setAccessible(true);
+
+        // Field.get() returns Object, so the generic cast cannot be checked at runtime.
+        // Presumably the field is declared as Map<Long, LineageVertex> in BaseImpalaEvent
+        // (verify against that class); the warning is suppressed on this declaration only.
+        @SuppressWarnings("unchecked")
+        Map<Long, LineageVertex> verticesMap = (Map<Long, LineageVertex>) verticesMapField.get(createImpalaProcess);
+
+        LineageVertex outputVertex1 = mock(LineageVertex.class);
+        LineageVertex inputVertex = mock(LineageVertex.class);
+
+        when(outputVertex1.getVertexId()).thenReturn("output_column1");
+        when(outputVertex1.getVertexType()).thenReturn(ImpalaVertexType.COLUMN);
+        when(outputVertex1.getMetadata()).thenReturn(mockMetadata);
+
+        when(inputVertex.getVertexId()).thenReturn("input_column");
+        when(inputVertex.getVertexType()).thenReturn(ImpalaVertexType.COLUMN);
+        when(inputVertex.getMetadata()).thenReturn(mockMetadata);
+
+        when(mockMetadata.getTableName()).thenReturn(TEST_TABLE_NAME);
+
+        verticesMap.put(TEST_VERTEX_ID, outputVertex1);
+        verticesMap.put(TEST_VERTEX_ID + 2, inputVertex);
+    }
+
+    /**
+     * Creates a mocked process entity that answers the {@code "qualifiedName"} and
+     * {@code "name"} attribute lookups.
+     *
+     * @return the stubbed {@link AtlasEntity} mock
+     */
+    private AtlasEntity createMockProcessEntity() {
+        String processQualifiedName = TEST_DB_NAME + "." + TEST_TABLE_NAME + "." + TEST_COLUMN_NAME + "@" + TEST_CLUSTER_NAME;
+
+        AtlasEntity processEntity = mock(AtlasEntity.class);
+        when(processEntity.getAttribute("name")).thenReturn("test_process");
+        when(processEntity.getAttribute("qualifiedName")).thenReturn(processQualifiedName);
+
+        return processEntity;
+    }
+
+    /**
+     * Invokes the private {@code processColumnLineage(AtlasEntity, AtlasEntitiesWithExtInfo)}
+     * method via reflection with a single PROJECTION edge and verifies that the first
+     * entity added is an {@code impala_column_lineage} whose qualified name is the process
+     * qualified name followed by {@code ":"} and the output column name.
+     */
+    @Test
+    public void testProcessColumnLineageWithOutputColumn() throws Exception {
+        final String outputColumnName = "output_column";
+        final String columnQualifiedName = TEST_DB_NAME + "." + TEST_TABLE_NAME + "." + TEST_COLUMN_NAME + "@" + TEST_CLUSTER_NAME;
+
+        setupBasicMocks();
+
+        // One PROJECTION edge. NOTE(review): source and target both use TEST_VERTEX_ID, so the
+        // input vertex registered under TEST_VERTEX_ID + 2 is never reached - confirm intent.
+        when(mockLineageEdge.getEdgeType()).thenReturn(ImpalaDependencyType.PROJECTION);
+        when(mockLineageEdge.getSources()).thenReturn(Arrays.asList(TEST_VERTEX_ID));
+        when(mockLineageEdge.getTargets()).thenReturn(Arrays.asList(TEST_VERTEX_ID));
+        when(mockLineageQuery.getEdges()).thenReturn(Arrays.asList(mockLineageEdge));
+
+        when(mockContext.getQualifiedNameForColumn(any(LineageVertex.class))).thenReturn(columnQualifiedName);
+
+        setupOutputVertices();
+
+        // Consecutive getEntity() calls return: output, output, then input.
+        when(mockContext.getEntity(anyString())).thenReturn(mockOutputEntity, mockOutputEntity, mockInputEntity);
+        when(mockOutputEntity.getAttribute("name")).thenReturn(outputColumnName);
+
+        AtlasEntity processEntity = createMockProcessEntity();
+        AtlasEntitiesWithExtInfo collected = new AtlasEntitiesWithExtInfo();
+
+        Method processColumnLineage = CreateImpalaProcess.class.getDeclaredMethod(
+                "processColumnLineage", AtlasEntity.class, AtlasEntitiesWithExtInfo.class);
+        processColumnLineage.setAccessible(true);
+        processColumnLineage.invoke(createImpalaProcess, processEntity, collected);
+
+        assertTrue(collected.getEntities().size() > 0);
+        assertEquals(collected.getEntities().get(0).getTypeName(), "impala_column_lineage");
+        assertEquals(collected.getEntities().get(0).getAttribute(ATTRIBUTE_QUALIFIED_NAME),
+                columnQualifiedName + ":" + outputColumnName);
+    }
+
+
+}
\ No newline at end of file

Reply via email to