Repository: hive
Updated Branches:
  refs/heads/master 6d936b533 -> 0918ff959


HIVE-12252 Streaming API HiveEndPoint can be created w/o partitionVals for partitioned table (Wei Zheng via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0918ff95
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0918ff95
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0918ff95

Branch: refs/heads/master
Commit: 0918ff959e6b0fd67a6b8b478290436af9532a31
Parents: 6d936b5
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Thu Nov 5 10:07:30 2015 -0800
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Thu Nov 5 10:07:30 2015 -0800

----------------------------------------------------------------------
 .../hcatalog/streaming/ConnectionError.java     |  4 ++
 .../hive/hcatalog/streaming/HiveEndPoint.java   | 51 +++++++++++++++-----
 .../hive/hcatalog/streaming/TestStreaming.java  | 35 +++++++++++---
 3 files changed, 71 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
index 1aeef76..ffa51c9 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
@@ -20,6 +20,10 @@ package org.apache.hive.hcatalog.streaming;
 
 public class ConnectionError extends StreamingException {
 
+  public ConnectionError(String msg) {
+    super(msg);
+  }
+
   public ConnectionError(String msg, Exception innerEx) {
     super(msg, innerEx);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 306c93d..2f2d44a 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -279,23 +279,48 @@ public class HiveEndPoint {
       }
     }
 
-    private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) throws InvalidTable {
-      // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+    /**
+     * Checks the validity of endpoint
+     * @param endPoint the HiveEndPoint to be checked
+     * @param msClient the metastore client
+     * @throws InvalidTable
+     */
+    private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient)
+        throws InvalidTable, ConnectionError {
+      Table t;
       try {
-        Table t = msClient.getTable(endPoint.database, endPoint.table);
-        Map<String, String> params = t.getParameters();
-        if(params != null) {
-          String transactionalProp = params.get("transactional");
-          if (transactionalProp != null && transactionalProp.equalsIgnoreCase("true")) {
-            return;
-          }
-        }
-        LOG.error("'transactional' property is not set on Table " + endPoint);
-        throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property is not set on Table");
+        t = msClient.getTable(endPoint.database, endPoint.table);
       } catch (Exception e) {
-        LOG.warn("Unable to check if Table is transactional. " + endPoint, e);
+        LOG.warn("Unable to check the endPoint: " + endPoint, e);
         throw new InvalidTable(endPoint.database, endPoint.table, e);
       }
+
+      // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+      Map<String, String> params = t.getParameters();
+      if (params != null) {
+        String transactionalProp = params.get("transactional");
+        if (transactionalProp == null || !transactionalProp.equalsIgnoreCase("true")) {
+          LOG.error("'transactional' property is not set on Table " + endPoint);
+          throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property" +
+              " is not set on Table");          }
+      }
+
+      // 2 - check if partitionvals are legitimate
+      if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty()
+          && endPoint.partitionVals.isEmpty()) {
+        // Invalid if table is partitioned, but endPoint's partitionVals is empty
+        String errMsg = "HiveEndPoint " + endPoint + " doesn't specify any partitions for " +
+            "partitioned table";
+        LOG.error(errMsg);
+        throw new ConnectionError(errMsg);
+      }
+      if ((t.getPartitionKeys() == null || t.getPartitionKeys().isEmpty())
+          && !endPoint.partitionVals.isEmpty()) {
+        // Invalid if table is not partitioned, but endPoint's partitionVals is not empty
+        String errMsg = "HiveEndPoint" + endPoint + " specifies partitions for unpartitioned table";
+        LOG.error(errMsg);
+        throw new ConnectionError(errMsg);
+      }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index d9a7eae..58cfbaa 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -204,7 +204,7 @@ public class TestStreaming {
 
     dropDB(msClient, dbName2);
     String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
-    partLoc2 = createDbAndTable(driver, dbName2, tblName2, partitionVals, colNames, colTypes, bucketCols, partNames, loc2, 2);
+    partLoc2 = createDbAndTable(driver, dbName2, tblName2, null, colNames, colTypes, bucketCols, null, loc2, 2);
 
     String loc3 = dbFolder.newFolder("testing5.db").toString();
     createStoreSales("testing5", loc3);
@@ -477,15 +477,38 @@ public class TestStreaming {
 
   @Test
   public void testEndpointConnection() throws Exception {
-    // 1) Basic
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
-            , partitionVals);
+    // For partitioned table, partitionVals are specified
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
     StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw
     connection.close();
 
-    // 2) Leave partition unspecified
-    endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+    // For unpartitioned table, partitionVals are not specified
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
     endPt.newConnection(false, null).close(); // should not throw
+
+    // For partitioned table, partitionVals are not specified
+    try {
+      endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+      connection = endPt.newConnection(true);
+      Assert.assertTrue("ConnectionError was not thrown", false);
+      connection.close();
+    } catch (ConnectionError e) {
+      // expecting this exception
+      String errMsg = "doesn't specify any partitions for partitioned table";
+      Assert.assertTrue(e.toString().endsWith(errMsg));
+    }
+
+    // For unpartitioned table, partition values are specified
+    try {
+      endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals);
+      connection = endPt.newConnection(false);
+      Assert.assertTrue("ConnectionError was not thrown", false);
+      connection.close();
+    } catch (ConnectionError e) {
+      // expecting this exception
+      String errMsg = "specifies partitions for unpartitioned table";
+      Assert.assertTrue(e.toString().endsWith(errMsg));
+    }
   }
 
   @Test

Reply via email to