James Taylor created PHOENIX-3999:
-------------------------------------
Summary: Optimize inner joins as SKIP-SCAN-JOIN when possible
Key: PHOENIX-3999
URL: https://issues.apache.org/jira/browse/PHOENIX-3999
Project: Phoenix
Issue Type: Bug
Reporter: James Taylor
Semi-joins on the leading part of the primary key are executed as batches of point
queries (a skip-scan join) rather than as a broadcast hash join; inner joins, however,
are not. Here is a set of example schemas for which a semi-join executes a skip scan on the inner query:
{code}
-- Primary key leads with BATCH_SEQUENCE_NUM, so a range predicate on it becomes a range scan.
CREATE TABLE COMPLETED_BATCHES (
    BATCH_SEQUENCE_NUM BIGINT NOT NULL,
    BATCH_ID           BIGINT NOT NULL,
    CONSTRAINT PK PRIMARY KEY (BATCH_SEQUENCE_NUM, BATCH_ID)
);
-- BATCH_ID is the leading primary-key column, which is what lets a join on BATCH_ID use a skip scan.
CREATE TABLE ITEMS (
    BATCH_ID   BIGINT NOT NULL,
    ITEM_ID    BIGINT NOT NULL,
    ITEM_TYPE  BIGINT,
    ITEM_VALUE VARCHAR,
    CONSTRAINT PK PRIMARY KEY (BATCH_ID, ITEM_ID)
);
-- Target of the UPSERT SELECT; primary key is (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID).
CREATE TABLE COMPLETED_ITEMS (
    ITEM_TYPE          BIGINT NOT NULL,
    BATCH_SEQUENCE_NUM BIGINT NOT NULL,
    ITEM_ID            BIGINT NOT NULL,
    ITEM_VALUE         VARCHAR,
    CONSTRAINT PK PRIMARY KEY (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID)
);
{code}
For the semi-join (EXISTS) form of the UPSERT shown in the unit test below, the explain plan
indicates that a dynamic filter is performed:
{code}
UPSERT SELECT
CLIENT PARALLEL 1-WAY FULL SCAN OVER ITEMS
SKIP-SCAN-JOIN TABLE 0
CLIENT PARALLEL 1-WAY RANGE SCAN OVER COMPLETED_BATCHES [1] - [2]
SERVER FILTER BY FIRST KEY ONLY
SERVER AGGREGATE INTO DISTINCT ROWS BY [BATCH_ID]
CLIENT MERGE SORT
DYNAMIC SERVER FILTER BY I.BATCH_ID IN ($8.$9)
{code}
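We should also be able to leverage this optimization when an inner join is used. An inner
join is required when the query projects a column of the joined table (here
b.BATCH_SEQUENCE_NUM), which a semi-join cannot return. For example: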
{code}
-- Inner join (comma syntax) that also projects a column taken from COMPLETED_BATCHES.
UPSERT INTO COMPLETED_ITEMS (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)
SELECT i.ITEM_TYPE,
       b.BATCH_SEQUENCE_NUM,
       i.ITEM_ID,
       i.ITEM_VALUE
FROM ITEMS i, COMPLETED_BATCHES b
WHERE b.BATCH_ID = i.BATCH_ID
  AND b.BATCH_SEQUENCE_NUM > 1000
  AND b.BATCH_SEQUENCE_NUM < 2000;
{code}
A complete unit test, which currently exercises the semi-join (EXISTS) form, looks like this:
{code}
@Test
public void testNestedLoopJoin() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String t1="COMPLETED_BATCHES";
String ddl1 = "CREATE TABLE " + t1 + " (\n" +
" BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" +
" BATCH_ID BIGINT NOT NULL,\n" +
" CONSTRAINT PK PRIMARY KEY\n" +
" (\n" +
" BATCH_SEQUENCE_NUM,\n" +
" BATCH_ID\n" +
" )\n" +
")" +
"";
conn.createStatement().execute(ddl1);
String t2="ITEMS";
String ddl2 = "CREATE TABLE " + t2 + " (\n" +
" BATCH_ID BIGINT NOT NULL,\n" +
" ITEM_ID BIGINT NOT NULL,\n" +
" ITEM_TYPE BIGINT,\n" +
" ITEM_VALUE VARCHAR,\n" +
" CONSTRAINT PK PRIMARY KEY\n" +
" (\n" +
" BATCH_ID,\n" +
" ITEM_ID\n" +
" )\n" +
")";
conn.createStatement().execute(ddl2);
String t3="COMPLETED_ITEMS";
String ddl3 = "CREATE TABLE " + t3 + "(\n" +
" ITEM_TYPE BIGINT NOT NULL,\n" +
" BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" +
" ITEM_ID BIGINT NOT NULL,\n" +
" ITEM_VALUE VARCHAR,\n" +
" CONSTRAINT PK PRIMARY KEY\n" +
" (\n" +
" ITEM_TYPE,\n" +
" BATCH_SEQUENCE_NUM, \n" +
" ITEM_ID\n" +
" )\n" +
")";
conn.createStatement().execute(ddl3);
conn.createStatement().execute("UPSERT INTO
"+t1+"(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,2)");
conn.createStatement().execute("UPSERT INTO
"+t1+"(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,4)");
conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id,
item_id, item_type, item_value) VALUES (1,100, 10, 'a')");
conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id,
item_id, item_type, item_value) VALUES (2,200, 20, 'a')");
conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id,
item_id, item_type, item_value) VALUES (3,300, 10, 'a')");
conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id,
item_id, item_type, item_value) VALUES (4,400, 20, 'a')");
conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id,
item_id, item_type, item_value) VALUES (5,500, 10, 'a')");
conn.commit();
conn.setAutoCommit(true);
String dml = "UPSERT INTO " + t3 + " (ITEM_TYPE,
BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)\n" +
"SELECT ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE \n" +
"FROM " + t2 + " i\n" +
"WHERE EXISTS (" +
" SELECT 1 FROM " + t1 + " b WHERE b.BATCH_ID = i.BATCH_ID
AND " +
" b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2)";
conn.createStatement().execute(dml);
ResultSet rs = conn.createStatement().executeQuery("SELECT ITEM_ID
FROM " + t3);
assertTrue(rs.next());
assertEquals(rs.getLong(1), 200L);
assertTrue(rs.next());
assertEquals(rs.getLong(1), 400L);
assertFalse(rs.next());
}
}
{code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)