Author: szita
Date: Wed Aug 29 11:33:28 2018
New Revision: 1839568

URL: http://svn.apache.org/viewvc?rev=1839568&view=rev
Log:
PIG-5191: Pig HBase 2.0.0 support (nkollar via szita, reviewed by rohini)
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries.properties
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestJobSubmission.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1839568&r1=1839567&r2=1839568&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug 29 11:33:28 2018
@@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley
 
 IMPROVEMENTS
 
+PIG-5191: Pig HBase 2.0.0 support (nkollar via szita, reviewed by rohini)
+
 PIG-5344: Update Apache HTTPD LogParser to latest version (nielsbasjes via szita)
 
 PIG-4092: Predicate pushdown for Parquet (nkollar via rohini)

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1839568&r1=1839567&r2=1839568&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Wed Aug 29 11:33:28 2018
@@ -1713,7 +1713,7 @@
         <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
                 pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="compile"/>
         <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
-                pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark${sparkversion}"/>
+                pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark${sparkversion},hbase${hbaseversion}"/>
         <ivy:cachepath pathid="compile.classpath" conf="compile"/>
     </target>

Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1839568&r1=1839567&r2=1839568&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Wed Aug 29 11:33:28 2018
@@ -40,6 +40,7 @@
     <conf name="buildJar" extends="compile,test" visibility="private"/>
     <conf name="hadoop2" visibility="private"/>
     <conf name="hbase1" visibility="private"/>
+    <conf name="hbase2" visibility="private"/>
     <conf name="spark1" visibility="private" />
     <conf name="spark2" visibility="private" />
   </configurations>
@@ -308,6 +309,167 @@
       <exclude org="com.sun.jersey" module="jersey-json"/>
       <exclude org="asm" module="asm"/>
     </dependency>
+    <dependency org="com.lmax" name="disruptor" rev="3.3.0" conf="hbase1->master"/>
+
+    <!-- HBase 2.x dependencies -->
+    <dependency org="org.apache.hbase" name="hbase-client" rev="${hbase2.version}" conf="hbase2->master">
+      <artifact name="hbase-client" type="jar"/>
+      <artifact name="hbase-client" type="test-jar" ext="jar" m:classifier="tests"/>
+      <exclude org="org.slf4j" module="slf4j-api"/>
+      <exclude org="org.slf4j" module="slf4j-log4j12" />
+      <exclude org="stax" module="stax-api" />
+      <exclude org="javax.xml.bind" module="jaxb-api" />
+      <exclude org="tomcat" module="jasper-runtime"/>
+      <exclude org="tomcat" module="jasper-compiler"/>
+      <exclude org="com.google.protobuf" module="protobuf-java"/>
+      <exclude org="com.sun.jersey" module="jersey-core"/>
+      <exclude org="com.sun.jersey" module="jersey-server"/>
+      <exclude org="com.sun.jersey" module="jersey-json"/>
+      <exclude org="asm" module="asm"/>
+    </dependency>
+
conf="hbase2->master"> + <artifact name="hbase-common" type="jar"/> + <artifact name="hbase-common" type="test-jar" ext="jar" m:classifier="tests"/> + <exclude org="org.apache.hadoop" module="hadoop-core"/> + <exclude org="stax" module="stax-api" /> + <exclude org="javax.xml.bind" module="jaxb-api" /> + <exclude org="javax.ws.rs" module="jsr311-api" /> + <exclude org="tomcat" module="jasper-runtime"/> + <exclude org="tomcat" module="jasper-compiler"/> + <exclude org="com.sun.jersey" module="jersey-core"/> + <exclude org="com.sun.jersey" module="jersey-server"/> + <exclude org="com.sun.jersey" module="jersey-json"/> + <exclude org="asm" module="asm"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-server" rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-server" type="jar"/> + <artifact name="hbase-server" type="test-jar" ext="jar" m:classifier="tests"/> + <exclude org="org.apache.hadoop" module="hadoop-core"/> + <exclude org="org.slf4j" module="slf4j-api"/> + <exclude org="org.slf4j" module="slf4j-log4j12" /> + <exclude org="stax" module="stax-api" /> + <exclude org="javax.xml.bind" module="jaxb-api" /> + <exclude org="javax.ws.rs" module="jsr311-api" /> + <exclude org="tomcat" module="jasper-runtime"/> + <exclude org="tomcat" module="jasper-compiler"/> + <exclude org="com.sun.jersey" module="jersey-core"/> + <exclude org="com.sun.jersey" module="jersey-server"/> + <exclude org="com.sun.jersey" module="jersey-json"/> + <exclude org="asm" module="asm"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-protocol" rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-protocol" type="jar"/> + <artifact name="hbase-protocol" type="test-jar" ext="jar" m:classifier="tests"/> + <exclude org="com.google.protobuf" module="protobuf-java"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-hadoop-compat" rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-hadoop-compat" type="jar"/> + <artifact name="hbase-hadoop-compat" type="test-jar" ext="jar" m:classifier="tests"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-hadoop2-compat" rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-hadoop2-compat" type="jar"/> + <artifact name="hbase-hadoop2-compat" type="test-jar" ext="jar" m:classifier="tests"/> + <exclude org="org.apache.hadoop" module="hadoop-core"/> + <exclude org="org.slf4j" module="slf4j-api"/> + <exclude org="stax" module="stax-api" /> + <exclude org="javax.xml.bind" module="jaxb-api" /> + <exclude org="tomcat" module="jasper-runtime"/> + <exclude org="tomcat" module="jasper-compiler"/> + <exclude org="com.sun.jersey" module="jersey-core"/> + <exclude org="com.sun.jersey" module="jersey-server"/> + <exclude org="com.sun.jersey" module="jersey-json"/> + <exclude org="asm" module="asm"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-protocol-shaded" rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-protocol-shaded" type="jar"/> + <artifact name="hbase-protocol-shaded" type="test-jar" ext="jar" m:classifier="tests"/> + <exclude org="com.google.protobuf" module="protobuf-java"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-procedure" rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-procedure" type="jar"/> + <artifact name="hbase-procedure" type="test-jar" ext="jar" m:classifier="tests"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-metrics-api" 
rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-metrics-api" type="jar"/> + <artifact name="hbase-metrics-api" type="test-jar" ext="jar" m:classifier="tests"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-metrics" rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-metrics" type="jar"/> + <artifact name="hbase-metrics" type="test-jar" ext="jar" m:classifier="tests"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-mapreduce" rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-mapreduce" type="jar"/> + <artifact name="hbase-mapreduce" type="test-jar" ext="jar" m:classifier="tests"/> + <exclude org="org.apache.hadoop" module="hadoop-core"/> + <exclude org="org.slf4j" module="slf4j-api"/> + <exclude org="org.slf4j" module="slf4j-log4j12" /> + <exclude org="stax" module="stax-api" /> + <exclude org="javax.xml.bind" module="jaxb-api" /> + <exclude org="javax.ws.rs" module="jsr311-api" /> + <exclude org="tomcat" module="jasper-runtime"/> + <exclude org="tomcat" module="jasper-compiler"/> + <exclude org="com.sun.jersey" module="jersey-core"/> + <exclude org="com.sun.jersey" module="jersey-server"/> + <exclude org="com.sun.jersey" module="jersey-json"/> + <exclude org="asm" module="asm"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-zookeeper" rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-zookeeper" type="jar"/> + <artifact name="hbase-zookeeper" type="test-jar" ext="jar" m:classifier="tests"/> + <exclude org="org.apache.hadoop" module="hadoop-core"/> + <exclude org="stax" module="stax-api" /> + <exclude org="javax.xml.bind" module="jaxb-api" /> + <exclude org="javax.ws.rs" module="jsr311-api" /> + <exclude org="tomcat" module="jasper-runtime"/> + <exclude org="tomcat" module="jasper-compiler"/> + <exclude org="com.sun.jersey" module="jersey-core"/> + <exclude org="com.sun.jersey" module="jersey-server"/> + <exclude org="com.sun.jersey" module="jersey-json"/> + <exclude org="asm" module="asm"/> + </dependency> + + <dependency org="org.apache.htrace" name="htrace-core4" rev="${htrace4.version}" conf="hbase2->master"> + <artifact name="htrace-core4" type="jar"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-replication" rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-replication" type="jar"/> + <artifact name="hbase-replication" type="test-jar" ext="jar" m:classifier="tests"/> + </dependency> + + <dependency org="org.apache.hbase" name="hbase-http" rev="${hbase2.version}" conf="hbase2->master"> + <artifact name="hbase-http" type="jar"/> + <artifact name="hbase-http" type="test-jar" ext="jar" m:classifier="tests"/> + </dependency> + + <dependency org="org.apache.hbase.thirdparty" name="hbase-shaded-miscellaneous" rev="2.1.0" conf="hbase2->master" /> + + <dependency org="org.apache.hbase.thirdparty" name="hbase-shaded-netty" rev="2.1.0" conf="hbase2->master" /> + + <dependency org="org.apache.hbase.thirdparty" name="hbase-shaded-protobuf" rev="2.1.0" conf="hbase2->master" /> + + <dependency org="org.eclipse.jetty" name="jetty-http" rev="9.3.20.v20170531" conf="hbase2->master"/> + <dependency org="org.eclipse.jetty" name="jetty-io" rev="9.3.20.v20170531" conf="hbase2->master"/> + <dependency org="org.eclipse.jetty" name="jetty-security" rev="9.3.20.v20170531" conf="hbase2->master"/> + <dependency org="org.eclipse.jetty" name="jetty-server" rev="9.3.20.v20170531" conf="hbase2->master"/> + <dependency 
org="org.eclipse.jetty" name="jetty-servlet" rev="9.3.20.v20170531" conf="hbase2->master"/> + <dependency org="org.eclipse.jetty" name="jetty-util" rev="9.3.20.v20170531" conf="hbase2->master"/> + <dependency org="org.eclipse.jetty" name="jetty-util-ajax" rev="9.3.20.v20170531" conf="hbase2->master"/> + <dependency org="org.eclipse.jetty" name="jetty-webapp" rev="9.3.20.v20170531" conf="hbase2->master"/> + <dependency org="org.eclipse.jetty" name="jetty-xml" rev="9.3.20.v20170531" conf="hbase2->master"/> + <dependency org="com.lmax" name="disruptor" rev="3.3.6" conf="hbase2->master"/> + <!-- End of HBase dependencies --> <dependency org="org.htrace" name="htrace-core" rev="3.0.4" conf="hadoop2->master"/> <dependency org="org.apache.htrace" name="htrace-core" rev="${htrace.version}" conf="hadoop2->master"/> @@ -316,7 +478,6 @@ <dependency org="org.cloudera.htrace" name="htrace-core" rev="2.00" conf="hbase1->master"> <artifact name="htrace-core" type="jar"/> </dependency> - <dependency org="com.lmax" name="disruptor" rev="3.3.0" conf="hbase1->master"/> <!-- for TestHBaseStorage --> <dependency org="org.apache.hbase" name="hbase-procedure" rev="${hbase1.version}" conf="test->master"/> @@ -420,6 +581,7 @@ <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->master"/> <dependency org="javax.servlet" name="javax.servlet-api" rev="3.0.1" conf="spark1->default;spark2->default"/> + <dependency org="javax.servlet" name="javax.servlet-api" rev="3.1.0" conf="test->master"/> <dependency org="org.scala-lang.modules" name="scala-xml_2.11" rev="${scala-xml.version}" conf="spark1->default;spark2->default"/> <!-- for Tez integration --> Modified: pig/trunk/ivy/libraries.properties URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1839568&r1=1839567&r2=1839568&view=diff ============================================================================== --- pig/trunk/ivy/libraries.properties (original) +++ pig/trunk/ivy/libraries.properties Wed Aug 29 11:33:28 2018 @@ -39,6 +39,7 @@ hadoop-common.version=2.7.3 hadoop-hdfs.version=2.7.3 hadoop-mapreduce.version=2.7.3 hbase1.version=1.2.4 +hbase2.version=2.0.0 hsqldb.version=2.4.0 hive.version=1.2.1 httpcomponents.version=4.4 @@ -64,7 +65,7 @@ antlr.version=3.4 stringtemplate.version=4.0.4 log4j.version=1.2.16 netty.version=3.6.6.Final -netty-all.version=4.0.23.Final +netty-all.version=4.1.1.Final rats-lib.version=0.5.1 slf4j-api.version=1.6.1 slf4j-log4j12.version=1.6.1 @@ -92,6 +93,7 @@ snappy.version=0.2 leveldbjni.version=1.8 curator.version=2.6.0 htrace.version=3.1.0-incubating +htrace4.version=4.0.1-incubating commons-lang3.version=3.6 scala-xml.version=1.0.5 glassfish.el.version=3.0.1-b08 \ No newline at end of file Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1839568&r1=1839567&r2=1839568&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Wed Aug 29 11:33:28 2018 @@ -45,7 +45,10 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import 
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1839568&r1=1839567&r2=1839568&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Wed Aug 29 11:33:28 2018
@@ -45,7 +45,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -64,6 +67,7 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
@@ -868,16 +872,22 @@ public class HBaseStorage extends LoadFu
     private void addHBaseDelegationToken(Configuration hbaseConf, Job job) {
 
         if (!UDFContext.getUDFContext().isFrontend()) {
+            LOG.debug("skipping authentication checks because we're not in a frontend UDF context");
             return;
         }
 
         if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
+            LOG.info("hbase is configured to use Kerberos, attempting to fetch delegation token.");
             try {
-                UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-                if (currentUser.hasKerberosCredentials()) {
-                    TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job);
+                User currentUser = User.getCurrent();
+                UserGroupInformation currentUserGroupInformation = currentUser.getUGI();
+                if (currentUserGroupInformation.hasKerberosCredentials()) {
+                    try (Connection connection = ConnectionFactory.createConnection(hbaseConf, currentUser)) {
+                        TokenUtil.obtainTokenForJob(connection, currentUser, job);
+                        LOG.info("Token retrieval succeeded for user " + currentUser.getName());
+                    }
                 } else {
-                    LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available");
+                    LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available for user " + currentUser.getName());
                 }
             } catch (RuntimeException re) {
                 throw re;
@@ -885,6 +895,8 @@ public class HBaseStorage extends LoadFu
                 throw new UndeclaredThrowableException(e,
                         "Unexpected error calling TokenUtil.obtainTokenForJob()");
             }
+        } else {
+            LOG.info("hbase is not configured to use kerberos, skipping delegation token");
         }
     }
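
On HBase 2 a delegation token is no longer obtained from a bare Configuration: TokenUtil.obtainTokenForJob now takes a live Connection, which the patch opens just long enough to store the token in the Job's credentials. A minimal standalone sketch of the same sequence (not part of the patch; it assumes an already-configured Job):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.security.User;
    import org.apache.hadoop.hbase.security.token.TokenUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class ObtainHBaseToken {
        // Mirrors HBaseStorage.addHBaseDelegationToken: fetch a delegation token
        // into the job's credentials when the current user holds a Kerberos TGT.
        static void obtainToken(Configuration hbaseConf, Job job)
                throws IOException, InterruptedException {
            User currentUser = User.getCurrent();
            if (currentUser.getUGI().hasKerberosCredentials()) {
                // HBase 2 needs a live Connection for token retrieval; it can be
                // closed as soon as the token is stored in the Job.
                try (Connection connection = ConnectionFactory.createConnection(hbaseConf, currentUser)) {
                    TokenUtil.obtainTokenForJob(connection, currentUser, job);
                }
            }
        }
    }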
@@ -996,7 +1008,7 @@ public class HBaseStorage extends LoadFu
             }
 
             if (!columnInfo.isColumnMap()) {
-                put.add(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
+                put.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
                         ts, objToBytes(t.get(i), (fieldSchemas == null) ?
                         DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
             } else {
@@ -1009,7 +1021,7 @@ public class HBaseStorage extends LoadFu
                     }
                     // TODO deal with the fact that maps can have types now. Currently we detect types at
                     // runtime in the case of storing to a cf, which is suboptimal.
-                    put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
+                    put.addColumn(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
                             objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName))));
                 }
             }
@@ -1039,7 +1051,7 @@ public class HBaseStorage extends LoadFu
             delete.setTimestamp(timestamp);
 
         if(noWAL_) {
-            delete.setWriteToWAL(false);
+            delete.setDurability(Durability.SKIP_WAL);
         }
 
         return delete;
@@ -1058,7 +1070,7 @@ public class HBaseStorage extends LoadFu
         Put put = new Put(objToBytes(key, type));
 
         if(noWAL_) {
-            put.setWriteToWAL(false);
+            put.setDurability(Durability.SKIP_WAL);
         }
 
         return put;
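
The mutation-side changes above are mechanical renames in the HBase client API: Put.add(family, qualifier, ts, value) became Put.addColumn(...), and the boolean setWriteToWAL(false) is now expressed through the Durability enum. A minimal sketch of both replacements (row, family, and qualifier names are made up for illustration):

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MutationSketch {
        public static void main(String[] args) {
            byte[] family = Bytes.toBytes("pig");

            Put put = new Put(Bytes.toBytes("row1"));
            // addColumn(family, qualifier, ts, value) replaces the removed
            // put.add(family, qualifier, ts, value).
            put.addColumn(family, Bytes.toBytes("col_a"),
                    System.currentTimeMillis(), Bytes.toBytes(42));
            // setDurability(Durability.SKIP_WAL) replaces put.setWriteToWAL(false).
            put.setDurability(Durability.SKIP_WAL);

            Delete delete = new Delete(Bytes.toBytes("row1"));
            delete.setDurability(Durability.SKIP_WAL);
        }
    }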
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java?rev=1839568&r1=1839567&r2=1839568&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseTableInputFormat.java Wed Aug 29 11:33:28 2018
@@ -101,7 +101,7 @@ public class HBaseTableInputFormat exten
         return splits;
     }
 
-    private boolean skipRegion(CompareOp op, byte[] key, byte[] option ) {
+    private boolean skipRegion(CompareOp op, byte[] key, byte[] option ) throws IOException {
 
         if (key.length == 0 || option == null)
             return false;

Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1839568&r1=1839567&r2=1839568&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Wed Aug 29 11:33:28 2018
@@ -18,6 +18,7 @@ package org.apache.pig.test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -29,14 +30,20 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -56,7 +63,10 @@ import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
+import static org.junit.Assert.assertTrue;
+
 public class TestHBaseStorage {
+    private static Connection connection;
 
     private static final Log LOG = LogFactory.getLog(TestHBaseStorage.class);
     private static HBaseTestingUtility util;
@@ -84,17 +94,25 @@ public class TestHBaseStorage {
 
     @BeforeClass
     public static void setUp() throws Exception {
         // This is needed by Pig
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, Paths.get(Util.getTestDirectory(TestLoad.class)).toAbsolutePath().toString());
+        conf = HBaseConfiguration.create(hadoopConf);
+        // Setting this property is required due to a bug in HBase 2.0, which
+        // will be fixed in 2.0.1, see HBASE-20544. It doesn't have any effect on HBase 1.x
+        conf.set("hbase.localcluster.assign.random.ports", "true");
         util = new HBaseTestingUtility(conf);
         util.startMiniZKCluster();
         util.startMiniHBaseCluster(1, 1);
+        connection = ConnectionFactory.createConnection(conf);
     }
 
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         // In HBase 0.90.1 and above we can use util.shutdownMiniHBaseCluster()
         // here instead.
+        connection.close();
         MiniHBaseCluster hbc = util.getHBaseCluster();
         if (hbc != null) {
             hbc.shutdown();
@@ -113,17 +131,17 @@ public class TestHBaseStorage {
     public void tearDown() throws Exception {
         try {
             deleteAllRows(TESTTABLE_1);
-        } catch (IOException e) {}
+        } catch (Exception e) {}
         try {
             deleteAllRows(TESTTABLE_2);
-        } catch (IOException e) {}
+        } catch (Exception e) {}
         pig.shutdown();
     }
 
     // DVR: I've found that it is faster to delete all rows in small tables
     // than to drop them.
     private void deleteAllRows(String tableName) throws Exception {
-        HTable table = new HTable(conf, tableName);
+        Table table = connection.getTable(TableName.valueOf(tableName));
         ResultScanner scanner = table.getScanner(new Scan());
         List<Delete> deletes = Lists.newArrayList();
         for (Result row : scanner) {
@@ -194,7 +212,7 @@ public class TestHBaseStorage {
     public void testLoadWithSpecifiedTimestampAndRanges() throws IOException {
         long beforeTimeStamp = System.currentTimeMillis() - 10;
 
-        HTable table = prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+        Table table = prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
 
         long afterTimeStamp = System.currentTimeMillis() + 10;
 
@@ -216,9 +234,9 @@ public class TestHBaseStorage {
         Assert.assertEquals("Timestamp is set after rows added", 0,
                 queryWithTimestamp(null, null, afterTimeStamp));
 
-        long specifiedTimestamp = table.get(new Get(Bytes.toBytes("00"))).getColumnLatest(COLUMNFAMILY, Bytes.toBytes("col_a")).getTimestamp();
+        long specifiedTimestamp = table.get(new Get(Bytes.toBytes("00"))).getColumnLatestCell(COLUMNFAMILY, Bytes.toBytes("col_a")).getTimestamp();
 
-        Assert.assertTrue("Timestamp is set equals to row 01", queryWithTimestamp(null, null, specifiedTimestamp) > 0);
+        assertTrue("Timestamp is set equals to row 01", queryWithTimestamp(null, null, specifiedTimestamp) > 0);
 
         LOG.info("LoadFromHBase done");
 
@@ -312,7 +330,7 @@ public class TestHBaseStorage {
     }
 
     /**
-     * Test Load from hbase with map parameters and multiple column prefixs
+     * Test Load from hbase with map parameters and multiple column prefixes
      *
      */
     @Test
@@ -1015,7 +1033,7 @@ public class TestHBaseStorage {
                 "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + TESTCOLUMN_A +
                 " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + "','-caster HBaseBinaryConverter')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1057,7 +1075,7 @@ public class TestHBaseStorage {
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B
                 + "','-caster HBaseBinaryConverter')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1094,7 +1112,7 @@ public class TestHBaseStorage {
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
                 + "','-caster HBaseBinaryConverter -includeTimestamp true')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1141,7 +1159,7 @@ public class TestHBaseStorage {
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
                 + "','-caster HBaseBinaryConverter -includeTimestamp true')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1188,7 +1206,7 @@ public class TestHBaseStorage {
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
                 + "','-caster HBaseBinaryConverter -includeTimestamp true')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1233,7 +1251,7 @@ public class TestHBaseStorage {
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
                 + "','-caster HBaseBinaryConverter -includeTombstone true')");
 
-        HTable table = new HTable(conf, TESTTABLE_1);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_1));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int count = 0;
@@ -1276,7 +1294,7 @@ public class TestHBaseStorage {
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + TESTCOLUMN_C
                 + "')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
@@ -1321,8 +1339,8 @@ public class TestHBaseStorage {
             Assert.assertEquals(put.getClass().getMethod("getDurability").invoke(put), skipWal);
             Assert.assertEquals(delete.getClass().getMethod("getDurability").invoke(delete), skipWal);
         } else {
-            Assert.assertFalse(put.getWriteToWAL());
-            Assert.assertFalse(delete.getWriteToWAL());
+            Assert.assertEquals(Durability.SKIP_WAL, put.getDurability());
+            Assert.assertEquals(Durability.SKIP_WAL, delete.getDurability());
         }
     }
 
@@ -1350,8 +1368,8 @@ public class TestHBaseStorage {
             Assert.assertNotEquals(put.getClass().getMethod("getDurability").invoke(put), skipWal);
             Assert.assertNotEquals(delete.getClass().getMethod("getDurability").invoke(delete), skipWal);
         } else {
-            Assert.assertTrue(put.getWriteToWAL());
-            Assert.assertTrue(delete.getWriteToWAL());
+            Assert.assertNotEquals(Durability.SKIP_WAL, put.getDurability());
+            Assert.assertNotEquals(Durability.SKIP_WAL, delete.getDurability());
         }
     }
 
@@ -1371,7 +1389,7 @@ public class TestHBaseStorage {
                 "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                 + TESTCOLUMN_A + " " + TESTCOLUMN_B
                 + "')");
 
-        HTable table = new HTable(conf, TESTTABLE_2);
+        Table table = connection.getTable(TableName.valueOf(TESTTABLE_2));
         ResultScanner scanner = table.getScanner(new Scan());
         Iterator<Result> iter = scanner.iterator();
         int i = 0;
TESTCOLUMN_B + "')"); - HTable table = new HTable(conf, TESTTABLE_2); + Table table = connection.getTable(TableName.valueOf(TESTTABLE_2)); ResultScanner scanner = table.getScanner(new Scan()); Iterator<Result> iter = scanner.iterator(); int i = 0; @@ -1465,7 +1483,7 @@ public class TestHBaseStorage { // See PIG-4151 public void testStoreEmptyMap() throws IOException { String tableName = "emptyMapTest"; - HTable table; + Table table; try { deleteAllRows(tableName); } catch (Exception e) { @@ -1475,10 +1493,10 @@ public class TestHBaseStorage { cfs[0] = Bytes.toBytes("info"); cfs[1] = Bytes.toBytes("friends"); try { - table = util.createTable(Bytes.toBytesBinary(tableName), + table = util.createTable(TableName.valueOf(tableName), cfs); } catch (Exception e) { - table = new HTable(conf, Bytes.toBytesBinary(tableName)); + table = connection.getTable(TableName.valueOf(tableName)); } File inputFile = Util.createInputFile("test", "tmp", new String[] {"row1;Homer;Morrison;[1#Silvia,2#Stacy]", @@ -1518,7 +1536,7 @@ public class TestHBaseStorage { + "') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);"); } - private HTable prepareTable(String tableName, boolean initData, + private Table prepareTable(String tableName, boolean initData, DataFormat format) throws IOException { return prepareTable(tableName, initData, format, TableType.ONE_CF); } @@ -1526,30 +1544,30 @@ public class TestHBaseStorage { * Prepare a table in hbase for testing. * */ - private HTable prepareTable(String tableName, boolean initData, + private Table prepareTable(String tableName, boolean initData, DataFormat format, TableType type) throws IOException { // define the table schema - HTable table = null; + Table table = null; try { if (lastTableType == type) { deleteAllRows(tableName); } else { - util.deleteTable(tableName); + util.deleteTable(TableName.valueOf(tableName)); } } catch (Exception e) { // It's ok, table might not exist. 
        }
         try {
             if (type == TableType.TWO_CF) {
-                table = util.createTable(Bytes.toBytesBinary(tableName),
+                table = util.createTable(TableName.valueOf(tableName),
                         new byte[][]{COLUMNFAMILY, COLUMNFAMILY2});
             } else {
-                table = util.createTable(Bytes.toBytesBinary(tableName),
+                table = util.createTable(TableName.valueOf(tableName),
                         COLUMNFAMILY);
             }
             lastTableType = type;
         } catch (Exception e) {
-            table = new HTable(conf, Bytes.toBytesBinary(tableName));
+            table = connection.getTable(TableName.valueOf(tableName));
         }
 
         if (initData) {
@@ -1560,23 +1578,23 @@ public class TestHBaseStorage {
                 Put put = new Put(Bytes.toBytes("00".substring(v.length()) + v));
                 // sc: int type
-                put.add(COLUMNFAMILY, Bytes.toBytes("sc"),
+                put.addColumn(COLUMNFAMILY, Bytes.toBytes("sc"),
                         Bytes.toBytes(i));
                 // col_a: int type
-                put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
+                put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_a"),
                         Bytes.toBytes(i));
                 // col_b: double type
-                put.add(COLUMNFAMILY, Bytes.toBytes("col_b"),
+                put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_b"),
                         Bytes.toBytes(i + 0.0));
                 // col_c: string type
-                put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
+                put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_c"),
                         Bytes.toBytes("Text_" + i));
                 // prefixed_col_d: string type
-                put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
+                put.addColumn(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
                         Bytes.toBytes("PrefixedText_" + i));
                 // another cf
                 if (type == TableType.TWO_CF) {
-                    put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
+                    put.addColumn(COLUMNFAMILY2, Bytes.toBytes("col_x"),
                             Bytes.toBytes(i));
                 }
                 table.put(put);
@@ -1585,29 +1603,30 @@ public class TestHBaseStorage {
                 Put put = new Put(
                         ("00".substring(v.length()) + v).getBytes());
                 // sc: int type
-                put.add(COLUMNFAMILY, Bytes.toBytes("sc"),
+                put.addColumn(COLUMNFAMILY, Bytes.toBytes("sc"),
                         (i + "").getBytes()); // int
                 // col_a: int type
-                put.add(COLUMNFAMILY, Bytes.toBytes("col_a"),
+                put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_a"),
                         (i + "").getBytes()); // int
                 // col_b: double type
-                put.add(COLUMNFAMILY, Bytes.toBytes("col_b"),
+                put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_b"),
                         ((i + 0.0) + "").getBytes());
                 // col_c: string type
-                put.add(COLUMNFAMILY, Bytes.toBytes("col_c"),
+                put.addColumn(COLUMNFAMILY, Bytes.toBytes("col_c"),
                         ("Text_" + i).getBytes());
                 // prefixed_col_d: string type
-                put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
+                put.addColumn(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
                         ("PrefixedText_" + i).getBytes());
                 // another cf
                 if (type == TableType.TWO_CF) {
-                    put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
+                    put.addColumn(COLUMNFAMILY2, Bytes.toBytes("col_x"),
                             (i + "").getBytes());
                 }
                 table.put(put);
             }
         }
-        table.flushCommits();
+        BufferedMutator bm = connection.getBufferedMutator(table.getName());
+        bm.flush();
         }
         return table;
     }
 
@@ -1632,7 +1651,7 @@ public class TestHBaseStorage {
      */
     private static long getColTimestamp(Result result, String colName) {
         byte[][] colArray = Bytes.toByteArrays(colName.split(":"));
-        return result.getColumnLatest(colArray[0], colArray[1]).getTimestamp();
+        return result.getColumnLatestCell(colArray[0], colArray[1]).getTimestamp();
     }
 }
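
Throughout the tests, the removed HTable constructor is replaced by the Connection/Table pattern that HBase 2 enforces: one heavyweight Connection per JVM, with lightweight Table handles obtained per table name. A condensed sketch of the scan pattern the tests now use (the table name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // One heavyweight Connection per process; Table handles are cheap,
            // not thread-safe, and should be fetched per use.
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Table table = connection.getTable(TableName.valueOf("pig_test"));
                 ResultScanner scanner = table.getScanner(new Scan())) {
                for (Result row : scanner) {
                    System.out.println(Bytes.toString(row.getRow()));
                }
            }
        }
    }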
Modified: pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=1839568&r1=1839567&r2=1839568&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Wed Aug 29 11:33:28 2018
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.util.Iterator;
 import java.util.Random;
 
+import org.apache.hadoop.hbase.TableName;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -192,6 +193,9 @@ abstract public class TestJobSubmission
                 Util.isMapredExecType(cluster.getExecType()));
 
         // use the estimation
         Configuration conf = HBaseConfiguration.create(new Configuration());
+        // Setting this property is required due to a bug in HBase 2.0, which
+        // will be fixed in 2.0.1, see HBASE-20544. It doesn't have any effect on HBase 1.x
+        conf.set("hbase.localcluster.assign.random.ports", "true");
         HBaseTestingUtility util = new HBaseTestingUtility(conf);
         int clientPort = util.startMiniZKCluster().getClientPort();
         util.startMiniHBaseCluster(1, 1);
@@ -233,7 +237,7 @@ abstract public class TestJobSubmission
         Util.assertParallelValues(-1, 2, -1, 2, job.getJobConf());
 
         final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
-        util.createTable(Bytes.toBytesBinary("test_table"), COLUMNFAMILY);
+        util.createTable(TableName.valueOf("test_table"), COLUMNFAMILY);
 
         // the estimation won't take effect when it apply to non-dfs or the files doesn't exist, such as hbase
         query = "a = load 'hbase://test_table' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('c:f1 c:f2');" +
@@ -253,7 +257,7 @@ abstract public class TestJobSubmission
 
         Util.assertParallelValues(-1, -1, 1, 1, job.getJobConf());
 
-        util.deleteTable(Bytes.toBytesBinary("test_table"));
+        util.deleteTable(TableName.valueOf("test_table"));
         // In HBase 0.90.1 and above we can use util.shutdownMiniHBaseCluster()
         // here instead.
         MiniHBaseCluster hbc = util.getHBaseCluster();
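
For reference, the mini-cluster bootstrap that both test classes now share, including the HBASE-20544 random-ports workaround, looks roughly like this in isolation (a sketch only; shutdown and error handling omitted):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HBaseTestingUtility;

    public class MiniClusterSketch {
        public static HBaseTestingUtility start() throws Exception {
            Configuration conf = HBaseConfiguration.create(new Configuration());
            // HBASE-20544 workaround: without random ports, an HBase 2.0.0 local
            // cluster binds fixed defaults and collides with anything already there.
            conf.set("hbase.localcluster.assign.random.ports", "true");

            HBaseTestingUtility util = new HBaseTestingUtility(conf);
            util.startMiniZKCluster();
            util.startMiniHBaseCluster(1, 1);
            return util;
        }
    }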