[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16086686#comment-16086686
 ] 

James Taylor commented on PHOENIX-3999:
---------------------------------------

Thanks for the response, [~maryannxue]! We have both rows, the RHS and LHS. Why 
can't we include a reference to additional columns and just combine the two 
rows together (much like we do for our HashJoin)? We're driving from a scan of 
the RHS, right? So don't we have what ever rows we need there? And then on the 
LHS, if need be, we could project the rows we need in the scan doing the skip 
scan filter.

> Optimize inner joins as SKIP-SCAN-JOIN when possible
> ----------------------------------------------------
>
>                 Key: PHOENIX-3999
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-3999
>             Project: Phoenix
>          Issue Type: Bug
>            Reporter: James Taylor
>
> Semi joins on the leading part of the primary key end up doing batches of 
> point queries (as opposed to a broadcast hash join), however inner joins do 
> not.
> Here's a set of example schemas that executes a skip scan on the inner query:
> {code}
> CREATE TABLE COMPLETED_BATCHES (
>     BATCH_SEQUENCE_NUM BIGINT NOT NULL,
>     BATCH_ID           BIGINT NOT NULL,
>     CONSTRAINT PK PRIMARY KEY
>     (
>         BATCH_SEQUENCE_NUM,
>         BATCH_ID
>     )
> );
> CREATE TABLE ITEMS (
>    BATCH_ID BIGINT NOT NULL,
>    ITEM_ID BIGINT NOT NULL,
>    ITEM_TYPE BIGINT,
>    ITEM_VALUE VARCHAR,
>    CONSTRAINT PK PRIMARY KEY
>    (
>         BATCH_ID,
>         ITEM_ID
>    )
> );
> CREATE TABLE COMPLETED_ITEMS (
>    ITEM_TYPE          BIGINT NOT NULL,
>    BATCH_SEQUENCE_NUM BIGINT NOT NULL,
>    ITEM_ID            BIGINT NOT NULL,
>    ITEM_VALUE         VARCHAR,
>    CONSTRAINT PK PRIMARY KEY
>    (
>       ITEM_TYPE,
>       BATCH_SEQUENCE_NUM,  
>       ITEM_ID
>    )
> );
> {code}
> The explain plan of these indicate that a dynamic filter will be performed 
> like this:
> {code}
> UPSERT SELECT
> CLIENT PARALLEL 1-WAY FULL SCAN OVER ITEMS
>     SKIP-SCAN-JOIN TABLE 0
>         CLIENT PARALLEL 1-WAY RANGE SCAN OVER COMPLETED_BATCHES [1] - [2]
>             SERVER FILTER BY FIRST KEY ONLY
>             SERVER AGGREGATE INTO DISTINCT ROWS BY [BATCH_ID]
>         CLIENT MERGE SORT
>     DYNAMIC SERVER FILTER BY I.BATCH_ID IN ($8.$9)
> {code}
> We should also be able to leverage this optimization when an inner join is 
> used such as this:
> {code}
> UPSERT INTO COMPLETED_ITEMS (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, 
> ITEM_VALUE)
>    SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE   
>    FROM  ITEMS i, COMPLETED_BATCHES b
>    WHERE b.BATCH_ID = i.BATCH_ID AND          
>    b.BATCH_SEQUENCE_NUM > 1000 AND b.BATCH_SEQUENCE_NUM < 2000;
> {code}
> A complete unit test looks like this:
> {code}
>     @Test
>     public void testNestedLoopJoin() throws Exception {
>         try (Connection conn = DriverManager.getConnection(getUrl())) {
>             String t1="COMPLETED_BATCHES";
>             String ddl1 = "CREATE TABLE " + t1 + " (\n" + 
>                     "    BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" + 
>                     "    BATCH_ID           BIGINT NOT NULL,\n" + 
>                     "    CONSTRAINT PK PRIMARY KEY\n" + 
>                     "    (\n" + 
>                     "        BATCH_SEQUENCE_NUM,\n" + 
>                     "        BATCH_ID\n" + 
>                     "    )\n" + 
>                     ")" + 
>                     "";
>             conn.createStatement().execute(ddl1);
>             
>             String t2="ITEMS";
>             String ddl2 = "CREATE TABLE " + t2 + " (\n" + 
>                     "   BATCH_ID BIGINT NOT NULL,\n" + 
>                     "   ITEM_ID BIGINT NOT NULL,\n" + 
>                     "   ITEM_TYPE BIGINT,\n" + 
>                     "   ITEM_VALUE VARCHAR,\n" + 
>                     "   CONSTRAINT PK PRIMARY KEY\n" + 
>                     "   (\n" + 
>                     "        BATCH_ID,\n" + 
>                     "        ITEM_ID\n" + 
>                     "   )\n" + 
>                     ")";
>             conn.createStatement().execute(ddl2);
>             String t3="COMPLETED_ITEMS";
>             String ddl3 = "CREATE TABLE " + t3 + "(\n" + 
>                     "   ITEM_TYPE          BIGINT NOT NULL,\n" + 
>                     "   BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" + 
>                     "   ITEM_ID            BIGINT NOT NULL,\n" + 
>                     "   ITEM_VALUE         VARCHAR,\n" + 
>                     "   CONSTRAINT PK PRIMARY KEY\n" + 
>                     "   (\n" + 
>                     "      ITEM_TYPE,\n" + 
>                     "      BATCH_SEQUENCE_NUM,  \n" + 
>                     "      ITEM_ID\n" + 
>                     "   )\n" + 
>                     ")";
>             conn.createStatement().execute(ddl3);
>             conn.createStatement().execute("UPSERT INTO 
> "+t1+"(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,2)");
>             conn.createStatement().execute("UPSERT INTO 
> "+t1+"(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,4)");
>             conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, 
> item_id, item_type, item_value) VALUES (1,100, 10, 'a')");
>             conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, 
> item_id, item_type, item_value) VALUES (2,200, 20, 'a')");
>             conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, 
> item_id, item_type, item_value) VALUES (3,300, 10, 'a')");
>             conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, 
> item_id, item_type, item_value) VALUES (4,400, 20, 'a')");
>             conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, 
> item_id, item_type, item_value) VALUES (5,500, 10, 'a')");
>             conn.commit();
>             
>             conn.setAutoCommit(true);
>             String dml = "UPSERT INTO " + t3 + " (ITEM_TYPE, 
> BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)\n" + 
>                     "SELECT ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE   \n" + 
>                     "FROM  " + t2 + " i\n" + 
>                     "WHERE EXISTS (" +
>                     "  SELECT 1 FROM " + t1 + " b WHERE b.BATCH_ID = 
> i.BATCH_ID AND " +
>                     "  b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 
> 2)";
>             conn.createStatement().execute(dml);
>             ResultSet rs = conn.createStatement().executeQuery("SELECT 
> ITEM_ID FROM " + t3);
>             assertTrue(rs.next());
>             assertEquals(rs.getLong(1), 200L);
>             assertTrue(rs.next());
>             assertEquals(rs.getLong(1), 400L);
>             assertFalse(rs.next());
>         }
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to