Repository: hive Updated Branches: refs/heads/master 6d936b533 -> 0918ff959
HIVE-12252 Streaming API HiveEndPoint can be created w/o partitionVals for partitioned table (Wei Zheng via Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0918ff95 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0918ff95 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0918ff95 Branch: refs/heads/master Commit: 0918ff959e6b0fd67a6b8b478290436af9532a31 Parents: 6d936b5 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Thu Nov 5 10:07:30 2015 -0800 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Thu Nov 5 10:07:30 2015 -0800 ---------------------------------------------------------------------- .../hcatalog/streaming/ConnectionError.java | 4 ++ .../hive/hcatalog/streaming/HiveEndPoint.java | 51 +++++++++++++++----- .../hive/hcatalog/streaming/TestStreaming.java | 35 +++++++++++--- 3 files changed, 71 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java index 1aeef76..ffa51c9 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java @@ -20,6 +20,10 @@ package org.apache.hive.hcatalog.streaming; public class ConnectionError extends StreamingException { + public ConnectionError(String msg) { + super(msg); + } + public ConnectionError(String msg, Exception innerEx) { super(msg, innerEx); } http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 306c93d..2f2d44a 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -279,23 +279,48 @@ public class HiveEndPoint { } } - private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) throws InvalidTable { - // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table + /** + * Checks the validity of endpoint + * @param endPoint the HiveEndPoint to be checked + * @param msClient the metastore client + * @throws InvalidTable + */ + private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) + throws InvalidTable, ConnectionError { + Table t; try { - Table t = msClient.getTable(endPoint.database, endPoint.table); - Map<String, String> params = t.getParameters(); - if(params != null) { - String transactionalProp = params.get("transactional"); - if (transactionalProp != null && transactionalProp.equalsIgnoreCase("true")) { - return; - } - } - LOG.error("'transactional' property is not set on Table " + endPoint); - throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property is not set on Table"); + t = msClient.getTable(endPoint.database, endPoint.table); } catch (Exception e) { - LOG.warn("Unable to check if Table is transactional. " + endPoint, e); + LOG.warn("Unable to check the endPoint: " + endPoint, e); throw new InvalidTable(endPoint.database, endPoint.table, e); } + + // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table + Map<String, String> params = t.getParameters(); + if (params != null) { + String transactionalProp = params.get("transactional"); + if (transactionalProp == null || !transactionalProp.equalsIgnoreCase("true")) { + LOG.error("'transactional' property is not set on Table " + endPoint); + throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property" + + " is not set on Table"); } + } + + // 2 - check if partitionvals are legitimate + if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty() + && endPoint.partitionVals.isEmpty()) { + // Invalid if table is partitioned, but endPoint's partitionVals is empty + String errMsg = "HiveEndPoint " + endPoint + " doesn't specify any partitions for " + + "partitioned table"; + LOG.error(errMsg); + throw new ConnectionError(errMsg); + } + if ((t.getPartitionKeys() == null || t.getPartitionKeys().isEmpty()) + && !endPoint.partitionVals.isEmpty()) { + // Invalid if table is not partitioned, but endPoint's partitionVals is not empty + String errMsg = "HiveEndPoint" + endPoint + " specifies partitions for unpartitioned table"; + LOG.error(errMsg); + throw new ConnectionError(errMsg); + } } /** http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index d9a7eae..58cfbaa 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -204,7 +204,7 @@ public class TestStreaming { dropDB(msClient, dbName2); String loc2 = dbFolder.newFolder(dbName2 + ".db").toString(); - partLoc2 = createDbAndTable(driver, dbName2, tblName2, partitionVals, colNames, colTypes, bucketCols, partNames, loc2, 2); + partLoc2 = createDbAndTable(driver, dbName2, tblName2, null, colNames, colTypes, bucketCols, null, loc2, 2); String loc3 = dbFolder.newFolder("testing5.db").toString(); createStoreSales("testing5", loc3); @@ -477,15 +477,38 @@ public class TestStreaming { @Test public void testEndpointConnection() throws Exception { - // 1) Basic - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName - , partitionVals); + // For partitioned table, partitionVals are specified + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw connection.close(); - // 2) Leave partition unspecified - endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null); + // For unpartitioned table, partitionVals are not specified + endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); endPt.newConnection(false, null).close(); // should not throw + + // For partitioned table, partitionVals are not specified + try { + endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null); + connection = endPt.newConnection(true); + Assert.assertTrue("ConnectionError was not thrown", false); + connection.close(); + } catch (ConnectionError e) { + // expecting this exception + String errMsg = "doesn't specify any partitions for partitioned table"; + Assert.assertTrue(e.toString().endsWith(errMsg)); + } + + // For unpartitioned table, partition values are specified + try { + endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals); + connection = endPt.newConnection(false); + Assert.assertTrue("ConnectionError was not thrown", false); + connection.close(); + } catch (ConnectionError e) { + // expecting this exception + String errMsg = "specifies partitions for unpartitioned table"; + Assert.assertTrue(e.toString().endsWith(errMsg)); + } } @Test