This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch atlas-2.5 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 547c31e48e951a625144309880bd22fee01ac7db Author: prashantdev88 <[email protected]> AuthorDate: Mon Sep 1 13:04:51 2025 +0530 ATLAS-5069: Improve Unit Test Coverage for Impala-bridge Module (#422) (cherry picked from commit 18d7f9dccf5658988d32e387339948286810f0a8) --- .../apache/atlas/impala/ImpalaLineageToolTest.java | 436 +++++++++++++++ .../impala/hook/AtlasImpalaHookContextTest.java | 262 +++++++++ .../impala/hook/ImpalaOperationParserTest.java | 92 ++++ .../impala/hook/events/BaseImpalaEventTest.java | 601 +++++++++++++++++++++ .../hook/events/CreateImpalaProcessTest.java | 457 ++++++++++++++++ 5 files changed, 1848 insertions(+) diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolTest.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolTest.java new file mode 100644 index 000000000..51ae1b002 --- /dev/null +++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolTest.java @@ -0,0 +1,436 @@ +/** Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+**/ +package org.apache.atlas.impala; + +import org.apache.atlas.impala.hook.ImpalaLineageHook; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + import static org.testng.Assert.*; + +public class ImpalaLineageToolTest { + + private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageToolTest.class); + + @Mock + private ImpalaLineageHook mockImpalaLineageHook; + + private Path tempDir; + private final PrintStream originalOut = System.out; + private ByteArrayOutputStream testOut; + + @BeforeMethod + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + + tempDir = Files.createTempDirectory("impala-lineage-test"); + + testOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(testOut)); + } + + @AfterMethod + public void tearDown() throws IOException { + + if (tempDir != null && Files.exists(tempDir)) { + Files.walk(tempDir) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } + + System.setOut(originalOut); + } + + @Test + public void testConstructorWithValidArguments() { + String[] args = {"-d", "/test/directory", "-p", "test_prefix"}; + + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + assertNotNull(tool); + } + + @Test(expectedExceptions = RuntimeException.class) + public void testConstructorWithInvalidArguments() { + String[] args = {"-invalid", "argument"}; + + new ImpalaLineageTool(args); + } + + @Test + public void 
testGetCurrentFilesWithNoFiles() throws IOException { + String[] args = {"-d", tempDir.toString(), "-p", "nonexistent"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + File[] files = tool.getCurrentFiles(); + + assertNotNull(files); + assertEquals(files.length, 0); + } + + @Test + public void testGetCurrentFilesWithSingleFile() throws IOException { + + File testFile = new File(tempDir.toFile(), "test_lineage.log"); + testFile.createNewFile(); + + String[] args = {"-d", tempDir.toString(), "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + File[] files = tool.getCurrentFiles(); + + assertNotNull(files); + assertEquals(files.length, 1); + assertEquals(files[0].getName(), "test_lineage.log"); + } + + @Test + public void testGetCurrentFilesWithMultipleFilesSorted() throws IOException { + + File file1 = new File(tempDir.toFile(), "test_file1.log"); + File file2 = new File(tempDir.toFile(), "test_file2.log"); + File file3 = new File(tempDir.toFile(), "test_file3.log"); + + file1.createNewFile(); + file2.createNewFile(); + file3.createNewFile(); + + + long baseTime = System.currentTimeMillis(); + file1.setLastModified(baseTime - 3000); // 3 seconds ago + file2.setLastModified(baseTime - 2000); // 2 seconds ago + file3.setLastModified(baseTime - 1000); // 1 second ago + + String[] args = {"-d", tempDir.toString(), "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + File[] files = tool.getCurrentFiles(); + + assertNotNull(files); + assertEquals(files.length, 3); + + assertTrue(files[0].lastModified() <= files[1].lastModified()); + assertTrue(files[1].lastModified() <= files[2].lastModified()); + } + + @Test + public void testGetCurrentFilesWithNonExistentDirectory() { + String[] args = {"-d", "/nonexistent/directory", "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + File[] files = tool.getCurrentFiles(); + + assertNotNull(files); + assertEquals(files.length, 0); + } + + @Test + public void 
testDeleteLineageAndWalSuccess() throws IOException { + File lineageFile = new File(tempDir.toFile(), "test_lineage.log"); + File walFile = new File(tempDir.toFile(), "test_wal.wal"); + + lineageFile.createNewFile(); + walFile.createNewFile(); + + assertTrue(lineageFile.exists()); + assertTrue(walFile.exists()); + + ImpalaLineageTool.deleteLineageAndWal(lineageFile, walFile.getAbsolutePath()); + + assertFalse(lineageFile.exists()); + assertFalse(walFile.exists()); + } + + @Test + public void testDeleteLineageAndWalNonExistentFiles() { + File nonExistentFile = new File(tempDir.toFile(), "nonexistent.log"); + String nonExistentWalPath = tempDir.resolve("nonexistent.wal").toString(); + + ImpalaLineageTool.deleteLineageAndWal(nonExistentFile, nonExistentWalPath); + } + + @Test + public void testImportHImpalaEntitiesWithNewWalFile() throws Exception { + + File lineageFile = new File(tempDir.toFile(), "test_lineage.log"); + String testContent = "test lineage content"; + + try (FileWriter writer = new FileWriter(lineageFile)) { + writer.write(testContent); + } + + String walPath = tempDir.resolve("test.wal").toString(); + + String[] args = {"-d", tempDir.toString(), "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + + doNothing().when(mockImpalaLineageHook).process(anyString()); + + tool.importHImpalaEntities(mockImpalaLineageHook, lineageFile.getAbsolutePath(), walPath); + + verify(mockImpalaLineageHook, times(1)).process(testContent); + + File walFile = new File(walPath); + assertTrue(walFile.exists()); + } + + @Test + public void testImportHImpalaEntitiesWithExistingWalFile() throws Exception { + + File lineageFile = new File(tempDir.toFile(), "test_lineage.log"); + String testContent = "test lineage content"; + + try (FileWriter writer = new FileWriter(lineageFile)) { + writer.write(testContent); + } + + + String walPath = tempDir.resolve("test.wal").toString(); + File walFile = new File(walPath); + try (FileWriter writer = new 
FileWriter(walFile)) { + writer.write("0, "+ lineageFile.getAbsolutePath()); + } + + String[] args = {"-d", tempDir.toString(), "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + doNothing().when(mockImpalaLineageHook).process(anyString()); + + tool.importHImpalaEntities(mockImpalaLineageHook, lineageFile.getAbsolutePath(), walPath); + + verify(mockImpalaLineageHook, times(1)).process(testContent); + } + + @Test + public void testImportHImpalaEntitiesWithProcessingFailure() throws Exception { + + File lineageFile = new File(tempDir.toFile(), "test_lineage.log"); + String testContent = "test lineage content"; + + try (FileWriter writer = new FileWriter(lineageFile)) { + writer.write(testContent); + } + + String walPath = tempDir.resolve("test.wal").toString(); + + String[] args = {"-d", tempDir.toString(), "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + doThrow(new RuntimeException("Processing failed")).when(mockImpalaLineageHook).process(anyString()); + + tool.importHImpalaEntities(mockImpalaLineageHook, lineageFile.getAbsolutePath(), walPath); + + verify(mockImpalaLineageHook, times(1)).process(testContent); + + File walFile = new File(walPath); + assertTrue(walFile.exists()); + String walContent = new String(Files.readAllBytes(walFile.toPath())); + assertEquals(walContent.trim(), "0, "+ lineageFile.getAbsolutePath()); + } + + + @Test + public void testMainWithIncorrectNumberOfArguments() { + String[] args = {"-d", tempDir.toString()}; + + ImpalaLineageTool.main(args); + + String output = testOut.toString(); + assertTrue(output.contains("wrong number of arguments")); + assertTrue(output.contains("Usage: import-impala.sh")); + } + + + @Test + public void testRunWithMultipleFiles() throws IOException { + // Create multiple test files + File file1 = new File(tempDir.toFile(), "test_file1.log"); + File file2 = new File(tempDir.toFile(), "test_file2.log"); + File file3 = new File(tempDir.toFile(), "test_file3.log"); + 
+ file1.createNewFile(); + try (FileWriter writer = new FileWriter(file1)) { + writer.write("content1"); + } + + file2.createNewFile(); + try (FileWriter writer = new FileWriter(file2)) { + writer.write("content2"); + } + + file3.createNewFile(); + try (FileWriter writer = new FileWriter(file3)) { + writer.write("content3"); + } + + long baseTime = System.currentTimeMillis(); + file1.setLastModified(baseTime - 3000); + file2.setLastModified(baseTime - 2000); + file3.setLastModified(baseTime - 1000); + + String[] args = {"-d", tempDir.toString(), "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + try { + tool.run(); + } catch (Exception e) { + LOG.info("Error running test case ImpalaLineageToolTest.testRunWithMultipleFiles()"); + } + + assertFalse(file1.exists()); + assertFalse(file2.exists()); + assertTrue(file3.exists()); + } + + @Test + public void testRunWithNoFiles() { + String[] args = {"-d", tempDir.toString(), "-p", "nonexistent"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + tool.run(); + } + + @Test + public void testRunWithSingleFile() throws IOException { + File testFile = new File(tempDir.toFile(), "test_single.log"); + testFile.createNewFile(); + try (FileWriter writer = new FileWriter(testFile)) { + writer.write("single file content"); + } + + String[] args = {"-d", tempDir.toString(), "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + try { + tool.run(); + } catch (Exception e) { + LOG.info("Error running test case ImpalaLineageToolTest.testRunWithSingleFile()"); + } + + assertTrue(testFile.exists()); + } + + @Test + public void testProcessImpalaLineageHookSuccess() throws Exception { + String[] args = {"-d", tempDir.toString(), "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + java.lang.reflect.Method method = ImpalaLineageTool.class.getDeclaredMethod( + "processImpalaLineageHook", + ImpalaLineageHook.class, + java.util.List.class + ); + 
method.setAccessible(true); + + doNothing().when(mockImpalaLineageHook).process(anyString()); + + java.util.List<String> lineageList = java.util.Arrays.asList("query1", "query2", "query3"); + boolean result = (Boolean) method.invoke(tool, mockImpalaLineageHook, lineageList); + + assertTrue(result); + verify(mockImpalaLineageHook, times(3)).process(anyString()); + } + + @Test + public void testProcessImpalaLineageHookWithFailures() throws Exception { + String[] args = {"-d", tempDir.toString(), "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + // Use reflection to access the private method + java.lang.reflect.Method method = ImpalaLineageTool.class.getDeclaredMethod( + "processImpalaLineageHook", + ImpalaLineageHook.class, + java.util.List.class + ); + method.setAccessible(true); + + doNothing().when(mockImpalaLineageHook).process("query1"); + doThrow(new RuntimeException("Processing failed")).when(mockImpalaLineageHook).process("query2"); + doNothing().when(mockImpalaLineageHook).process("query3"); + + java.util.List<String> lineageList = java.util.Arrays.asList("query1", "query2", "query3"); + boolean result = (Boolean) method.invoke(tool, mockImpalaLineageHook, lineageList); + + assertFalse(result); + verify(mockImpalaLineageHook, times(3)).process(anyString()); + } + + @Test + public void testProcessImpalaLineageHookWithEmptyList() throws Exception { + String[] args = {"-d", tempDir.toString(), "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + java.lang.reflect.Method method = ImpalaLineageTool.class.getDeclaredMethod( + "processImpalaLineageHook", + ImpalaLineageHook.class, + java.util.List.class + ); + method.setAccessible(true); + + java.util.List<String> emptyList = java.util.Collections.emptyList(); + boolean result = (Boolean) method.invoke(tool, mockImpalaLineageHook, emptyList); + + assertTrue(result); + verify(mockImpalaLineageHook, never()).process(anyString()); + } + + @Test + public void 
testGetCurrentFilesWithMultipleFilesSortedByModificationTime() throws IOException, InterruptedException { + // Create multiple test files and ensure different modification times + File file1 = new File(tempDir.toFile(), "test_old.log"); + file1.createNewFile(); + file1.setLastModified(System.currentTimeMillis() - 3000); // 3 seconds ago + + File file2 = new File(tempDir.toFile(), "test_medium.log"); + file2.createNewFile(); + file2.setLastModified(System.currentTimeMillis() - 2000); // 2 seconds ago + + File file3 = new File(tempDir.toFile(), "test_new.log"); + file3.createNewFile(); + file3.setLastModified(System.currentTimeMillis() - 1000); // 1 second ago + + String[] args = {"-d", tempDir.toString(), "-p", "test"}; + ImpalaLineageTool tool = new ImpalaLineageTool(args); + + File[] files = tool.getCurrentFiles(); + + assertNotNull(files); + assertEquals(files.length, 3); + assertEquals(files[0].getName(), "test_old.log"); + assertEquals(files[1].getName(), "test_medium.log"); + assertEquals(files[2].getName(), "test_new.log"); + } + +} diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/AtlasImpalaHookContextTest.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/AtlasImpalaHookContextTest.java new file mode 100644 index 000000000..386ff0108 --- /dev/null +++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/AtlasImpalaHookContextTest.java @@ -0,0 +1,262 @@ +/** Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.atlas.impala.hook; + +import org.apache.atlas.impala.model.ImpalaOperationType; +import org.apache.atlas.impala.model.ImpalaQuery; +import org.apache.atlas.impala.model.LineageVertex; +import org.apache.atlas.impala.model.LineageVertexMetadata; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertThrows; + +public class AtlasImpalaHookContextTest { + + + @Mock + private ImpalaLineageHook impalaLineageHook; + + @Mock + private ImpalaOperationType impalaOperationType; + + @Mock + private ImpalaQuery impalaQuery; + + + @BeforeMethod + public void initializeMocks() { + MockitoAnnotations.initMocks(this); + } + + + @Test + public void testGetQualifiedNameForTableWithFullTableName() throws Exception { + + String database = "testDatabase"; + String table = "testTable"; + String metadataNamespace = "testNamespace"; + String expectedTableQualifiedName = (database + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; + String fullTableName = database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table; + + when(impalaLineageHook.getMetadataNamespace()).thenReturn(metadataNamespace); + + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + + String 
receivedTableQualifiedName = impalaHook.getQualifiedNameForTable(fullTableName); + assertEquals(expectedTableQualifiedName,receivedTableQualifiedName); + } + + @Test + public void testGetQualifiedNameForTableWithNullTableName() throws Exception { + + String fullTableName = null; + + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + assertThrows(IllegalArgumentException.class, () -> impalaHook.getQualifiedNameForTable(fullTableName)); + } + + @Test + public void testGetQualifiedNameForTableWithPartialTableName() throws Exception { + + String tableName = "testTableName"; + + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + assertThrows(IllegalArgumentException.class, () -> impalaHook.getQualifiedNameForTable(tableName)); + } + + @Test + public void testGetQualifiedNameForColumnUsingLineageVertexAndLineageVertexMetadata() throws Exception { + + String database = "testDatabase"; + String table = "testTable"; + String column = "testColumn"; + String metadataNamespace = "testNamespace"; + String fullTableName = database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table; + String fullColumnName = database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+column; + LineageVertex vertex = new LineageVertex(); + LineageVertexMetadata metadata = new LineageVertexMetadata(); + + String expectedColumnQualifiedName = (database + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + + column + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; + + metadata.setTableName(fullTableName); + vertex.setMetadata(metadata); + vertex.setVertexId(fullColumnName); + + when(impalaLineageHook.getMetadataNamespace()) + .thenReturn(metadataNamespace); + + AtlasImpalaHookContext impalaHook = new 
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + + String receivedColumnQualifiedName = impalaHook.getQualifiedNameForColumn(vertex); + assertEquals(expectedColumnQualifiedName,receivedColumnQualifiedName); + } + + + + @Test + public void testGetQualifiedNameForColumnUsingLineageVertexAndLineageVertexMetadataAsNull() throws Exception { + + String database = "testDatabase"; + String table = "testTable"; + String column = "testColumn"; + String metadataNamespace = "testNamespace"; + String fullColumnName = database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+column; + LineageVertex vertex = new LineageVertex(); + + String expectedColumnQualifiedName = (database + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + + column + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; + + vertex.setMetadata(null); + vertex.setVertexId(fullColumnName); + + when(impalaLineageHook.getMetadataNamespace()) + .thenReturn(metadataNamespace); + + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + + String receivedColumnQualifiedName = impalaHook.getQualifiedNameForColumn(vertex); + assertEquals(expectedColumnQualifiedName,receivedColumnQualifiedName); + } + + @Test + public void testGetQualifiedNameForColumnUsingLineageVertexAndLineageVertexMetadataTableAsNull() throws Exception { + + String database = "testDatabase"; + String table = "testTable"; + String column = "testColumn"; + LineageVertex vertex = new LineageVertex(); + LineageVertexMetadata metadata = new LineageVertexMetadata(); + + vertex.setMetadata(metadata); + + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + + assertThrows(IllegalArgumentException.class, () -> impalaHook.getQualifiedNameForColumn(vertex)); + } + + @Test 
+ public void testGetQualifiedNameForColumn() throws Exception { + + String database = "testDatabase"; + String table = "testTable"; + String column = "testColumn"; + String metadataNamespace = "testNamespace"; + String expectedColumnQualifiedName = (database + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + + column + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; + String fullColumnName = database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+column; + + when(impalaLineageHook.getMetadataNamespace()) + .thenReturn(metadataNamespace); + + + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + + String receivedColumnQualifiedName = impalaHook.getQualifiedNameForColumn(fullColumnName); + assertEquals(expectedColumnQualifiedName,receivedColumnQualifiedName); + } + + + @Test + public void testGetTableNameFromColumn() throws Exception { + + String table = "testTable"; + String column = "testColumn"; + String expectedTableName = table; + String fullTableName = table+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+column; + + + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + + String receivedTableName = impalaHook.getTableNameFromColumn(fullTableName); + assertEquals(expectedTableName,receivedTableName); + } + + @Test + public void testGetDatabaseNameFromTable() throws Exception { + + String table = "testTable"; + String database = "testDatabase"; + String expectedDatabaseName = database; + String fullTableName = database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table; + + + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + + String receivedDatabaseName = impalaHook.getDatabaseNameFromTable(fullTableName); + 
assertEquals(expectedDatabaseName,receivedDatabaseName); + } + + + @Test + public void testGetColumnNameOnlyWithFullCOlumnName() throws Exception { + + String table = "testTable"; + String column = "testColumn"; + String expectedColumnName = column; + String fullColumnName = table+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+column; + + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + + String receivedColumnName = impalaHook.getColumnNameOnly(fullColumnName); + assertEquals(expectedColumnName,receivedColumnName); + } + + + @Test + public void testGetColumnNameOnlyWithNullValue() throws Exception { + String fullColumnName = null; + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + + assertThrows(IllegalArgumentException.class, () -> impalaHook.getColumnNameOnly(fullColumnName)); + } + + @Test + public void testGetColumnNameOnlyWithPartialColumnName() throws Exception { + + String table = "testTable"; + String column = "testColumn"; + String expectedColumnName = column; + String columnName = column; + + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + + String receivedColumnName = impalaHook.getColumnNameOnly(columnName); + assertEquals(expectedColumnName,receivedColumnName); + } + + @Test + public void testGetQualifiedNameForDb() throws Exception { + + String database = "testDatabase"; + String metadataNamespace = "testNamespace"; + String expectedDatabaseName = (database + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; + AtlasImpalaHookContext impalaHook = new AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery); + + when(impalaLineageHook.getMetadataNamespace()).thenReturn(metadataNamespace); + + String receivedDatabaseName = impalaHook.getQualifiedNameForDb(database); + 
assertEquals(expectedDatabaseName,receivedDatabaseName); + } + + +} diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaOperationParserTest.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaOperationParserTest.java new file mode 100644 index 000000000..f1662f7c8 --- /dev/null +++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaOperationParserTest.java @@ -0,0 +1,92 @@ +/** Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ +package org.apache.atlas.impala.hook; + +import org.apache.atlas.impala.model.ImpalaOperationType; +import org.testng.annotations.Test; + +import static org.junit.Assert.assertEquals; + +public class ImpalaOperationParserTest { + + @Test + public void testGetImpalaOperationTypeCREATEVIEW() { + String viewQuery = "create view test_view as select * from test_table"; + ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationType(viewQuery); + assertEquals(ImpalaOperationType.CREATEVIEW,operationType); + } + + @Test + public void testGetImpalaOperationTypeCREATETABLE_AS_SELECT() { + String selectAsQuery = "create table test_table as select * from test_table_1"; + ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationType(selectAsQuery); + assertEquals(ImpalaOperationType.CREATETABLE_AS_SELECT,operationType); + } + + @Test + public void testGetImpalaOperationTypeALTERVIEW_AS() { + String alterViewQuery = "ALTER VIEW test_view AS SELECT * FROM test_table_1;"; + ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationType(alterViewQuery); + assertEquals(ImpalaOperationType.ALTERVIEW_AS,operationType); + } + + @Test + public void testGetImpalaOperationTypeQUERY() { + String query = "INSERT INTO test_table SELECT * FROM test_source_table;"; + ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationType(query); + assertEquals(ImpalaOperationType.QUERY,operationType); + } + + + @Test + public void testGetImpalaOperationTypeQUERY_WITH_CLAUSE() { + String queryWithClause = "WITH test_table_2 AS (SELECT id FROM test_table)INSERT INTO test_table_1 SELECT id FROM test_table_2"; + ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationType(queryWithClause); + assertEquals(ImpalaOperationType.QUERY_WITH_CLAUSE,operationType); + } + + @Test + public void testGetImpalaOperationTypeUNKNOWN() { + String unknowQuery = "SELECT * from test_table"; + ImpalaOperationType 
operationType = ImpalaOperationParser.getImpalaOperationType(unknowQuery); + assertEquals(ImpalaOperationType.UNKNOWN,operationType); + } + + @Test + public void testGetImpalaOperationSubTypeWithInvalidQuery() { + String unknowQuery = "SELECT * from test_table"; + ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationSubType(ImpalaOperationType.QUERY_WITH_CLAUSE,unknowQuery); + assertEquals(ImpalaOperationType.UNKNOWN,operationType); + } + + @Test + public void testGetImpalaOperationSubTypeINSERT() { + String query = "INSERT INTO test_table SELECT * FROM test_source_table;"; + ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationSubType(ImpalaOperationType.QUERY,query); + assertEquals(ImpalaOperationType.INSERT,operationType); + } + + @Test + public void testGetImpalaOperationSubTypeINSERT_OVERWRITE() { + String query = "INSERT OVERWRITE TABLE test_table\n" + + "SELECT region, SUM(amount) AS test_table_1\n" + + "FROM test_table_2\n" + + "GROUP BY region;\n"; + ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationSubType(ImpalaOperationType.QUERY,query); + assertEquals(ImpalaOperationType.INSERT_OVERWRITE,operationType); + } +} diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/BaseImpalaEventTest.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/BaseImpalaEventTest.java new file mode 100644 index 000000000..0d5c4fc91 --- /dev/null +++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/BaseImpalaEventTest.java @@ -0,0 +1,601 @@ +/** Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.atlas.impala.hook.events; + +import org.apache.atlas.impala.hook.AtlasImpalaHookContext; +import org.apache.atlas.impala.model.ImpalaDataType; +import org.apache.atlas.impala.model.ImpalaNode; +import org.apache.atlas.impala.model.ImpalaOperationType; +import org.apache.atlas.impala.model.ImpalaQuery; +import org.apache.atlas.impala.model.ImpalaVertexType; +import org.apache.atlas.impala.model.LineageVertex; +import org.apache.atlas.impala.model.LineageVertexMetadata; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.notification.HookNotification; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; +import static org.testng.Assert.*; + +public class BaseImpalaEventTest { + private static final Logger LOG = LoggerFactory.getLogger(BaseImpalaEventTest.class); + + @Mock + private AtlasImpalaHookContext mockContext; + + @Mock + private ImpalaQuery mockQuery; + + private 
static final String CLUSTER_NAME = "test_cluster"; + private static final String DB_NAME = "test_database"; + private static final String TABLE_NAME = "test_table"; + private static final String COLUMN_NAME = "test_column"; + private static final String USER_NAME = "test_user"; + private static final String HOST_NAME = "test_host"; + private static final String QUERY_TEXT = "SELECT * FROM test_table"; + private static final String QUERY_ID = "test_query_123"; + private static final long TIMESTAMP = 1554750072L; + private static final long END_TIME = 1554750554L; + + private static class TestBaseImpalaEvent extends BaseImpalaEvent { + public TestBaseImpalaEvent(AtlasImpalaHookContext context) { + super(context); + } + + @Override + public List<HookNotification> getNotificationMessages() throws Exception { + return Collections.emptyList(); + } + } + + private TestBaseImpalaEvent baseImpalaEvent; + + @BeforeMethod + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + + when(mockContext.getUserName()).thenReturn(USER_NAME); + when(mockContext.getHostName()).thenReturn(HOST_NAME); + when(mockContext.getMetadataNamespace()).thenReturn(CLUSTER_NAME); + when(mockContext.getQueryStr()).thenReturn(QUERY_TEXT); + when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY); + when(mockContext.getLineageQuery()).thenReturn(mockQuery); + + when(mockQuery.getQueryId()).thenReturn(QUERY_ID); + when(mockQuery.getTimestamp()).thenReturn(TIMESTAMP); + when(mockQuery.getEndTime()).thenReturn(END_TIME); + + setupQualifiedNameMocks(); + + baseImpalaEvent = new TestBaseImpalaEvent(mockContext); + } + + private void setupQualifiedNameMocks() { + when(mockContext.getQualifiedNameForDb(anyString())).thenAnswer(invocation -> { + String dbName = invocation.getArgument(0); + return dbName.toLowerCase() +"@"+ CLUSTER_NAME; + }); + + when(mockContext.getQualifiedNameForTable(anyString())).thenAnswer(invocation -> { + String tableQualifiedName = 
invocation.getArgument(0); + return tableQualifiedName.toLowerCase() +"@"+ CLUSTER_NAME; + }); + + when(mockContext.getQualifiedNameForColumn(any(LineageVertex.class))).thenAnswer(invocation -> { + LineageVertex vertex = invocation.getArgument(0); + return vertex.getVertexId().toLowerCase() +"@"+ CLUSTER_NAME; + }); + + when(mockContext.getDatabaseNameFromTable(anyString())).thenAnswer(invocation -> { + String tableName = invocation.getArgument(0); + if (tableName.contains(".")) { + return tableName.split("\\.")[0]; + } + return DB_NAME; + }); + + when(mockContext.getColumnNameOnly(anyString())).thenAnswer(invocation -> { + String columnName = invocation.getArgument(0); + if (columnName.contains(".")) { + String[] parts = columnName.split("\\."); + return parts[parts.length - 1]; + } + return columnName; + }); + + when(mockContext.getTableNameFromColumn(anyString())).thenAnswer(invocation -> { + String columnName = invocation.getArgument(0); + if (columnName.contains(".") && columnName.split("\\.").length >= 2) { + String[] parts = columnName.split("\\."); + return parts[0] +"."+ parts[1]; + } + return DB_NAME +"."+ TABLE_NAME; + }); + } + + @Test + public void testGetUserName() { + assertEquals(baseImpalaEvent.getUserName(), USER_NAME); + } + + @Test + public void testGetQualifiedNameWithImpalaNode() throws Exception { + // Create a test ImpalaNode with database vertex + LineageVertex dbVertex = createDatabaseVertex(DB_NAME); + ImpalaNode dbNode = new ImpalaNode(dbVertex); + + String qualifiedName = baseImpalaEvent.getQualifiedName(dbNode); + + assertNotNull(qualifiedName); + assertEquals(qualifiedName, DB_NAME.toLowerCase() +"@"+ CLUSTER_NAME); + } + + @Test + public void testGetQualifiedNameWithLineageVertex() throws Exception { + // Test database vertex + LineageVertex dbVertex = createDatabaseVertex(DB_NAME); + String dbQualifiedName = baseImpalaEvent.getQualifiedName(dbVertex); + assertEquals(dbQualifiedName, DB_NAME.toLowerCase() +"@"+ CLUSTER_NAME); + + // 
Test table vertex + LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME); + String tableQualifiedName = baseImpalaEvent.getQualifiedName(tableVertex); + assertEquals(tableQualifiedName, DB_NAME+"."+TABLE_NAME.toLowerCase() +"@"+ CLUSTER_NAME); + + // Test column vertex + LineageVertex columnVertex = createColumnVertex(COLUMN_NAME, TABLE_NAME); + String columnQualifiedName = baseImpalaEvent.getQualifiedName(columnVertex); + System.out.println("columnQualifiedName : "+columnQualifiedName); + assertEquals(columnQualifiedName, DB_NAME+"."+TABLE_NAME+"."+COLUMN_NAME.toLowerCase() +"@"+ CLUSTER_NAME); + } + + @Test + public void testGetQualifiedNameWithNullVertex() { + try { + baseImpalaEvent.getQualifiedName((LineageVertex) null); + fail("Expected IllegalArgumentException for null vertex"); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), "node is null"); + } + } + + @Test + public void testGetQualifiedNameWithNullVertexType() { + LineageVertex vertex = new LineageVertex(); + vertex.setVertexId("test"); + vertex.setVertexType(null); + + String qualifiedName = baseImpalaEvent.getQualifiedName(vertex); + assertNull(qualifiedName); + } + + @Test + public void testGetQualifiedNameWithNullVertexId() { + LineageVertex vertex = new LineageVertex(); + vertex.setVertexType(ImpalaVertexType.DATABASE); + vertex.setVertexId(null); + + String qualifiedName = baseImpalaEvent.getQualifiedName(vertex); + assertNull(qualifiedName); + } + + @Test + public void testGetTableNameFromVertex() { + // Test with column vertex that has metadata + LineageVertex columnVertex = createColumnVertex(COLUMN_NAME, TABLE_NAME); + LineageVertexMetadata metadata = new LineageVertexMetadata(); + metadata.setTableName(TABLE_NAME); + columnVertex.setMetadata(metadata); + + String tableName = baseImpalaEvent.getTableNameFromVertex(columnVertex); + assertEquals(tableName, TABLE_NAME); + + // Test with non-column vertex + LineageVertex dbVertex = 
createDatabaseVertex(DB_NAME); + String tableNameFromDb = baseImpalaEvent.getTableNameFromVertex(dbVertex); + assertEquals(tableNameFromDb, DB_NAME +"."+ TABLE_NAME); + } + + @Test + public void testGetTableNameFromColumn() { + String columnName = DB_NAME +"."+ TABLE_NAME +"."+ COLUMN_NAME; + String tableName = baseImpalaEvent.getTableNameFromColumn(columnName); + assertEquals(tableName, DB_NAME +"."+ TABLE_NAME); + } + + @Test + public void testToDbEntityWithString() throws Exception { + + when(mockContext.getEntity(anyString())).thenReturn(null); + + AtlasEntity dbEntity = baseImpalaEvent.toDbEntity(DB_NAME); + + assertNotNull(dbEntity); + assertEquals(dbEntity.getTypeName(), HIVE_TYPE_DB); + assertEquals(dbEntity.getAttribute(ATTRIBUTE_NAME), DB_NAME.toLowerCase()); + assertEquals(dbEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), DB_NAME.toLowerCase() +"@"+ CLUSTER_NAME); + assertEquals(dbEntity.getAttribute(ATTRIBUTE_CLUSTER_NAME), CLUSTER_NAME); + assertNull(dbEntity.getGuid()); + } + + + @Test + public void testGetColumnEntities() throws Exception { + + LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME); + tableVertex.setId(1L); + tableVertex.setCreateTime(TIMESTAMP); + ImpalaNode tableNode = new ImpalaNode(tableVertex); + tableNode.addChild(tableVertex); + AtlasEntity entity = new AtlasEntity(HIVE_TYPE_COLUMN); + entity.setGuid("test-guid"); + entity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.qualified.name"); + + AtlasObjectId objectId = BaseImpalaEvent.getObjectId(entity); + + when(mockContext.getEntity(anyString())).thenReturn(null); + when(mockContext.getUserName()).thenReturn(USER_NAME); + + List<AtlasEntity> entityList = baseImpalaEvent.getColumnEntities(objectId ,tableNode); + + assertNotNull(entityList.get(0)); + assertEquals(entityList.get(0).getTypeName(), HIVE_TYPE_COLUMN); + assertEquals(entityList.get(0).getAttribute(ATTRIBUTE_TABLE),objectId); + } + + @Test + public void testToTableEntity() throws Exception { + + 
LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME); + tableVertex.setId(1L); + tableVertex.setCreateTime(TIMESTAMP); + ImpalaNode tableNode = new ImpalaNode(tableVertex); + tableNode.addChild(tableVertex); + AtlasEntity entity = new AtlasEntity(HIVE_TYPE_TABLE); + entity.setGuid("test-guid"); + + AtlasObjectId objectId = BaseImpalaEvent.getObjectId(entity); + + when(mockContext.getEntity(anyString())).thenReturn(null); + when(mockContext.getUserName()).thenReturn(USER_NAME); + + AtlasEntity atlasEntity = baseImpalaEvent.toTableEntity(objectId ,tableNode,null); + + + assertEquals(atlasEntity.getTypeName(),HIVE_TYPE_TABLE); + assertEquals(atlasEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME),DB_NAME+"."+TABLE_NAME+"@"+CLUSTER_NAME); + } + + + + @Test + public void testToDbEntityWithCachedEntity() throws Exception { + // Setup entity cache to return existing entity + AtlasEntity cachedEntity = new AtlasEntity(HIVE_TYPE_DB); + cachedEntity.setAttribute(ATTRIBUTE_NAME, DB_NAME); + when(mockContext.getEntity(anyString())).thenReturn(cachedEntity); + + AtlasEntity dbEntity = baseImpalaEvent.toDbEntity(DB_NAME); + + assertNotNull(dbEntity); + assertEquals(dbEntity, cachedEntity); + } + + + @Test + public void testToTableEntityWithNullNode() { + try { + baseImpalaEvent.toTableEntity((ImpalaNode) null, (AtlasEntitiesWithExtInfo) null); + fail("Expected IllegalArgumentException for null table node"); + } catch (Exception e) { + assertTrue(e instanceof IllegalArgumentException); + assertTrue(e.getMessage().contains("table is null")); + } + } + + @Test + public void testGetObjectId() { + AtlasEntity entity = new AtlasEntity(HIVE_TYPE_TABLE); + entity.setGuid("test-guid"); + entity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.qualified.name"); + + AtlasObjectId objectId = BaseImpalaEvent.getObjectId(entity); + + assertNotNull(objectId); + assertEquals(objectId.getGuid(), "test-guid"); + assertEquals(objectId.getTypeName(), HIVE_TYPE_TABLE); + 
assertEquals(objectId.getUniqueAttributes().get(ATTRIBUTE_QUALIFIED_NAME), "test.qualified.name"); + } + + @Test + public void testGetObjectIds() { + List<AtlasEntity> entities = new ArrayList<>(); + + AtlasEntity entity1 = new AtlasEntity(HIVE_TYPE_TABLE); + entity1.setGuid("test-guid-1"); + entity1.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.table1"); + entities.add(entity1); + + AtlasEntity entity2 = new AtlasEntity(HIVE_TYPE_COLUMN); + entity2.setGuid("test-guid-2"); + entity2.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.column1"); + entities.add(entity2); + + List<AtlasObjectId> objectIds = BaseImpalaEvent.getObjectIds(entities); + + assertNotNull(objectIds); + assertEquals(objectIds.size(), 2); + assertEquals(objectIds.get(0).getGuid(), "test-guid-1"); + assertEquals(objectIds.get(1).getGuid(), "test-guid-2"); + } + + @Test + public void testGetObjectIdsWithEmptyList() { + List<AtlasObjectId> objectIds = BaseImpalaEvent.getObjectIds(Collections.emptyList()); + assertNotNull(objectIds); + assertTrue(objectIds.isEmpty()); + } + + @Test + public void testGetTableCreateTimeWithNode() { + LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME); + tableVertex.setCreateTime(TIMESTAMP); + ImpalaNode tableNode = new ImpalaNode(tableVertex); + + long createTime = BaseImpalaEvent.getTableCreateTime(tableNode); + assertEquals(createTime, TIMESTAMP * MILLIS_CONVERT_FACTOR); + } + + @Test + public void testGetTableCreateTimeWithVertex() { + LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME); + tableVertex.setCreateTime(TIMESTAMP); + + long createTime = BaseImpalaEvent.getTableCreateTime(tableVertex); + assertEquals(createTime, TIMESTAMP * MILLIS_CONVERT_FACTOR); + } + + @Test + public void testGetTableCreateTimeWithNullTime() { + LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME); + tableVertex.setCreateTime(null); + + long createTime = BaseImpalaEvent.getTableCreateTime(tableVertex); + assertTrue(createTime > 0); + } + 
+ @Test + public void testGetImpalaProcessEntity() throws Exception { + List<AtlasEntity> inputs = createMockEntities("input_table"); + List<AtlasEntity> outputs = createMockEntities("output_table"); + + AtlasEntity processEntity = baseImpalaEvent.getImpalaProcessEntity(inputs, outputs); + + assertNotNull(processEntity); + assertEquals(processEntity.getTypeName(), ImpalaDataType.IMPALA_PROCESS.getName()); + assertNotNull(processEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + assertNotNull(processEntity.getAttribute(ATTRIBUTE_INPUTS)); + assertNotNull(processEntity.getAttribute(ATTRIBUTE_OUTPUTS)); + assertEquals(processEntity.getAttribute(ATTRIBUTE_OPERATION_TYPE), ImpalaOperationType.QUERY); + assertEquals(processEntity.getAttribute(ATTRIBUTE_START_TIME), TIMESTAMP * MILLIS_CONVERT_FACTOR); + assertEquals(processEntity.getAttribute(ATTRIBUTE_END_TIME), END_TIME * MILLIS_CONVERT_FACTOR); + assertEquals(processEntity.getAttribute(ATTRIBUTE_USER_NAME), EMPTY_ATTRIBUTE_VALUE); + assertEquals(processEntity.getAttribute(ATTRIBUTE_QUERY_TEXT), EMPTY_ATTRIBUTE_VALUE); + } + + @Test + public void testGetImpalaProcessExecutionEntity() throws Exception { + List<AtlasEntity> inputs = createMockEntities("input_table"); + List<AtlasEntity> outputs = createMockEntities("output_table"); + AtlasEntity processEntity = baseImpalaEvent.getImpalaProcessEntity(inputs, outputs); + + AtlasEntity executionEntity = baseImpalaEvent.getImpalaProcessExecutionEntity(processEntity); + + assertNotNull(executionEntity); + assertEquals(executionEntity.getTypeName(), ImpalaDataType.IMPALA_PROCESS_EXECUTION.getName()); + assertNotNull(executionEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + assertEquals(executionEntity.getAttribute(ATTRIBUTE_START_TIME), TIMESTAMP * MILLIS_CONVERT_FACTOR); + assertEquals(executionEntity.getAttribute(ATTRIBUTE_END_TIME), END_TIME * MILLIS_CONVERT_FACTOR); + assertEquals(executionEntity.getAttribute(ATTRIBUTE_USER_NAME), USER_NAME); + 
assertEquals(executionEntity.getAttribute(ATTRIBUTE_QUERY_TEXT), QUERY_TEXT.toLowerCase().trim()); + assertEquals(executionEntity.getAttribute(ATTRIBUTE_QUERY_ID), QUERY_ID); + assertEquals(executionEntity.getAttribute(ATTRIBUTE_HOSTNAME), HOST_NAME); + } + + @Test + public void testCreateTableNode() { + Long createTime = TIMESTAMP; + ImpalaNode tableNode = baseImpalaEvent.createTableNode(TABLE_NAME, createTime); + + assertNotNull(tableNode); + assertEquals(tableNode.getOwnVertex().getVertexType(), ImpalaVertexType.TABLE); + assertEquals(tableNode.getOwnVertex().getVertexId(), TABLE_NAME); + assertEquals(tableNode.getOwnVertex().getCreateTime(), createTime); + } + + @Test + public void testCreateHiveDDLEntityWithDb() { + AtlasEntity dbEntity = new AtlasEntity(HIVE_TYPE_DB); + dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.db.qualified.name"); + + AtlasEntity ddlEntity = baseImpalaEvent.createHiveDDLEntity(dbEntity); + + assertNotNull(ddlEntity); + assertEquals(ddlEntity.getTypeName(), ImpalaDataType.HIVE_DB_DDL.getName()); + assertEquals(ddlEntity.getAttribute(ATTRIBUTE_SERVICE_TYPE), "impala"); + assertEquals(ddlEntity.getAttribute(ATTRIBUTE_EXEC_TIME), TIMESTAMP * MILLIS_CONVERT_FACTOR); + assertEquals(ddlEntity.getAttribute(ATTRIBUTE_QUERY_TEXT), QUERY_TEXT); + assertEquals(ddlEntity.getAttribute(ATTRIBUTE_USER_NAME), USER_NAME); + assertNotNull(ddlEntity.getAttribute(ATTRIBUTE_DB)); + } + + @Test + public void testCreateHiveDDLEntityWithTable() { + AtlasEntity tableEntity = new AtlasEntity(HIVE_TYPE_TABLE); + tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.table.qualified.name"); + + AtlasEntity ddlEntity = baseImpalaEvent.createHiveDDLEntity(tableEntity); + + assertNotNull(ddlEntity); + assertEquals(ddlEntity.getTypeName(), ImpalaDataType.HIVE_TABLE_DDL.getName()); + assertEquals(ddlEntity.getAttribute(ATTRIBUTE_SERVICE_TYPE), "impala"); + assertEquals(ddlEntity.getAttribute(ATTRIBUTE_EXEC_TIME), TIMESTAMP * MILLIS_CONVERT_FACTOR); + 
assertEquals(ddlEntity.getAttribute(ATTRIBUTE_QUERY_TEXT), QUERY_TEXT); + assertEquals(ddlEntity.getAttribute(ATTRIBUTE_USER_NAME), USER_NAME); + assertNotNull(ddlEntity.getAttribute(ATTRIBUTE_TABLE)); + } + + @Test + public void testIsDdlOperation() { + + when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.CREATEVIEW); + assertTrue(baseImpalaEvent.isDdlOperation()); + + when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.ALTERVIEW_AS); + assertTrue(baseImpalaEvent.isDdlOperation()); + + when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.CREATETABLE_AS_SELECT); + assertTrue(baseImpalaEvent.isDdlOperation()); + + when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY); + assertFalse(baseImpalaEvent.isDdlOperation()); + } + + @Test + public void testGetCreateTimeInVertex() { + + LineageVertex vertex = createDatabaseVertex(DB_NAME); + vertex.setCreateTime(TIMESTAMP); + + Long createTime = baseImpalaEvent.getCreateTimeInVertex(vertex); + assertEquals(createTime, Long.valueOf(TIMESTAMP)); + + Long createTimeNull = baseImpalaEvent.getCreateTimeInVertex(null); + assertTrue(createTimeNull > 0); // Should return current time in seconds + + LineageVertex columnVertex = createColumnVertex(COLUMN_NAME, TABLE_NAME); + columnVertex.setCreateTime(null); + LineageVertexMetadata metadata = new LineageVertexMetadata(); + metadata.setTableCreateTime(TIMESTAMP); + columnVertex.setMetadata(metadata); + + Long metadataCreateTime = baseImpalaEvent.getCreateTimeInVertex(columnVertex); + assertEquals(metadataCreateTime, Long.valueOf(TIMESTAMP)); + } + + @Test(dataProvider = "qualifiedNameDataProvider") + public void testGetQualifiedNameForProcess(ImpalaOperationType operationType, + List<AtlasEntity> inputs, + List<AtlasEntity> outputs, + boolean shouldThrowException) throws Exception { + when(mockContext.getImpalaOperationType()).thenReturn(operationType); + + if (shouldThrowException) { + try { + 
baseImpalaEvent.getQualifiedName(inputs, outputs); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("unexpected operation type")); + } + } else { + String qualifiedName = baseImpalaEvent.getQualifiedName(inputs, outputs); + assertNotNull(qualifiedName); + if (operationType == ImpalaOperationType.CREATEVIEW || + operationType == ImpalaOperationType.CREATETABLE_AS_SELECT || + operationType == ImpalaOperationType.ALTERVIEW_AS) { + assertTrue(qualifiedName.contains("@")); + } else { + assertTrue(qualifiedName.contains("->")); + } + } + } + + @DataProvider(name = "qualifiedNameDataProvider") + public Object[][] qualifiedNameDataProvider() { + List<AtlasEntity> inputs = createMockEntities("input_table"); + List<AtlasEntity> outputs = createMockEntities("output_table"); + + return new Object[][] { + {ImpalaOperationType.CREATEVIEW, inputs, outputs, false}, + {ImpalaOperationType.CREATETABLE_AS_SELECT, inputs, outputs, false}, + {ImpalaOperationType.ALTERVIEW_AS, inputs, outputs, false}, + {ImpalaOperationType.QUERY, inputs, outputs, false}, + {ImpalaOperationType.QUERY_WITH_CLAUSE, inputs, outputs, false}, + {ImpalaOperationType.INSERT, inputs, outputs, true} // Should throw exception + }; + } + + // Helper methods for creating test data + + private LineageVertex createDatabaseVertex(String dbName) { + LineageVertex vertex = new LineageVertex(); + vertex.setVertexType(ImpalaVertexType.DATABASE); + vertex.setVertexId(dbName); + vertex.setCreateTime(TIMESTAMP); + return vertex; + } + + private LineageVertex createTableVertex(String tableName) { + LineageVertex vertex = new LineageVertex(); + vertex.setVertexType(ImpalaVertexType.TABLE); + vertex.setVertexId(tableName); + vertex.setCreateTime(TIMESTAMP); + return vertex; + } + + private LineageVertex createColumnVertex(String columnName, String tableName) { + LineageVertex vertex = new LineageVertex(); + vertex.setVertexType(ImpalaVertexType.COLUMN); + 
vertex.setVertexId(DB_NAME+"."+tableName+"."+columnName); + vertex.setCreateTime(TIMESTAMP); + + LineageVertexMetadata metadata = new LineageVertexMetadata(); + metadata.setTableName(tableName); + metadata.setTableCreateTime(TIMESTAMP); + vertex.setMetadata(metadata); + + return vertex; + } + + private List<AtlasEntity> createMockEntities(String tableBaseName) { + List<AtlasEntity> entities = new ArrayList<>(); + + AtlasEntity entity = new AtlasEntity(HIVE_TYPE_TABLE); + entity.setGuid("test-guid-" + tableBaseName); + entity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableBaseName +"@"+ CLUSTER_NAME); + entity.setAttribute(ATTRIBUTE_NAME, tableBaseName); + entity.setAttribute(ATTRIBUTE_CREATE_TIME, TIMESTAMP * MILLIS_CONVERT_FACTOR); + + entities.add(entity); + return entities; + } +} diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/CreateImpalaProcessTest.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/CreateImpalaProcessTest.java new file mode 100644 index 000000000..3d776fb7f --- /dev/null +++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/CreateImpalaProcessTest.java @@ -0,0 +1,457 @@ +/** Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.atlas.impala.hook.events; + +import org.apache.atlas.impala.hook.AtlasImpalaHookContext; +import org.apache.atlas.impala.hook.ImpalaLineageHook; +import org.apache.atlas.impala.model.ImpalaDependencyType; +import org.apache.atlas.impala.model.ImpalaOperationType; +import org.apache.atlas.impala.model.ImpalaQuery; +import org.apache.atlas.impala.model.ImpalaVertexType; +import org.apache.atlas.impala.model.LineageEdge; +import org.apache.atlas.impala.model.LineageVertex; +import org.apache.atlas.impala.model.LineageVertexMetadata; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.notification.HookNotification; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_NAME; +import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUALIFIED_NAME; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.*; +import static org.testng.AssertJUnit.assertTrue; + +public class CreateImpalaProcessTest { + private static final Logger LOG = LoggerFactory.getLogger(CreateImpalaProcessTest.class); + + @Mock + private ImpalaLineageHook mockHook; + + @Mock + private AtlasImpalaHookContext mockContext; + + @Mock + private ImpalaLineageHook impalaLineageHook; + + @Mock + private ImpalaOperationType 
impalaOperationType; + + @Mock + private ImpalaQuery impalaQuery; + + @Mock + private AtlasEntity mockOutputEntity; + + @Mock + private LineageVertexMetadata mockMetadata; + + @Mock + private LineageEdge mockLineageEdge; + @Mock + private ImpalaQuery mockLineageQuery; + + @Mock + private AtlasEntity mockInputEntity; + + + private CreateImpalaProcess createImpalaProcess; + + private static final String TEST_CLUSTER_NAME = "test_cluster"; + private static final String TEST_DB_NAME = "test_db"; + private static final String CLUSTER_NAME = "testcluster"; + private static final String DB_NAME = "testdb"; + private static final String TABLE_NAME_SOURCE = "source_table"; + private static final String TABLE_NAME_TARGET = "target_table"; + private static final String COLUMN_NAME_ID = "id"; + private static final String COLUMN_NAME_NAME = "name"; + private static final String USER_NAME = "testuser"; + private static final String HOST_NAME = "testhost"; + private static final String QUERY_TEXT = "CREATE VIEW target_table AS SELECT id, name FROM source_db.source_table"; + private static final String QUERY_ID = "test_query_id_123"; + private static final long TIMESTAMP = 1554750072L; + private static final long END_TIME = 1554750554L; + + private static final String TEST_TABLE_NAME = "test_table"; + private static final String TEST_COLUMN_NAME = "test_column"; + private static final String TEST_QUALIFIED_NAME = "test_db.test_table@test_cluster"; + private static final String TEST_QUERY_TEXT = "SELECT * FROM test_table"; + private static final String TEST_USER_NAME = "test_user"; + private static final long TEST_TIMESTAMP = 1234567890L; + private static final long TEST_END_TIME = 1234567900L; + private static final long TEST_VERTEX_ID = 123L; + + + @BeforeMethod + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + + when(mockHook.getMetadataNamespace()).thenReturn(CLUSTER_NAME); + when(mockHook.getHostName()).thenReturn(HOST_NAME); + 
when(mockHook.isConvertHdfsPathToLowerCase()).thenReturn(false); + + when(mockContext.getUserName()).thenReturn(USER_NAME); + when(mockContext.getHostName()).thenReturn(HOST_NAME); + when(mockContext.getMetadataNamespace()).thenReturn(CLUSTER_NAME); + when(mockContext.getQueryStr()).thenReturn(QUERY_TEXT); + when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.CREATEVIEW); + + createImpalaProcess = new CreateImpalaProcess(mockContext); + } + + + @Test + public void testGetNotificationMessagesWithNoEntities() throws Exception { + + ImpalaQuery emptyQuery = new ImpalaQuery(); + emptyQuery.setVertices(new ArrayList<>()); + emptyQuery.setEdges(new ArrayList<>()); + emptyQuery.setQueryText(QUERY_TEXT); + emptyQuery.setQueryId(QUERY_ID); + emptyQuery.setUser(USER_NAME); + emptyQuery.setTimestamp(TIMESTAMP); + emptyQuery.setEndTime(END_TIME); + + when(mockContext.getLineageQuery()).thenReturn(emptyQuery); + + CreateImpalaProcess process = new CreateImpalaProcess(mockContext); + List<HookNotification> notifications = process.getNotificationMessages(); + + assertNull(notifications); + } + + @Test + public void testGetNotificationMessagesWithEntities() throws Exception { + + String sourceTableFullName = "testdb.source_table"; + String targetTableFullName = "testdb.target_table"; + ImpalaQuery query = createTestQuery(); + when(mockContext.getLineageQuery()).thenReturn(query); + + AtlasEntity mockInputEntity = createMockTableEntity(TABLE_NAME_SOURCE); + AtlasEntity mockOutputEntity = createMockTableEntity(TABLE_NAME_TARGET); + + when(mockContext.getQualifiedNameForTable(sourceTableFullName)).thenReturn(TEST_DB_NAME+"."+TABLE_NAME_SOURCE+"@"+TEST_CLUSTER_NAME); + when(mockContext.getDatabaseNameFromTable(sourceTableFullName)).thenReturn(TEST_DB_NAME); + when(mockContext.getQualifiedNameForTable(targetTableFullName)).thenReturn(TEST_DB_NAME+"."+TABLE_NAME_TARGET+"@"+TEST_CLUSTER_NAME); + 
when(mockContext.getDatabaseNameFromTable(targetTableFullName)).thenReturn(TEST_DB_NAME); + when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY); + when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_SOURCE))) + .thenReturn(mockInputEntity); + when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_TARGET))) + .thenReturn(mockOutputEntity); + + + CreateImpalaProcess process = new CreateImpalaProcess(mockContext); + AtlasEntitiesWithExtInfo entities = process.getEntities(); + assertEquals(entities.getEntities().size(),2); + assertEquals(entities.getEntities().get(0).getTypeName(),"impala_process"); + assertEquals(entities.getEntities().get(1).getTypeName(),"impala_process_execution"); + assertEquals(entities.getEntities().get(0).getAttribute(ATTRIBUTE_QUALIFIED_NAME),"QUERY:"+TEST_DB_NAME+"."+TABLE_NAME_SOURCE+"@"+TEST_CLUSTER_NAME+":"+"1554750072000->:"+TEST_DB_NAME+"."+TABLE_NAME_TARGET+"@"+TEST_CLUSTER_NAME+":1554750072000"); + } + + + @Test + public void testGetEntitiesWithValidData() throws Exception { + + String sourceTableFullName = "testdb.source_table"; + String targetTableFullName = "testdb.target_table"; + ImpalaQuery query = createTestQuery(); + when(mockContext.getLineageQuery()).thenReturn(query); + + AtlasEntity mockInputEntity = createMockTableEntity(TABLE_NAME_SOURCE); + AtlasEntity mockOutputEntity = createMockTableEntity(TABLE_NAME_TARGET); + + when(mockContext.getQualifiedNameForTable(sourceTableFullName)).thenReturn(TEST_DB_NAME+"."+TABLE_NAME_SOURCE+"@"+TEST_CLUSTER_NAME); + when(mockContext.getDatabaseNameFromTable(sourceTableFullName)).thenReturn(TEST_DB_NAME); + when(mockContext.getQualifiedNameForTable(targetTableFullName)).thenReturn(TEST_DB_NAME+"."+TABLE_NAME_TARGET+"@"+TEST_CLUSTER_NAME); + when(mockContext.getDatabaseNameFromTable(targetTableFullName)).thenReturn(TEST_DB_NAME); + when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY); + 
when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_SOURCE))) + .thenReturn(mockInputEntity); + when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_TARGET))) + .thenReturn(mockOutputEntity); + + CreateImpalaProcess process = new CreateImpalaProcess(mockContext); + List<HookNotification> hookMsgList = process.getNotificationMessages(); + + /* NOTE(review): this compares enum ordinals rather than the HookNotificationType constants themselves; asserting on getType() directly would report the actual type name on failure. */ assertEquals(hookMsgList.get(0).getType().ordinal(),HookNotification.HookNotificationType.ENTITY_CREATE_V2.ordinal()); + } + + /** A lineage query with no vertices and no edges: getEntities() is expected to return null. */ @Test + public void testGetEntitiesWithEmptyInputsAndOutputs() throws Exception { + ImpalaQuery query = new ImpalaQuery(); + query.setVertices(new ArrayList<>()); + query.setEdges(new ArrayList<>()); + query.setQueryText(QUERY_TEXT); + query.setQueryId(QUERY_ID); + query.setUser(USER_NAME); + query.setTimestamp(TIMESTAMP); + query.setEndTime(END_TIME); + + when(mockContext.getLineageQuery()).thenReturn(query); + + CreateImpalaProcess process = new CreateImpalaProcess(mockContext); + AtlasEntitiesWithExtInfo entities = process.getEntities(); + + assertNull(entities); + } + + /** Checks the user name reported by the shared createImpalaProcess fixture. NOTE(review): relies on that fixture having been built from a query whose user is USER_NAME; it is initialized elsewhere in this class, so confirm there. */ @Test + public void testUserName() { + assertEquals(createImpalaProcess.getUserName(), USER_NAME); + } + + /** Builds a lineage query with two source columns (vertex ids 0 and 1) and two target columns (ids 2 and 3) of DB_NAME, connected by the PROJECTION edges 0 -> 2 and 1 -> 3. */ private ImpalaQuery createTestQuery() { + ImpalaQuery query = new ImpalaQuery(); + query.setQueryText(QUERY_TEXT); + query.setQueryId(QUERY_ID); + query.setUser(USER_NAME); + query.setTimestamp(TIMESTAMP); + query.setEndTime(END_TIME); + + List<LineageVertex> vertices = new ArrayList<>(); + + + LineageVertex sourceCol1 = createColumnVertex(0L, DB_NAME +"."+ TABLE_NAME_SOURCE +"."+ COLUMN_NAME_ID, + TABLE_NAME_SOURCE, COLUMN_NAME_ID); + LineageVertex sourceCol2 = createColumnVertex(1L, DB_NAME +"."+ TABLE_NAME_SOURCE +"." + COLUMN_NAME_NAME, + TABLE_NAME_SOURCE, COLUMN_NAME_NAME); + + + LineageVertex targetCol1 = createColumnVertex(2L, DB_NAME +"."+ TABLE_NAME_TARGET +"."+ COLUMN_NAME_ID, + TABLE_NAME_TARGET, COLUMN_NAME_ID); + LineageVertex targetCol2 = createColumnVertex(3L, DB_NAME +"."+ TABLE_NAME_TARGET +"." 
+COLUMN_NAME_NAME, + TABLE_NAME_TARGET, COLUMN_NAME_NAME); + + vertices.add(sourceCol1); + vertices.add(sourceCol2); + vertices.add(targetCol1); + vertices.add(targetCol2); + query.setVertices(vertices); + + List<LineageEdge> edges = new ArrayList<>(); + + LineageEdge edge1 = new LineageEdge(); + edge1.setSources(Arrays.asList(0L)); + edge1.setTargets(Arrays.asList(2L)); + edge1.setEdgeType(ImpalaDependencyType.PROJECTION); + edges.add(edge1); + + LineageEdge edge2 = new LineageEdge(); + edge2.setSources(Arrays.asList(1L)); + edge2.setTargets(Arrays.asList(3L)); + edge2.setEdgeType(ImpalaDependencyType.PROJECTION); + edges.add(edge2); + + query.setEdges(edges); + + return query; + } + + + /** Creates a COLUMN vertex with the given id and vertexId; its metadata table name is DB_NAME + "." + tableName and its table create time is TIMESTAMP. NOTE(review): columnName is currently unused; the column name is only carried inside vertexId. */ private LineageVertex createColumnVertex(Long id, String vertexId, String tableName, String columnName) { + LineageVertex vertex = new LineageVertex(); + vertex.setId(id); + vertex.setVertexId(vertexId); + vertex.setVertexType(ImpalaVertexType.COLUMN); + + LineageVertexMetadata metadata = new LineageVertexMetadata(); + metadata.setTableName(DB_NAME +"."+ tableName); + metadata.setTableCreateTime(TIMESTAMP); + vertex.setMetadata(metadata); + + return vertex; + } + + + /** Creates a HIVE_TYPE_TABLE entity with its name and its lower-cased qualified name (as built by getQualifiedTableName) set. */ private AtlasEntity createMockTableEntity(String tableName) { + AtlasEntity entity = new AtlasEntity(BaseImpalaEvent.HIVE_TYPE_TABLE); + entity.setAttribute(ATTRIBUTE_NAME, tableName); + entity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedTableName(tableName)); + return entity; + } + + + /** Returns the lower-cased "db.table@cluster" name that the tests use as the key when stubbing mockContext.getEntity for tables. */ private String getQualifiedTableName(String tableName) { + return (DB_NAME +"."+ tableName +"@"+ CLUSTER_NAME).toLowerCase(); + } + + /** Runs getEntities() on createTestQuery() with every column vertex resolved to a mock hive column entity. NOTE(review): the result is never asserted, so this only verifies that no exception is thrown. */ @Test + public void testCreateImpalaProcessWithColumnLineage() throws Exception { + + ImpalaQuery query = createTestQuery(); + when(mockContext.getLineageQuery()).thenReturn(query); + + AtlasEntity mockInputCol1 = new AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN); + mockInputCol1.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_ID); + AtlasEntity mockInputCol2 = new 
AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN); + mockInputCol2.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_NAME); + + AtlasEntity mockOutputCol1 = new AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN); + mockOutputCol1.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_ID); + AtlasEntity mockOutputCol2 = new AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN); + mockOutputCol2.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_NAME); + + when(mockContext.getEntity(DB_NAME +"."+ TABLE_NAME_SOURCE +"."+ COLUMN_NAME_ID)) + .thenReturn(mockInputCol1); + when(mockContext.getEntity(DB_NAME +"."+ TABLE_NAME_SOURCE +"."+ COLUMN_NAME_NAME)) + .thenReturn(mockInputCol2); + when(mockContext.getEntity(DB_NAME +"."+ TABLE_NAME_TARGET +"."+ COLUMN_NAME_ID)) + .thenReturn(mockOutputCol1); + when(mockContext.getEntity(DB_NAME +"."+ TABLE_NAME_TARGET +"."+ COLUMN_NAME_NAME)) + .thenReturn(mockOutputCol2); + + CreateImpalaProcess process = new CreateImpalaProcess(mockContext); + + AtlasEntitiesWithExtInfo entities = process.getEntities(); + + } + + /** Re-labels every edge of createTestQuery() as PREDICATE before calling getEntities(). NOTE(review): the result is never asserted, so this only verifies that no exception is thrown. */ @Test + public void testProcessWithPredicateEdgeType() throws Exception { + ImpalaQuery query = createTestQuery(); + + for (LineageEdge edge : query.getEdges()) { + edge.setEdgeType(ImpalaDependencyType.PREDICATE); + } + + when(mockContext.getLineageQuery()).thenReturn(query); + + CreateImpalaProcess process = new CreateImpalaProcess(mockContext); + + AtlasEntitiesWithExtInfo entities = process.getEntities(); + + } + + /** Adds a PREDICATE edge 0 -> 2 next to the existing PROJECTION edges of createTestQuery(). NOTE(review): the result is only logged, never asserted. */ @Test + public void testProcessWithMixedEdgeTypes() throws Exception { + ImpalaQuery query = createTestQuery(); + + LineageEdge predicateEdge = new LineageEdge(); + predicateEdge.setSources(Arrays.asList(0L)); + predicateEdge.setTargets(Arrays.asList(2L)); + predicateEdge.setEdgeType(ImpalaDependencyType.PREDICATE); + query.getEdges().add(predicateEdge); + + when(mockContext.getLineageQuery()).thenReturn(query); + + CreateImpalaProcess process = new CreateImpalaProcess(mockContext); + AtlasEntitiesWithExtInfo entities = process.getEntities(); + + 
LOG.info("Mixed edge types test completed"); + } + + /** Clears the metadata of the first vertex (source column, id 0) of createTestQuery() before calling getEntities(). NOTE(review): the result is never asserted, so this only verifies that no exception is thrown. */ @Test + public void testProcessWithNullVertexMetadata() throws Exception { + ImpalaQuery query = createTestQuery(); + + query.getVertices().get(0).setMetadata(null); + + when(mockContext.getLineageQuery()).thenReturn(query); + + CreateImpalaProcess process = new CreateImpalaProcess(mockContext); + + AtlasEntitiesWithExtInfo entities = process.getEntities(); + + } + + /** Stubs mockContext to return mockLineageQuery, TEST_USER_NAME and the QUERY operation type, and stubs the query text and the start/end timestamps on mockLineageQuery. */ private void setupBasicMocks() { + when(mockContext.getLineageQuery()).thenReturn(mockLineageQuery); + when(mockContext.getUserName()).thenReturn(TEST_USER_NAME); + when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY); + when(mockLineageQuery.getQueryText()).thenReturn(TEST_QUERY_TEXT); + when(mockLineageQuery.getTimestamp()).thenReturn(TEST_TIMESTAMP); + when(mockLineageQuery.getEndTime()).thenReturn(TEST_END_TIME); + } + + /** Uses reflection to put two mocked COLUMN vertices into BaseImpalaEvent's private verticesMap of createImpalaProcess: "output_column1" under TEST_VERTEX_ID and "input_column" under TEST_VERTEX_ID + 2. Both vertices share mockMetadata, which is stubbed to return TEST_TABLE_NAME. NOTE(review): the cast of Field.get is unchecked. */ private void setupOutputVertices() throws Exception { + java.lang.reflect.Field verticesMapField = BaseImpalaEvent.class.getDeclaredField("verticesMap"); + verticesMapField.setAccessible(true); + Map<Long, LineageVertex> verticesMap = (Map<Long, LineageVertex>) verticesMapField.get(createImpalaProcess); + + LineageVertex outputVertex1 = mock(LineageVertex.class); + LineageVertex inputVertex = mock(LineageVertex.class); + + when(outputVertex1.getVertexId()).thenReturn("output_column1"); + when(outputVertex1.getVertexType()).thenReturn(ImpalaVertexType.COLUMN); + when(outputVertex1.getMetadata()).thenReturn(mockMetadata); + + when(inputVertex.getVertexId()).thenReturn("input_column"); + when(inputVertex.getVertexType()).thenReturn(ImpalaVertexType.COLUMN); + when(inputVertex.getMetadata()).thenReturn(mockMetadata); + + when(mockMetadata.getTableName()).thenReturn(TEST_TABLE_NAME); + + verticesMap.put(TEST_VERTEX_ID, outputVertex1); + verticesMap.put(TEST_VERTEX_ID + 2, inputVertex); + } + + /** Returns a mocked process entity whose "qualifiedName" attribute is TEST_DB_NAME.TEST_TABLE_NAME.TEST_COLUMN_NAME@TEST_CLUSTER_NAME and whose "name" attribute is "test_process". */ private AtlasEntity createMockProcessEntity() { + AtlasEntity process = mock(AtlasEntity.class); + 
when(process.getAttribute("qualifiedName")).thenReturn(TEST_DB_NAME+"."+TEST_TABLE_NAME+"."+TEST_COLUMN_NAME+"@"+TEST_CLUSTER_NAME); + when(process.getAttribute("name")).thenReturn("test_process"); + return process; + } + + /** Invokes the private CreateImpalaProcess.processColumnLineage via reflection with a single PROJECTION edge (source and target both TEST_VERTEX_ID), then expects the first produced entity to be an "impala_column_lineage" whose qualified name is the column qualified name followed by ":" and the output column name. */ @Test + public void testProcessColumnLineageWithOutputColumn() throws Exception { + + String outputColumnName = "output_column"; + setupBasicMocks(); + + List<LineageEdge> edges = Arrays.asList(mockLineageEdge); + when(mockLineageQuery.getEdges()).thenReturn(edges); + when(mockLineageEdge.getEdgeType()).thenReturn(ImpalaDependencyType.PROJECTION); + when(mockLineageEdge.getTargets()).thenReturn(Arrays.asList(TEST_VERTEX_ID)); + when(mockContext.getQualifiedNameForColumn(any(LineageVertex.class))).thenReturn(TEST_DB_NAME+"."+TEST_TABLE_NAME+"."+TEST_COLUMN_NAME+"@"+TEST_CLUSTER_NAME); + when(mockLineageEdge.getSources()).thenReturn(Arrays.asList(TEST_VERTEX_ID)); + + setupOutputVertices(); + + /* successive getEntity() calls return mockOutputEntity, mockOutputEntity, then mockInputEntity (the last value repeats thereafter) */ when(mockContext.getEntity(anyString())).thenReturn(mockOutputEntity, mockOutputEntity, mockInputEntity); + when(mockOutputEntity.getAttribute("name")).thenReturn(outputColumnName); + + AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(); + AtlasEntity process = createMockProcessEntity(); + + Method processColumnLineageMethod = CreateImpalaProcess.class.getDeclaredMethod("processColumnLineage", AtlasEntity.class, AtlasEntitiesWithExtInfo.class); + processColumnLineageMethod.setAccessible(true); + + processColumnLineageMethod.invoke(createImpalaProcess, process, entities); + + assertTrue(entities.getEntities().size() > 0); + assertEquals(entities.getEntities().get(0).getTypeName(),"impala_column_lineage"); + assertEquals(entities.getEntities().get(0).getAttribute(ATTRIBUTE_QUALIFIED_NAME),TEST_DB_NAME+"."+TEST_TABLE_NAME+"."+TEST_COLUMN_NAME+"@"+TEST_CLUSTER_NAME+":"+outputColumnName); + } + + +} \ No newline at end of file
