Revert "HIVE-17606 : Improve security for DB notification related APIs (Tao Li via Thejas Nair)"
This reverts commit 4142c98c18c6ae2aee81abbac00591a27f55e425. Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6983d595 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6983d595 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6983d595 Branch: refs/heads/hive-14535 Commit: 6983d595d957fd25b14bc10871b1c4a6f6867c85 Parents: e0484b7 Author: Thejas M Nair <the...@hortonworks.com> Authored: Thu Sep 28 15:35:11 2017 -0700 Committer: Thejas M Nair <the...@hortonworks.com> Committed: Thu Sep 28 15:35:22 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 -- .../hive/hcatalog/api/TestHCatClient.java | 7 --- .../hadoop/hive/ql/parse/TestCopyUtils.java | 8 +-- .../hadoop/hive/ql/parse/TestExportImport.java | 2 - .../hive/ql/parse/TestReplicationScenarios.java | 39 --------------- ...TestReplicationScenariosAcrossInstances.java | 2 - .../hadoop/hive/metastore/HiveMetaStore.java | 41 --------------- .../hadoop/hive/metastore/MetaStoreUtils.java | 26 ---------- .../hadoop/hive/ql/exec/ReplCopyTask.java | 32 ++++++------ .../hadoop/hive/ql/parse/repl/CopyUtils.java | 34 +++---------- .../hive/metastore/ReplChangeManager.java | 52 +++----------------- 11 files changed, 32 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/6983d595/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5bec15e..d94ff85 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -794,9 +794,6 @@ public class HiveConf extends Configuration { METASTORE_EVENT_DB_LISTENER_TTL("hive.metastore.event.db.listener.timetolive", "86400s", new TimeValidator(TimeUnit.SECONDS), "time after which events will be removed from the database listener queue"), - METASTORE_EVENT_DB_NOTIFICATION_API_AUTH("hive.metastore.event.db.notification.api.auth", true, - "Should metastore do authorization against database notification related APIs such as get_next_notification.\n" + - "If set to true, then only the superusers in proxy settings have the permission"), METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS("hive.metastore.authorization.storage.checks", false, "Should the metastore do authorization checks against the underlying storage (usually hdfs) \n" + "for operations like drop-partition (disallow the drop-partition if the user in\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/6983d595/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java index 14c9a4b..d2474cc 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java @@ -49,9 +49,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; -import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hive.hcatalog.api.repl.Command; import org.apache.hive.hcatalog.api.repl.ReplicationTask; import org.apache.hive.hcatalog.api.repl.ReplicationUtils; @@ -111,11 +109,6 @@ public class TestHCatClient { return; } - // Set proxy user privilege and initialize the global state of ProxyUsers - Configuration conf = new Configuration(); - conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); - ProxyUsers.refreshSuperUserGroupsConfiguration(conf); - System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname, DbNotificationListener.class.getName()); // turn on db notification listener on metastore msPort = MetaStoreTestUtils.startMetaStore(); http://git-wip-us.apache.org/repos/asf/hive/blob/6983d595/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java index f14b430..98b2a3c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java @@ -81,12 +81,12 @@ public class TestCopyUtils { Configuration conf = new Configuration(); conf.set("dfs.client.use.datanode.hostname", "true"); - UserGroupInformation ugi = Utils.getUGI(); - final String currentUser = ugi.getShortUserName(); - conf.set("hadoop.proxyuser." + currentUser + ".hosts", "*"); - MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + + UserGroupInformation ugi = Utils.getUGI(); + String currentUser = ugi.getShortUserName(); + HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{ put(ConfVars.HIVE_IN_TEST.varname, "false"); put(ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, "1"); http://git-wip-us.apache.org/repos/asf/hive/blob/6983d595/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java index e86ee5e..70a57f8 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.parse; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.shims.Utils; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -49,7 +48,6 @@ public class TestExportImport { public static void classLevelSetup() throws Exception { Configuration conf = new Configuration(); conf.set("dfs.client.use.datanode.hostname", "true"); - conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{ http://git-wip-us.apache.org/repos/asf/hive/blob/6983d595/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 7cf1498..a8c3a0b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -53,8 +53,6 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule; import org.apache.thrift.TException; import org.junit.After; @@ -100,7 +98,6 @@ public class TestReplicationScenarios { private static int msPort; private static Driver driver; private static HiveMetaStoreClient metaStoreClient; - private static String proxySettingName; static HiveConf hconfMirror; static int msPortMirror; static Driver driverMirror; @@ -136,8 +133,6 @@ public class TestReplicationScenarios { hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true); hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/"); - proxySettingName = "hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts"; - hconf.set(proxySettingName, "*"); msPort = MetaStoreTestUtils.startMetaStore(hconf); hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/"); hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" @@ -3269,40 +3264,6 @@ public class TestReplicationScenarios { assertFalse(new AndFilter(no, no, no).accept(dummyEvent)); } - @Test - public void testAuthForNotificationAPIs() throws Exception { - // Setup - long firstEventId = metaStoreClient.getCurrentNotificationEventId().getEventId(); - String dbName = "testAuthForNotificationAPIs"; - driver.run("create database " + dbName); - NotificationEventResponse rsp = metaStoreClient.getNextNotification(firstEventId, 0, null); - assertEquals(1, rsp.getEventsSize()); - // Test various scenarios - // Remove the proxy privilege and the auth should fail (in reality the proxy setting should not be changed on the fly) - hconf.unset(proxySettingName); - // Need to explicitly update ProxyUsers - ProxyUsers.refreshSuperUserGroupsConfiguration(hconf); - // Verify if the auth should fail - Exception ex = null; - try { - rsp = metaStoreClient.getNextNotification(firstEventId, 0, null); - } catch (TException e) { - ex = e; - } - assertNotNull(ex); - // Disable auth so the call should succeed - hconf.setBoolVar(HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH, false); - try { - rsp = metaStoreClient.getNextNotification(firstEventId, 0, null); - assertEquals(1, rsp.getEventsSize()); - } finally { - // Restore the settings - hconf.setBoolVar(HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH, true); - hconf.set(proxySettingName, "*"); - ProxyUsers.refreshSuperUserGroupsConfiguration(hconf); - } - } - private NotificationEvent createDummyEvent(String dbname, String tblname, long evid) { MessageFactory msgFactory = MessageFactory.getInstance(); Table t = new Table(); http://git-wip-us.apache.org/repos/asf/hive/blob/6983d595/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index aa2c3bb..50c4a98 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.util.DependencyResolver; -import org.apache.hadoop.hive.shims.Utils; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -64,7 +63,6 @@ public class TestReplicationScenariosAcrossInstances { public static void classLevelSetup() throws Exception { Configuration conf = new Configuration(); conf.set("dfs.client.use.datanode.hostname", "true"); - conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); primary = new WarehouseInstance(LOG, miniDFSCluster); http://git-wip-us.apache.org/repos/asf/hive/blob/6983d595/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 20e7ec1..d5de4f2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -7055,28 +7055,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { @Override public NotificationEventResponse get_next_notification(NotificationEventRequest rqst) throws TException { - try { - authorizeProxyPrivilege(); - } catch (Exception ex) { - LOG.error("Not authorized to make the get_next_notification call. You can try to disable " + - HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH.varname, ex); - throw new TException(ex); - } - RawStore ms = getMS(); return ms.getNextNotification(rqst); } @Override public CurrentNotificationEventId get_current_notificationEventId() throws TException { - try { - authorizeProxyPrivilege(); - } catch (Exception ex) { - LOG.error("Not authorized to make the get_current_notificationEventId call. You can try to disable " + - HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH.varname, ex); - throw new TException(ex); - } - RawStore ms = getMS(); return ms.getCurrentNotificationEventId(); } @@ -7084,35 +7068,10 @@ public class HiveMetaStore extends ThriftHiveMetastore { @Override public NotificationEventsCountResponse get_notification_events_count(NotificationEventsCountRequest rqst) throws TException { - try { - authorizeProxyPrivilege(); - } catch (Exception ex) { - LOG.error("Not authorized to make the get_notification_events_count call. You can try to disable " + - HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH.varname, ex); - throw new TException(ex); - } - RawStore ms = getMS(); return ms.getNotificationEventsCount(rqst); } - private void authorizeProxyPrivilege() throws Exception { - // Skip the auth in embedded mode or if the auth is disabled - if (!isMetaStoreRemote() || !hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_EVENT_DB_NOTIFICATION_API_AUTH)) { - return; - } - String user = null; - try { - user = Utils.getUGI().getShortUserName(); - } catch (Exception ex) { - LOG.error("Cannot obtain username", ex); - throw ex; - } - if (!MetaStoreUtils.checkUserHasHostProxyPrivileges(user, hiveConf, getIPAddress())) { - throw new MetaException("User " + user + " is not allowed to perform this API call"); - } - } - @Override public FireEventResponse fire_listener_event(FireEventRequest rqst) throws TException { switch (rqst.getData().getSetField()) { http://git-wip-us.apache.org/repos/asf/hive/blob/6983d595/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java index a147a25..b51446d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java @@ -31,7 +31,6 @@ import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -107,9 +106,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; -import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.util.MachineList; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.ReflectionUtil; @@ -1978,26 +1974,4 @@ public class MetaStoreUtils { public static double decimalToDouble(Decimal decimal) { return new BigDecimal(new BigInteger(decimal.getUnscaled()), decimal.getScale()).doubleValue(); } - - /** - * Verify if the user is allowed to make DB notification related calls. - * Only the superusers defined in the Hadoop proxy user settings have the permission. - * - * @param user the short user name - * @param config that contains the proxy user settings - * @return if the user has the permission - */ - public static boolean checkUserHasHostProxyPrivileges(String user, Configuration conf, String ipAddress) { - DefaultImpersonationProvider sip = ProxyUsers.getDefaultImpersonationProvider(); - // Just need to initialize the ProxyUsers for the first time, given that the conf will not change on the fly - if (sip == null) { - ProxyUsers.refreshSuperUserGroupsConfiguration(conf); - sip = ProxyUsers.getDefaultImpersonationProvider(); - } - Map<String, Collection<String>> proxyHosts = sip.getProxyHosts(); - Collection<String> hostEntries = proxyHosts.get(sip.getProxySuperuserIpConfKey(user)); - MachineList machineList = new MachineList(hostEntries); - ipAddress = (ipAddress == null) ? StringUtils.EMPTY : ipAddress; - return machineList.includes(ipAddress); - } } http://git-wip-us.apache.org/repos/asf/hive/blob/6983d595/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 04ab904..54746d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -35,7 +35,6 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.calcite.util.Pair; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -77,10 +76,11 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { // be a CM uri in the from path. if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) { String[] result = ReplChangeManager.getFileWithChksumFromURI(fromPath.toString()); - ReplChangeManager.FileInfo sourceInfo = ReplChangeManager - .getFileInfo(new Path(result[0]), result[1], conf); + Path sourcePath = ReplChangeManager + .getFileStatus(new Path(result[0]), result[1], conf) + .getPath(); if (FileUtils.copy( - sourceInfo.getFs(), sourceInfo.getSourcePath(), + sourcePath.getFileSystem(conf), sourcePath, dstFs, toPath, false, false, conf)) { return 0; } else { @@ -90,13 +90,13 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { } } - List<ReplChangeManager.FileInfo> srcFiles = new ArrayList<>(); + List<Path> srcPaths = new ArrayList<>(); if (rwork.readSrcAsFilesList()) { // This flow is usually taken for REPL LOAD // Our input is the result of a _files listing, we should expand out _files. - srcFiles = filesInFileListing(srcFs, fromPath); - LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? "null" : srcFiles.size())); - if ((srcFiles == null) || (srcFiles.isEmpty())) { + srcPaths = filesInFileListing(srcFs, fromPath); + LOG.debug("ReplCopyTask _files contains:" + (srcPaths == null ? "null" : srcPaths.size())); + if ((srcPaths == null) || (srcPaths.isEmpty())) { if (work.isErrorOnSrcEmpty()) { console.printError("No _files entry found on source: " + fromPath.toString()); return 5; @@ -120,18 +120,17 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { for (FileStatus oneSrc : srcs) { console.printInfo("Copying file: " + oneSrc.getPath().toString()); LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath); - srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), - oneSrc.getPath(), null, null, true)); + srcPaths.add(oneSrc.getPath()); } } - LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size()); + LOG.debug("ReplCopyTask numFiles: {}", srcPaths.size()); if (!FileUtils.mkdir(dstFs, toPath, conf)) { console.printError("Cannot make target directory: " + toPath.toString()); return 2; } // Copy the files from different source file systems to one destination directory - new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(toPath, srcFiles); + new CopyUtils(rwork.distCpDoAsUser(), conf).doCopy(toPath, srcPaths); return 0; } catch (Exception e) { @@ -141,7 +140,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { } } - private List<ReplChangeManager.FileInfo> filesInFileListing(FileSystem fs, Path dataPath) + private List<Path> filesInFileListing(FileSystem fs, Path dataPath) throws IOException { Path fileListing = new Path(dataPath, EximUtil.FILES_NAME); LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri()); @@ -151,7 +150,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { // On success, but with nothing to return, we can return an empty list. } - List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>(); + List<Path> filePaths = new ArrayList<>(); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing))); // TODO : verify if skipping charset here is okay @@ -161,8 +160,9 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line); try { - ReplChangeManager.FileInfo f = ReplChangeManager - .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], conf); + Path f = ReplChangeManager + .getFileStatus(new Path(fileWithChksum[0]), fileWithChksum[1], conf) + .getPath(); filePaths.add(f); } catch (MetaException e) { // issue warning for missing file and throw exception http://git-wip-us.apache.org/repos/asf/hive/blob/6983d595/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 3d99499..a022b5d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -46,7 +46,6 @@ public class CopyUtils { private final long maxNumberOfFiles; private final boolean hiveInTest; private final String copyAsUser; - private final int MAX_COPY_RETRY = 3; public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) { this.hiveConf = hiveConf; @@ -89,11 +88,12 @@ public class CopyUtils { 3. aggregate fileSize of all source Paths(can be directory / file) is less than configured size. 4. number of files of all source Paths(can be directory / file) is less than configured size. */ - private boolean regularCopy(FileSystem destinationFs, FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList) + private boolean regularCopy(FileSystem destinationFs, Map.Entry<FileSystem, List<Path>> entry) throws IOException { if (hiveInTest) { return true; } + FileSystem sourceFs = entry.getKey(); if (isLocal(sourceFs) || isLocal(destinationFs)) { return true; } @@ -104,17 +104,8 @@ public class CopyUtils { long size = 0; long numberOfFiles = 0; - for (ReplChangeManager.FileInfo fileInfo : fileList) { - ContentSummary contentSummary = null; - try { - contentSummary = sourceFs.getContentSummary(fileInfo.getEffectivePath()); - } catch (FileNotFoundException e) { - // in replication, if source file does not exist, try cmroot - if (fileInfo.isUseSourcePath() && fileInfo.getCmPath() != null) { - contentSummary = sourceFs.getContentSummary(fileInfo.getCmPath()); - fileInfo.setIsUseSourcePath(false); - } - } + for (Path path : entry.getValue()) { + ContentSummary contentSummary = sourceFs.getContentSummary(path); size += contentSummary.getLength(); numberOfFiles += contentSummary.getFileCount(); if (limitReachedForLocalCopy(size, numberOfFiles)) { @@ -138,28 +129,15 @@ public class CopyUtils { return fs.getScheme().equals("file"); } - private Map<FileSystem, List<Path>> fsToPathMap(List<Path> srcPaths) throws IOException { + private Map<FileSystem, List<Path>> fsToFileMap(List<Path> srcPaths) throws IOException { Map<FileSystem, List<Path>> result = new HashMap<>(); for (Path path : srcPaths) { FileSystem fileSystem = path.getFileSystem(hiveConf); if (!result.containsKey(fileSystem)) { - result.put(fileSystem, new ArrayList<Path>()); + result.put(fileSystem, new ArrayList<>()); } result.get(fileSystem).add(path); } return result; } - - private Map<FileSystem, List<ReplChangeManager.FileInfo>> fsToFileMap( - List<ReplChangeManager.FileInfo> srcFiles) throws IOException { - Map<FileSystem, List<ReplChangeManager.FileInfo>> result = new HashMap<>(); - for (ReplChangeManager.FileInfo file : srcFiles) { - FileSystem fileSystem = file.getFs(); - if (!result.containsKey(fileSystem)) { - result.put(fileSystem, new ArrayList<ReplChangeManager.FileInfo>()); - } - result.get(fileSystem).add(file); - } - return result; - } } http://git-wip-us.apache.org/repos/asf/hive/blob/6983d595/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index b4c8c08..dd9296a 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -63,46 +63,6 @@ public class ReplChangeManager { COPY } - public static class FileInfo { - FileSystem fs; - Path sourcePath; - Path cmPath; - String checkSum; - boolean useSourcePath; - public FileInfo(FileSystem fs, Path sourcePath, Path cmPath, String checkSum, boolean useSourcePath) { - this.fs = fs; - this.sourcePath = sourcePath; - this.cmPath = cmPath; - this.checkSum = checkSum; - this.useSourcePath = useSourcePath; - } - public FileSystem getFs() { - return fs; - } - public Path getSourcePath() { - return sourcePath; - } - public Path getCmPath() { - return cmPath; - } - public String getCheckSum() { - return checkSum; - } - public boolean isUseSourcePath() { - return useSourcePath; - } - public void setIsUseSourcePath(boolean useSourcePath) { - this.useSourcePath = useSourcePath; - } - public Path getEffectivePath() { - if (useSourcePath) { - return sourcePath; - } else { - return cmPath; - } - } - } - public static ReplChangeManager getInstance(Configuration conf) throws MetaException { if (instance == null) { instance = new ReplChangeManager(conf); @@ -299,25 +259,25 @@ public class ReplChangeManager { * @param src Original file location * @param checksumString Checksum of the original file * @param conf - * @return Corresponding FileInfo object + * @return Corresponding FileStatus object */ - static public FileInfo getFileInfo(Path src, String checksumString, + static public FileStatus getFileStatus(Path src, String checksumString, Configuration conf) throws MetaException { try { FileSystem srcFs = src.getFileSystem(conf); if (checksumString == null) { - return new FileInfo(srcFs, src, null, null, true); + return srcFs.getFileStatus(src); } if (!srcFs.exists(src)) { - return new FileInfo(srcFs, src, getCMPath(conf, src.getName(), checksumString), checksumString, false); + return srcFs.getFileStatus(getCMPath(conf, src.getName(), checksumString)); } String currentChecksumString = checksumFor(src, srcFs); if (currentChecksumString == null || checksumString.equals(currentChecksumString)) { - return new FileInfo(srcFs, src, getCMPath(conf, src.getName(), checksumString), checksumString, true); + return srcFs.getFileStatus(src); } else { - return new FileInfo(srcFs, src, getCMPath(conf, src.getName(), checksumString), checksumString, false); + return srcFs.getFileStatus(getCMPath(conf, src.getName(), checksumString)); } } catch (IOException e) { throw new MetaException(StringUtils.stringifyException(e));