James Taylor created PHOENIX-3999: ------------------------------------- Summary: Optimize inner joins as SKIP-SCAN-JOIN when possible Key: PHOENIX-3999 URL: https://issues.apache.org/jira/browse/PHOENIX-3999 Project: Phoenix Issue Type: Bug Reporter: James Taylor
Semi joins on the leading part of the primary key end up doing batches of point queries (as opposed to a broadcast hash join); however, inner joins do not. Here's a set of example schemas for which a semi join executes a skip scan based on the inner query: {code} CREATE TABLE COMPLETED_BATCHES ( BATCH_SEQUENCE_NUM BIGINT NOT NULL, BATCH_ID BIGINT NOT NULL, CONSTRAINT PK PRIMARY KEY ( BATCH_SEQUENCE_NUM, BATCH_ID ) ); CREATE TABLE ITEMS ( BATCH_ID BIGINT NOT NULL, ITEM_ID BIGINT NOT NULL, ITEM_TYPE BIGINT, ITEM_VALUE VARCHAR, CONSTRAINT PK PRIMARY KEY ( BATCH_ID, ITEM_ID ) ); CREATE TABLE COMPLETED_ITEMS ( ITEM_TYPE BIGINT NOT NULL, BATCH_SEQUENCE_NUM BIGINT NOT NULL, ITEM_ID BIGINT NOT NULL, ITEM_VALUE VARCHAR, CONSTRAINT PK PRIMARY KEY ( ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID ) ); {code} The explain plan for a semi-join query over these tables indicates that a dynamic filter will be applied, like this: {code} UPSERT SELECT CLIENT PARALLEL 1-WAY FULL SCAN OVER ITEMS SKIP-SCAN-JOIN TABLE 0 CLIENT PARALLEL 1-WAY RANGE SCAN OVER COMPLETED_BATCHES [1] - [2] SERVER FILTER BY FIRST KEY ONLY SERVER AGGREGATE INTO DISTINCT ROWS BY [BATCH_ID] CLIENT MERGE SORT DYNAMIC SERVER FILTER BY I.BATCH_ID IN ($8.$9) {code} We should also be able to leverage this optimization when an inner join is used, such as this: {code} UPSERT INTO COMPLETED_ITEMS (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE) SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE FROM ITEMS i, COMPLETED_BATCHES b WHERE b.BATCH_ID = i.BATCH_ID AND b.BATCH_SEQUENCE_NUM > 1000 AND b.BATCH_SEQUENCE_NUM < 2000; {code} A complete unit test (exercising the EXISTS semi-join form against these schemas) looks like this: {code} @Test public void testNestedLoopJoin() throws Exception { try (Connection conn = DriverManager.getConnection(getUrl())) { String t1="COMPLETED_BATCHES"; String ddl1 = "CREATE TABLE " + t1 + " (\n" + " BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" + " BATCH_ID BIGINT NOT NULL,\n" + " CONSTRAINT PK PRIMARY KEY\n" + " (\n" + " BATCH_SEQUENCE_NUM,\n" + " BATCH_ID\n" + " )\n" + ")" + ""; 
conn.createStatement().execute(ddl1); String t2="ITEMS"; String ddl2 = "CREATE TABLE " + t2 + " (\n" + " BATCH_ID BIGINT NOT NULL,\n" + " ITEM_ID BIGINT NOT NULL,\n" + " ITEM_TYPE BIGINT,\n" + " ITEM_VALUE VARCHAR,\n" + " CONSTRAINT PK PRIMARY KEY\n" + " (\n" + " BATCH_ID,\n" + " ITEM_ID\n" + " )\n" + ")"; conn.createStatement().execute(ddl2); String t3="COMPLETED_ITEMS"; String ddl3 = "CREATE TABLE " + t3 + "(\n" + " ITEM_TYPE BIGINT NOT NULL,\n" + " BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" + " ITEM_ID BIGINT NOT NULL,\n" + " ITEM_VALUE VARCHAR,\n" + " CONSTRAINT PK PRIMARY KEY\n" + " (\n" + " ITEM_TYPE,\n" + " BATCH_SEQUENCE_NUM, \n" + " ITEM_ID\n" + " )\n" + ")"; conn.createStatement().execute(ddl3); conn.createStatement().execute("UPSERT INTO "+t1+"(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,2)"); conn.createStatement().execute("UPSERT INTO "+t1+"(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,4)"); conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type, item_value) VALUES (1,100, 10, 'a')"); conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type, item_value) VALUES (2,200, 20, 'a')"); conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type, item_value) VALUES (3,300, 10, 'a')"); conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type, item_value) VALUES (4,400, 20, 'a')"); conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type, item_value) VALUES (5,500, 10, 'a')"); conn.commit(); conn.setAutoCommit(true); String dml = "UPSERT INTO " + t3 + " (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)\n" + "SELECT ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE \n" + "FROM " + t2 + " i\n" + "WHERE EXISTS (" + " SELECT 1 FROM " + t1 + " b WHERE b.BATCH_ID = i.BATCH_ID AND " + " b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2)"; conn.createStatement().execute(dml); ResultSet rs = conn.createStatement().executeQuery("SELECT ITEM_ID FROM " + t3); assertTrue(rs.next()); 
assertEquals(rs.getLong(1), 200L); assertTrue(rs.next()); assertEquals(rs.getLong(1), 400L); assertFalse(rs.next()); } } {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)