This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7ab5a13637 Add integration test for lookup join in the multi-stage
engine (#15244)
7ab5a13637 is described below
commit 7ab5a1363769259fd12d9ca8c311151121f0a679
Author: Krishan Goyal <[email protected]>
AuthorDate: Tue Mar 18 22:31:33 2025 +0530
Add integration test for lookup join in the multi-stage engine (#15244)
---
.../tests/BaseClusterIntegrationTest.java | 24 +++++++++++
.../tests/ClusterIntegrationTestUtils.java | 10 ++++-
.../tests/MultiStageEngineIntegrationTest.java | 46 ++++++++++++++++++++++
.../src/test/resources/dimDayOfWeek_config.json | 18 +++++++++
.../src/test/resources/dimDayOfWeek_data.csv | 8 ++++
.../src/test/resources/dimDayOfWeek_schema.json | 16 ++++++++
6 files changed, 121 insertions(+), 1 deletion(-)
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 7b59e397d9..dac9873c62 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.time.Duration;
@@ -58,6 +59,7 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -279,6 +281,13 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
return Schema.fromInputStream(new FileInputStream(schemaFile));
}
+  /**
+   * Resolves a table config resource by name through this class's class loader and delegates to
+   * {@link #createTableConfig(File)}.
+   *
+   * @param tableConfigFileName name of the table config resource to look up on the classpath
+   * @return the table config created from the resolved file
+   * @throws IOException propagated from {@link #createTableConfig(File)}
+   */
+  protected TableConfig createTableConfig(String tableConfigFileName)
+      throws IOException {
+    ClassLoader classLoader = getClass().getClassLoader();
+    URL resourceUrl = classLoader.getResource(tableConfigFileName);
+    // Fail the test immediately when the resource cannot be found.
+    Assert.assertNotNull(resourceUrl);
+    File tableConfigFile = new File(resourceUrl.getFile());
+    return createTableConfig(tableConfigFile);
+  }
+
protected TableConfig createTableConfig(File tableConfigFile)
throws IOException {
InputStream inputStream = new FileInputStream(tableConfigFile);
@@ -600,6 +609,21 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
return false;
}
+  /**
+   * Builds a segment from a data file found on the classpath, uploads it to the given table, and waits until the
+   * table's count(*) result equals the expected number of documents.
+   *
+   * @param tableConfig config of the table the segment is built for and uploaded to
+   * @param schema schema passed to segment generation
+   * @param dataFilePath name of the input data resource to look up on the classpath
+   * @param fileFormat format of the input data file
+   * @param expectedNoOfDocs count(*) value the table must report for the wait to succeed
+   * @param timeoutMs timeout, in milliseconds, passed to the wait for the documents to load
+   * @throws Exception if building or uploading the segment fails
+   */
+  protected void createAndUploadSegmentFromFile(TableConfig tableConfig, Schema schema, String dataFilePath,
+      FileFormat fileFormat, long expectedNoOfDocs, long timeoutMs)
+      throws Exception {
+    URL dataPathUrl = getClass().getClassLoader().getResource(dataFilePath);
+    // Use a real test assertion instead of the `assert` keyword: the keyword is skipped unless the JVM runs with
+    // -ea, in which case a missing resource would only surface as a NullPointerException on the next line.
+    Assert.assertNotNull(dataPathUrl, "Could not find data file on the classpath: " + dataFilePath);
+    File file = new File(dataPathUrl.getFile());
+
+    // NOTE(review): going by the helper's name, this empties the shared segment and tar directories first.
+    TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
+    ClusterIntegrationTestUtils.buildSegmentFromFile(file, tableConfig, schema, "%", _segmentDir, _tarDir,
+        fileFormat);
+    uploadSegments(tableConfig.getTableName(), _tarDir);
+
+    TestUtils.waitForCondition(() -> getCurrentCountStarResult(tableConfig.getTableName()) == expectedNoOfDocs, 100L,
+        timeoutMs, "Failed to load " + expectedNoOfDocs + " documents in table " + tableConfig.getTableName(),
+        true, Duration.ofMillis(timeoutMs / 10));
+  }
+
protected List<File> getAllAvroFiles()
throws Exception {
// Unpack the Avro files
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 2a73c93994..e53c2ce300 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -75,6 +75,7 @@ import
org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -341,8 +342,15 @@ public class ClusterIntegrationTestUtils {
public static void buildSegmentFromAvro(File avroFile, TableConfig
tableConfig,
org.apache.pinot.spi.data.Schema schema, String segmentNamePostfix, File
segmentDir, File tarDir)
throws Exception {
+ buildSegmentFromFile(avroFile, tableConfig, schema, segmentNamePostfix,
segmentDir, tarDir, FileFormat.AVRO);
+ }
+
+ public static void buildSegmentFromFile(File file, TableConfig tableConfig,
org.apache.pinot.spi.data.Schema schema,
+ String segmentNamePostfix, File segmentDir, File tarDir, FileFormat
fileFormat)
+ throws Exception {
SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
- segmentGeneratorConfig.setInputFilePath(avroFile.getPath());
+ segmentGeneratorConfig.setFormat(fileFormat);
+ segmentGeneratorConfig.setInputFilePath(file.getPath());
segmentGeneratorConfig.setOutDir(segmentDir.getPath());
segmentGeneratorConfig.setTableName(tableConfig.getTableName());
segmentGeneratorConfig.setSegmentNamePostfix(segmentNamePostfix);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index c295efe49a..7b75a681d0 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -29,9 +29,11 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -44,11 +46,13 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
@@ -75,6 +79,11 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
private static final String TABLE_NAME_WITH_DATABASE = DATABASE_NAME + "." +
DEFAULT_TABLE_NAME;
private String _tableName = DEFAULT_TABLE_NAME;
+  // Classpath resources for the "daysOfWeek" dimension table used by testLookupJoin.
+  private static final String DIM_TABLE_DATA_PATH = "dimDayOfWeek_data.csv";
+  private static final String DIM_TABLE_SCHEMA_PATH = "dimDayOfWeek_schema.json";
+  private static final String DIM_TABLE_TABLE_CONFIG_PATH = "dimDayOfWeek_config.json";
+  // Number of data rows in dimDayOfWeek_data.csv. Kept primitive: it is passed as a numeric document count, so a
+  // boxed Integer would only add an unboxing step.
+  private static final int DIM_NUMBER_OF_RECORDS = 7;
+
@Override
protected String getSchemaFileName() {
return SCHEMA_FILE_NAME;
@@ -1618,6 +1627,43 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
}
@Test
+ public void testLookupJoin() throws Exception {
+
+ Schema lookupTableSchema = createSchema(DIM_TABLE_SCHEMA_PATH);
+ addSchema(lookupTableSchema);
+ TableConfig tableConfig = createTableConfig(DIM_TABLE_TABLE_CONFIG_PATH);
+ TenantConfig tenantConfig = new TenantConfig(getBrokerTenant(),
getServerTenant(), null);
+ tableConfig.setTenantConfig(tenantConfig);
+ addTableConfig(tableConfig);
+ createAndUploadSegmentFromFile(tableConfig, lookupTableSchema,
DIM_TABLE_DATA_PATH, FileFormat.CSV,
+ DIM_NUMBER_OF_RECORDS, 60_000);
+
+ // Compare total rows in the primary table with number of rows in the
result of the join with lookup table
+ String query = "select count(*) from " + getTableName();
+ JsonNode jsonNode = postQuery(query);
+ long totalRowsInTable =
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+
+ query = "select /*+ joinOptions(join_strategy='lookup') */ AirlineID,
DayOfWeek, dayName from " + getTableName()
+ + " join daysOfWeek ON DayOfWeek = dayId where dayName in ('Monday',
'Tuesday', 'Wednesday')";
+ jsonNode = postQuery(query);
+ long result = jsonNode.get("resultTable").get("rows").size();
+ assertTrue(result > 0);
+ assertTrue(result < totalRowsInTable);
+
+ // Verify that LOOKUP_JOIN stage is present and HASH_JOIN stage is not
present in the query plan
+ Set<String> stages = new HashSet<>();
+ JsonNode currentNode = jsonNode.get("stageStats").get("children");
+ while (currentNode != null) {
+ currentNode = currentNode.get(0);
+ stages.add(currentNode.get("type").asText());
+ currentNode = currentNode.get("children");
+ }
+ assertTrue(stages.contains("LOOKUP_JOIN"), "Could not find LOOKUP_JOIN
stage in the query plan");
+ assertFalse(stages.contains("HASH_JOIN"), "HASH_JOIN stage should not be
present in the query plan");
+
+ dropOfflineTable(tableConfig.getTableName());
+ }
+
public void testSearchLiteralFilter() throws Exception {
String sqlQuery =
"WITH CTE_B AS (SELECT 1692057600000 AS __ts FROM mytable GROUP BY
__ts) SELECT 1692057600000 AS __ts FROM "
diff --git
a/pinot-integration-tests/src/test/resources/dimDayOfWeek_config.json
b/pinot-integration-tests/src/test/resources/dimDayOfWeek_config.json
new file mode 100644
index 0000000000..20f4f0d303
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/dimDayOfWeek_config.json
@@ -0,0 +1,18 @@
+{
+ "tableName": "daysOfWeek",
+ "tableType": "OFFLINE",
+ "isDimTable": true,
+ "segmentsConfig": {
+ "segmentPushType": "REFRESH",
+ "replication": "1"
+ },
+ "tenants": {
+ },
+ "tableIndexConfig": {
+ "loadMode": "MMAP"
+ },
+ "metadata": {
+ "customConfigs": {
+ }
+ }
+}
diff --git a/pinot-integration-tests/src/test/resources/dimDayOfWeek_data.csv
b/pinot-integration-tests/src/test/resources/dimDayOfWeek_data.csv
new file mode 100644
index 0000000000..a3ed71ffd4
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/dimDayOfWeek_data.csv
@@ -0,0 +1,8 @@
+dayId,dayName
+1,"Sunday"
+2,"Monday"
+3,"Tuesday"
+4,"Wednesday"
+5,"Thursday"
+6,"Friday"
+7,"Saturday"
\ No newline at end of file
diff --git
a/pinot-integration-tests/src/test/resources/dimDayOfWeek_schema.json
b/pinot-integration-tests/src/test/resources/dimDayOfWeek_schema.json
new file mode 100644
index 0000000000..967faa81f7
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/dimDayOfWeek_schema.json
@@ -0,0 +1,16 @@
+{
+ "dimensionFieldSpecs": [
+ {
+ "dataType": "INT",
+ "name": "dayId"
+ },
+ {
+ "dataType": "STRING",
+ "name": "dayName"
+ }
+ ],
+ "schemaName": "daysOfWeek",
+ "primaryKeyColumns": [
+ "dayId"
+ ]
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]