ignite-3563 Support distributedJoins flag in JDBC driver

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/763f0676
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/763f0676
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/763f0676

Branch: refs/heads/master
Commit: 763f0676b4cef169f41f648b6e41b8bcb16203d9
Parents: fd50502
Author: agura <ag...@gridgain.com>
Authored: Mon Jul 25 12:31:24 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Mon Jul 25 15:16:44 2016 +0300

----------------------------------------------------------------------
 .../jdbc2/JdbcDistributedJoinsQueryTest.java    | 319 +++++++++++++++++++
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |   2 +
 .../org/apache/ignite/IgniteJdbcDriver.java     |  14 +-
 .../ignite/internal/jdbc2/JdbcConnection.java   |  12 +
 .../ignite/internal/jdbc2/JdbcQueryTask.java    |   9 +-
 .../ignite/internal/jdbc2/JdbcResultSet.java    |   4 +-
 .../ignite/internal/jdbc2/JdbcStatement.java    |   2 +-
 7 files changed, 357 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/763f0676/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDistributedJoinsQueryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDistributedJoinsQueryTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDistributedJoinsQueryTest.java
new file mode 100644
index 0000000..53bfa73
--- /dev/null
+++ 
b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDistributedJoinsQueryTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.io.Serializable;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests complex SQL queries (joins, {@code IN}, {@code BETWEEN}, calculated values) executed through the
+ * JDBC driver with the {@code distributedJoins=true} connection parameter.
+ * <p>
+ * Three grids are started and one partitioned cache holds both {@link Organization} and {@link Person}
+ * entries; every test method runs its query through a freshly created JDBC statement.
+ */
+public class JdbcDistributedJoinsQueryTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** JDBC URL: {@code CFG_URL_PREFIX}, the {@code distributedJoins=true} parameter and the config file path. */
+    private static final String BASE_URL = CFG_URL_PREFIX + 
"distributedJoins=true@modules/clients/src/test/config/jdbc-config.xml";
+
+    /** Statement, created in {@link #beforeTest()} and closed in {@link #afterTest()}. */
+    private Statement stmt;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On top of the inherited configuration this sets a partitioned, transactional cache with one backup
+     * and {@code FULL_SYNC} write synchronization that indexes {@link Organization} and {@link Person}
+     * values under {@code String} keys, a TCP discovery SPI using {@link #IP_FINDER}, and a default
+     * connector configuration.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setAtomicityMode(TRANSACTIONAL);
+        cache.setIndexedTypes(String.class, Organization.class, String.class, 
Person.class);
+
+        cfg.setCacheConfiguration(cache);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setConnectorConfiguration(new ConnectorConfiguration());
+
+        return cfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Starts three grids, puts two organizations and three persons into the cache and loads the Ignite
+     * JDBC driver class. Both typed cache references are obtained with {@code grid(0).cache(null)}, i.e.
+     * organizations and persons are stored in the same cache.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(3);
+
+        IgniteCache<String, Organization> orgCache = grid(0).cache(null);
+
+        assert orgCache != null;
+
+        orgCache.put("o1", new Organization(1, "A"));
+        orgCache.put("o2", new Organization(2, "B"));
+
+        IgniteCache<String, Person> personCache = grid(0).cache(null);
+
+        assert personCache != null;
+
+        personCache.put("p1", new Person(1, "John White", 25, 1));
+        personCache.put("p2", new Person(2, "Joe Black", 35, 1));
+        personCache.put("p3", new Person(3, "Mike Green", 40, 2));
+
+        Class.forName("org.apache.ignite.IgniteJdbcDriver");
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stops all grids.
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Opens a new JDBC connection to {@link #BASE_URL} and creates the statement used by the test,
+     * asserting that it is not closed.
+     */
+    @Override protected void beforeTest() throws Exception {
+        stmt = DriverManager.getConnection(BASE_URL).createStatement();
+
+        assert stmt != null;
+        assert !stmt.isClosed();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * If a statement was created, closes its connection first and then the statement itself, and asserts
+     * that the statement reports itself as closed.
+     */
+    @Override protected void afterTest() throws Exception {
+        if (stmt != null) {
+            stmt.getConnection().close();
+            stmt.close();
+
+            assert stmt.isClosed();
+        }
+    }
+
+    /**
+     * Joins {@code Person} with {@code Organization} on {@code p.orgId = o.id}, reads the columns by label
+     * (including the {@code orgName} alias) and expects exactly three rows whose person and organization
+     * names match the data put into the cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJoin() throws Exception {
+        ResultSet rs = stmt.executeQuery(
+            "select p.id, p.name, o.name as orgName from Person p, 
Organization o where p.orgId = o.id");
+
+        assert rs != null;
+
+        int cnt = 0;
+
+        while (rs.next()) {
+            int id = rs.getInt("id");
+
+            if (id == 1) {
+                assert "John White".equals(rs.getString("name"));
+                assert "A".equals(rs.getString("orgName"));
+            }
+            else if (id == 2) {
+                assert "Joe Black".equals(rs.getString("name"));
+                assert "A".equals(rs.getString("orgName"));
+            }
+            else if (id == 3) {
+                assert "Mike Green".equals(rs.getString("name"));
+                assert "B".equals(rs.getString("orgName"));
+            }
+            else
+                assert false : "Wrong ID: " + id;
+
+            cnt++;
+        }
+
+        assertEquals(3, cnt);
+    }
+
+    /**
+     * Runs the same join without an alias for the organization name. Values are read by column index;
+     * the {@code name} label is additionally expected to return the person name (column 2), while the
+     * organization name is read from column 3. Exactly three rows are expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJoinWithoutAlias() throws Exception {
+        ResultSet rs = stmt.executeQuery(
+            "select p.id, p.name, o.name from Person p, Organization o where 
p.orgId = o.id");
+
+        assert rs != null;
+
+        int cnt = 0;
+
+        while (rs.next()) {
+            int id = rs.getInt(1);
+
+            if (id == 1) {
+                assert "John White".equals(rs.getString("name"));
+                assert "John White".equals(rs.getString(2));
+                assert "A".equals(rs.getString(3));
+            }
+            else if (id == 2) {
+                assert "Joe Black".equals(rs.getString("name"));
+                assert "Joe Black".equals(rs.getString(2));
+                assert "A".equals(rs.getString(3));
+            }
+            else if (id == 3) {
+                assert "Mike Green".equals(rs.getString("name"));
+                assert "Mike Green".equals(rs.getString(2));
+                assert "B".equals(rs.getString(3));
+            }
+            else
+                assert false : "Wrong ID: " + id;
+
+            cnt++;
+        }
+
+        assert cnt == 3;
+    }
+
+    /**
+     * Selects person names with {@code age in (25, 35)} and expects exactly two rows, each being either
+     * "John White" or "Joe Black".
+     *
+     * @throws Exception If failed.
+     */
+    public void testIn() throws Exception {
+        ResultSet rs = stmt.executeQuery("select name from Person where age in 
(25, 35)");
+
+        assert rs != null;
+
+        int cnt = 0;
+
+        while (rs.next()) {
+            assert "John White".equals(rs.getString("name")) ||
+                "Joe Black".equals(rs.getString("name"));
+
+            cnt++;
+        }
+
+        assert cnt == 2;
+    }
+
+    /**
+     * Selects person names with {@code age between 24 and 36} and expects exactly two rows, each being
+     * either "John White" or "Joe Black".
+     *
+     * @throws Exception If failed.
+     */
+    public void testBetween() throws Exception {
+        ResultSet rs = stmt.executeQuery("select name from Person where age 
between 24 and 36");
+
+        assert rs != null;
+
+        int cnt = 0;
+
+        while (rs.next()) {
+            assert "John White".equals(rs.getString("name")) ||
+                "Joe Black".equals(rs.getString("name"));
+
+            cnt++;
+        }
+
+        assert cnt == 2;
+    }
+
+    /**
+     * Selects the calculated expression {@code age * 2} for all persons and expects exactly three rows,
+     * each equal to 50, 70 or 80.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCalculatedValue() throws Exception {
+        ResultSet rs = stmt.executeQuery("select age * 2 from Person");
+
+        assert rs != null;
+
+        int cnt = 0;
+
+        while (rs.next()) {
+            assert rs.getInt(1) == 50 ||
+                rs.getInt(1) == 70 ||
+                rs.getInt(1) == 80;
+
+            cnt++;
+        }
+
+        assert cnt == 3;
+    }
+
+    /**
+     * Person value stored in the cache. All fields are annotated with {@link QuerySqlField};
+     * {@code orgId} is explicitly marked as indexed and {@code name} as not indexed.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    private static class Person implements Serializable {
+        /** ID. */
+        @QuerySqlField
+        private final int id;
+
+        /** Name. */
+        @QuerySqlField(index = false)
+        private final String name;
+
+        /** Age. */
+        @QuerySqlField
+        private final int age;
+
+        /** Organization ID. */
+        @QuerySqlField(index = true)
+        private final int orgId;
+
+        /**
+         * Creates a person, asserting that the name is not empty and that the age and the
+         * organization ID are positive.
+         *
+         * @param id ID.
+         * @param name Name.
+         * @param age Age.
+         * @param orgId Organization ID.
+         */
+        private Person(int id, String name, int age, int orgId) {
+            assert !F.isEmpty(name);
+            assert age > 0;
+            assert orgId > 0;
+
+            this.id = id;
+            this.name = name;
+            this.age = age;
+            this.orgId = orgId;
+        }
+    }
+
+    /**
+     * Organization value stored in the cache. Both fields are annotated with {@link QuerySqlField};
+     * {@code name} is explicitly marked as not indexed.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    private static class Organization implements Serializable {
+        /** ID. */
+        @QuerySqlField
+        private final int id;
+
+        /** Name. */
+        @QuerySqlField(index = false)
+        private final String name;
+
+        /**
+         * @param id ID.
+         * @param name Name.
+         */
+        private Organization(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/763f0676/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index 603ee81..b1053b0 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.jdbc.suite;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.jdbc2.JdbcDistributedJoinsQueryTest;
 import org.apache.ignite.jdbc.JdbcComplexQuerySelfTest;
 import org.apache.ignite.jdbc.JdbcConnectionSelfTest;
 import org.apache.ignite.jdbc.JdbcEmptyCacheSelfTest;
@@ -56,6 +57,7 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
         suite.addTest(new 
TestSuite(org.apache.ignite.internal.jdbc2.JdbcPreparedStatementSelfTest.class));
         suite.addTest(new 
TestSuite(org.apache.ignite.internal.jdbc2.JdbcResultSetSelfTest.class));
         suite.addTest(new 
TestSuite(org.apache.ignite.internal.jdbc2.JdbcComplexQuerySelfTest.class));
+        suite.addTest(new 
TestSuite(org.apache.ignite.internal.jdbc2.JdbcDistributedJoinsQueryTest.class));
         suite.addTest(new 
TestSuite(org.apache.ignite.internal.jdbc2.JdbcMetadataSelfTest.class));
         suite.addTest(new 
TestSuite(org.apache.ignite.internal.jdbc2.JdbcEmptyCacheSelfTest.class));
         suite.addTest(new 
TestSuite(org.apache.ignite.internal.jdbc2.JdbcLocalCachesSelfTest.class));

http://git-wip-us.apache.org/repos/asf/ignite/blob/763f0676/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
index 567ff9f..d432c1e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
@@ -104,6 +104,11 @@ import org.apache.ignite.logger.java.JavaLogger;
  *          can make significant performance and network optimizations.
  *          Default value is {@code false}.
  *     </li>
+ *     <li>
+ *         {@code distributedJoins} - enables the distributed joins feature. This flag does not make sense in
+ *         combination with the {@code local} and/or {@code collocated} flags set to {@code true}, or when
+ *         querying a local cache. Default value is {@code false}.
+ *     </li>
  * </ul>
  *
  * <h2 class="header">Configuration of Ignite Java client based connection</h2>
@@ -284,6 +289,9 @@ public class IgniteJdbcDriver implements Driver {
     /** Collocated parameter name. */
     private static final String PARAM_COLLOCATED = "collocated";
 
+    /** Distributed joins parameter name. */
+    private static final String PARAM_DISTRIBUTED_JOINS = "distributedJoins";
+
     /** Hostname property name. */
     public static final String PROP_HOST = PROP_PREFIX + "host";
 
@@ -302,6 +310,9 @@ public class IgniteJdbcDriver implements Driver {
     /** Collocated property name. */
     public static final String PROP_COLLOCATED = PROP_PREFIX + 
PARAM_COLLOCATED;
 
+    /** Distributed joins property name. */
+    public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + 
PARAM_DISTRIBUTED_JOINS;
+
     /** Cache name property name. */
     public static final String PROP_CFG = PROP_PREFIX + "cfg";
 
@@ -366,7 +377,8 @@ public class IgniteJdbcDriver implements Driver {
             new PropertyInfo("Cache name", info.getProperty(PROP_CACHE), ""),
             new PropertyInfo("Node ID", info.getProperty(PROP_NODE_ID), ""),
             new PropertyInfo("Local", info.getProperty(PROP_LOCAL), ""),
-            new PropertyInfo("Collocated", info.getProperty(PROP_COLLOCATED), 
"")
+            new PropertyInfo("Collocated", info.getProperty(PROP_COLLOCATED), 
""),
+            new PropertyInfo("Distributed Joins", 
info.getProperty(PROP_DISTRIBUTED_JOINS), "")
         );
 
         if (info.getProperty(PROP_CFG) != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/763f0676/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 2d2ce5d..6f0d9c4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -69,6 +69,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.IgniteJdbcDriver.PROP_CACHE;
 import static org.apache.ignite.IgniteJdbcDriver.PROP_CFG;
 import static org.apache.ignite.IgniteJdbcDriver.PROP_COLLOCATED;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_DISTRIBUTED_JOINS;
 import static org.apache.ignite.IgniteJdbcDriver.PROP_LOCAL;
 import static org.apache.ignite.IgniteJdbcDriver.PROP_NODE_ID;
 
@@ -113,6 +114,9 @@ public class JdbcConnection implements Connection {
     /** Collocated query flag. */
     private boolean collocatedQry;
 
+    /** Distributed joins flag. */
+    private boolean distributedJoins;
+
     /** Statements. */
     final Set<JdbcStatement> statements = new HashSet<>();
 
@@ -132,6 +136,7 @@ public class JdbcConnection implements Connection {
         this.cacheName = props.getProperty(PROP_CACHE);
         this.locQry = Boolean.parseBoolean(props.getProperty(PROP_LOCAL));
         this.collocatedQry = 
Boolean.parseBoolean(props.getProperty(PROP_COLLOCATED));
+        this.distributedJoins = 
Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS));
 
         String nodeIdProp = props.getProperty(PROP_NODE_ID);
 
@@ -706,6 +711,13 @@ public class JdbcConnection implements Connection {
     }
 
     /**
+     * @return Distributed joins flag.
+     */
+    boolean isDistributedJoins() {
+        return distributedJoins;
+    }
+
+    /**
      * Ensures that connection is not closed.
      *
      * @throws SQLException If connection is closed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/763f0676/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
index 1a5793a..c4911cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
@@ -92,6 +92,9 @@ class JdbcQueryTask implements 
IgniteCallable<JdbcQueryTask.QueryResult> {
     /** Collocated query flag. */
     private final boolean collocatedQry;
 
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
     /**
      * @param ignite Ignite.
      * @param cacheName Cache name.
@@ -102,9 +105,11 @@ class JdbcQueryTask implements 
IgniteCallable<JdbcQueryTask.QueryResult> {
      * @param uuid UUID.
      * @param locQry Local query flag.
      * @param collocatedQry Collocated query flag.
+     * @param distributedJoins Distributed joins flag.
      */
     public JdbcQueryTask(Ignite ignite, String cacheName, String sql,
-        boolean loc, Object[] args, int fetchSize, UUID uuid, boolean locQry, 
boolean collocatedQry) {
+        boolean loc, Object[] args, int fetchSize, UUID uuid,
+        boolean locQry, boolean collocatedQry, boolean distributedJoins) {
         this.ignite = ignite;
         this.args = args;
         this.uuid = uuid;
@@ -114,6 +119,7 @@ class JdbcQueryTask implements 
IgniteCallable<JdbcQueryTask.QueryResult> {
         this.loc = loc;
         this.locQry = locQry;
         this.collocatedQry = collocatedQry;
+        this.distributedJoins = distributedJoins;
     }
 
     /** {@inheritDoc} */
@@ -147,6 +153,7 @@ class JdbcQueryTask implements 
IgniteCallable<JdbcQueryTask.QueryResult> {
             qry.setPageSize(fetchSize);
             qry.setLocal(locQry);
             qry.setCollocated(collocatedQry);
+            qry.setDistributedJoins(distributedJoins);
 
             QueryCursor<List<?>> qryCursor = cache.query(qry);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/763f0676/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
index 69dddad..8e0e9d0 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
@@ -146,8 +146,8 @@ public class JdbcResultSet implements ResultSet {
 
             boolean loc = nodeId == null;
 
-            JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, 
conn.cacheName(),
-                null, loc, null, fetchSize, uuid, conn.isLocalQuery(), 
conn.isCollocatedQuery());
+            JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, 
conn.cacheName(), null, loc, null,
+                fetchSize, uuid, conn.isLocalQuery(), 
conn.isCollocatedQuery(), conn.isDistributedJoins());
 
             try {
                 JdbcQueryTask.QueryResult res =

http://git-wip-us.apache.org/repos/asf/ignite/blob/763f0676/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index 6b806f3..e187dc0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -96,7 +96,7 @@ public class JdbcStatement implements Statement {
         boolean loc = nodeId == null;
 
         JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, 
conn.cacheName(),
-            sql, loc, args, fetchSize, uuid, conn.isLocalQuery(), 
conn.isCollocatedQuery());
+            sql, loc, args, fetchSize, uuid, conn.isLocalQuery(), 
conn.isCollocatedQuery(), conn.isDistributedJoins());
 
         try {
             JdbcQueryTask.QueryResult res =

Reply via email to