Repository: hive Updated Branches: refs/heads/master eb21132bf -> 87e8c738b
HIVE-18117: Create TestCliDriver for HDFS EC (Andrew Sherman, reviewed by Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/87e8c738 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/87e8c738 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/87e8c738 Branch: refs/heads/master Commit: 87e8c738beab1ef2899681a13dec52acc6bd05f7 Parents: eb21132 Author: Andrew Sherman <asher...@cloudera.com> Authored: Wed May 23 10:52:19 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Fri May 25 12:33:46 2018 -0700 ---------------------------------------------------------------------- .../cli/TestErasureCodingHDFSCliDriver.java | 65 ++++ .../test/resources/testconfiguration.properties | 6 + .../hadoop/hive/cli/control/CliConfigs.java | 50 +++ .../org/apache/hadoop/hive/ql/QTestUtil.java | 60 ++- .../ql/processors/CommandProcessorFactory.java | 65 ++-- .../hive/ql/processors/ErasureProcessor.java | 383 +++++++++++++++++++ .../hadoop/hive/ql/processors/HiveCommand.java | 1 + .../hadoop/hive/ql/session/SessionState.java | 5 + .../processors/TestCommandProcessorFactory.java | 2 +- .../queries/clientpositive/erasure_commands.q | 10 + .../queries/clientpositive/erasure_simple.q | 51 +++ .../clientpositive/erasure_commands.q.out | 4 + .../erasurecoding/erasure_commands.q.out | 8 + .../erasurecoding/erasure_simple.q.out | 111 ++++++ .../apache/hadoop/hive/shims/Hadoop23Shims.java | 164 ++++++++ .../apache/hadoop/hive/shims/HadoopShims.java | 107 ++++++ .../ptest2/conf/deployed/master-mr2.properties | 7 + 17 files changed, 1059 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestErasureCodingHDFSCliDriver.java ---------------------------------------------------------------------- diff --git 
a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestErasureCodingHDFSCliDriver.java b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestErasureCodingHDFSCliDriver.java new file mode 100644 index 0000000..b459ffc --- /dev/null +++ b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestErasureCodingHDFSCliDriver.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cli; + +import java.io.File; +import java.util.List; + +import org.apache.hadoop.hive.cli.control.CliAdapter; +import org.apache.hadoop.hive.cli.control.CliConfigs; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * A Test Driver that can be used to run tests over hdfs directories that employ Erasure Coding. 
+ */ +@RunWith(Parameterized.class) +public class TestErasureCodingHDFSCliDriver { + + static CliAdapter adapter = new CliConfigs.ErasureCodingHDFSCliConfig().getCliAdapter(); + + @Parameters(name = "{0}") + public static List<Object[]> getParameters() throws Exception { + return adapter.getParameters(); + } + + // fields annotated with ClassRule must be public + @ClassRule + public static TestRule cliClassRule = adapter.buildClassRule(); + + @Rule + public TestRule cliTestRule = adapter.buildTestRule(); + + private String name; + private File qfile; + + public TestErasureCodingHDFSCliDriver(String name, File qfile) { + this.name = name; + this.qfile = qfile; + } + + @Test + public void testCliDriver() throws Exception { + adapter.runTest(name, qfile); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index d146f92..f7def35 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1693,3 +1693,9 @@ druid.query.files=druidmini_test1.q,\ druidmini_floorTime.q druid.kafka.query.files=druidkafkamini_basic.q + +# tests to be run by TestErasureCodingHDFSCliDriver and TestCliDriver +erasurecoding.shared.query.files=erasure_commands.q + +# tests to be run only by TestErasureCodingHDFSCliDriver +erasurecoding.only.query.files=erasure_simple.q http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index 7063adb..fddd40f 100644 
--- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -57,6 +57,7 @@ public class CliConfigs { excludesFrom(testConfigProps, "localSpark.only.query.files"); excludesFrom(testConfigProps, "druid.query.files"); excludesFrom(testConfigProps, "druid.kafka.query.files"); + excludesFrom(testConfigProps, "erasurecoding.only.query.files"); excludeQuery("fouter_join_ppr.q"); // Disabled in HIVE-19509 @@ -305,6 +306,7 @@ public class CliConfigs { excludesFrom(testConfigProps, "minimr.query.files"); excludesFrom(testConfigProps, "minitez.query.files"); excludesFrom(testConfigProps, "encrypted.query.files"); + excludesFrom(testConfigProps, "erasurecoding.only.query.files"); setResultsDir("ql/src/test/results/clientpositive/perf/tez"); setLogDir("itests/qtest/target/qfile-results/clientpositive/tez"); @@ -660,4 +662,52 @@ public class CliConfigs { } } } + + /** + * Configuration for TestErasureCodingHDFSCliDriver. + */ + public static class ErasureCodingHDFSCliConfig extends AbstractCliConfig { + public ErasureCodingHDFSCliConfig() { + super(CoreCliDriver.class); + try { + setQueryDir("ql/src/test/queries/clientpositive"); + + includesFrom(testConfigProps, "erasurecoding.shared.query.files"); + includesFrom(testConfigProps, "erasurecoding.only.query.files"); + + setResultsDir("ql/src/test/results/clientpositive/erasurecoding"); + setLogDir("itests/qtest/target/qfile-results/clientpositive"); + + setInitScript("q_test_init_src.sql"); + setCleanupScript("q_test_cleanup_src.sql"); + + setClusterType(MiniClusterType.mr); + setFsType(QTestUtil.FsType.erasure_coded_hdfs); + setHiveConfDir(getClusterType()); + } catch (Exception e) { + throw new RuntimeException("can't construct cliconfig", e); + } + } + + /** + * Set the appropriate conf dir based on the cluster type. 
+ */ + private void setHiveConfDir(MiniClusterType clusterType) { + switch (clusterType) { + case tez: + setHiveConfDir("data/conf/tez"); + break; + case spark: + setHiveConfDir("data/conf/spark/standalone"); + break; + case miniSparkOnYarn: + setHiveConfDir("data/conf/spark/yarn-cluster"); + break; + default: + setHiveConfDir("data/conf"); + break; + } + } + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 57be0f0..2365fb7 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -109,6 +109,7 @@ import org.apache.hadoop.hive.ql.dataset.DatasetCollection; import org.apache.hadoop.hive.ql.dataset.DatasetParser; import org.apache.hadoop.hive.ql.dataset.Dataset; import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsErasureCodingShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.StreamPrinter; import org.apache.hive.druid.MiniDruidCluster; @@ -143,7 +144,7 @@ public class QTestUtil { static final Logger LOG = LoggerFactory.getLogger("QTestUtil"); private final static String defaultInitScript = "q_test_init.sql"; private final static String defaultCleanupScript = "q_test_cleanup.sql"; - private final String[] testOnlyCommands = new String[]{"crypto"}; + private final String[] testOnlyCommands = new String[]{"crypto", "erasure"}; public static final String TEST_TMP_DIR_PROPERTY = "test.tmp.dir"; // typically target/tmp private static final String BUILD_DIR_PROPERTY = "build.dir"; // typically target @@ -151,6 +152,11 @@ public class QTestUtil { public static final String 
TEST_SRC_TABLES_PROPERTY = "test.src.tables"; public static final String TEST_HIVE_USER_PROPERTY = "test.hive.user"; + /** + * The Erasure Coding Policy to use in TestErasureCodingHDFSCliDriver. + */ + private static final String DEFAULT_TEST_EC_POLICY = "RS-3-2-1024k"; + private String testWarehouse; private final String testFiles; private final File datasetDir; @@ -479,6 +485,7 @@ public class QTestUtil { local, hdfs, encrypted_hdfs, + erasure_coded_hdfs, } public enum MiniClusterType { @@ -650,25 +657,47 @@ public class QTestUtil { if (fsType == FsType.local) { fs = FileSystem.getLocal(conf); - } else if (fsType == FsType.hdfs || fsType == FsType.encrypted_hdfs) { + } else if (fsType == FsType.hdfs || fsType == FsType.encrypted_hdfs|| fsType == FsType.erasure_coded_hdfs) { int numDataNodes = 4; - if (fsType == FsType.encrypted_hdfs) { + // Setup before getting dfs + switch (fsType) { + case encrypted_hdfs: // Set the security key provider so that the MiniDFS cluster is initialized // with encryption conf.set(SECURITY_KEY_PROVIDER_URI_NAME, getKeyProviderURI()); conf.setInt("fs.trash.interval", 50); + break; + case erasure_coded_hdfs: + // We need more NameNodes for EC. + // To fully exercise hdfs code paths we need 5 NameNodes for the RS-3-2-1024k policy. + // With 6 NameNodes we can also run the RS-6-3-1024k policy. 
+ numDataNodes = 6; + break; + default: + break; + } - dfs = shims.getMiniDfs(conf, numDataNodes, true, null); - fs = dfs.getFileSystem(); + dfs = shims.getMiniDfs(conf, numDataNodes, true, null); + fs = dfs.getFileSystem(); + // Setup after getting dfs + switch (fsType) { + case encrypted_hdfs: // set up the java key provider for encrypted hdfs cluster hes = shims.createHdfsEncryptionShim(fs, conf); - LOG.info("key provider is initialized"); - } else { - dfs = shims.getMiniDfs(conf, numDataNodes, true, null); - fs = dfs.getFileSystem(); + break; + case erasure_coded_hdfs: + // The Erasure policy can't be set in a q_test_init script as QTestUtil runs that code in + // a mode that disallows test-only CommandProcessors. + // Set the default policy on the root of the file system here. + HdfsErasureCodingShim erasureCodingShim = shims.createHdfsErasureCodingShim(fs, conf); + erasureCodingShim.enableErasureCodingPolicy(DEFAULT_TEST_EC_POLICY); + erasureCodingShim.setErasureCodingPolicy(new Path("hdfs:///"), DEFAULT_TEST_EC_POLICY); + break; + default: + break; } } else { throw new IllegalArgumentException("Unknown or unhandled fsType [" + fsType + "]"); @@ -1030,7 +1059,7 @@ public class QTestUtil { LOG.warn("Trying to drop table " + e.getTableName() + ". But it does not exist."); continue; } - db.dropTable(dbName, tblName, true, true, fsType == FsType.encrypted_hdfs); + db.dropTable(dbName, tblName, true, true, fsNeedsPurge(fsType)); } } if (!DEFAULT_DATABASE_NAME.equals(dbName)) { @@ -2297,4 +2326,15 @@ public class QTestUtil { public static void initEventNotificationPoll() throws Exception { NotificationEventPoll.initialize(SessionState.get().getConf()); } + + /** + * Should deleted test tables have their data purged. 
+ * @return true if data should be purged + */ + private static boolean fsNeedsPurge(FsType type) { + if (type == FsType.encrypted_hdfs || type == FsType.erasure_coded_hdfs) { + return true; + } + return false; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java index 3d47991..f8b6a97 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.processors; import static org.apache.commons.lang.StringUtils.isBlank; +import java.io.IOException; import java.sql.SQLException; import java.util.HashSet; import java.util.Set; @@ -72,35 +73,41 @@ public final class CommandProcessorFactory { return null; } switch (hiveCommand) { - case SET: - return new SetProcessor(); - case RESET: - return new ResetProcessor(); - case DFS: - SessionState ss = SessionState.get(); - return new DfsProcessor(ss.getConf()); - case ADD: - return new AddResourceProcessor(); - case LIST: - return new ListResourceProcessor(); - case LLAP_CLUSTER: - return new LlapClusterResourceProcessor(); - case LLAP_CACHE: - return new LlapCacheResourceProcessor(); - case DELETE: - return new DeleteResourceProcessor(); - case COMPILE: - return new CompileProcessor(); - case RELOAD: - return new ReloadProcessor(); - case CRYPTO: - try { - return new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(), conf); - } catch (HiveException e) { - throw new SQLException("Fail to start the command processor due to the exception: ", e); - } - default: - throw new AssertionError("Unknown HiveCommand " + hiveCommand); + case 
SET: + return new SetProcessor(); + case RESET: + return new ResetProcessor(); + case DFS: + SessionState ss = SessionState.get(); + return new DfsProcessor(ss.getConf()); + case ADD: + return new AddResourceProcessor(); + case LIST: + return new ListResourceProcessor(); + case LLAP_CLUSTER: + return new LlapClusterResourceProcessor(); + case LLAP_CACHE: + return new LlapCacheResourceProcessor(); + case DELETE: + return new DeleteResourceProcessor(); + case COMPILE: + return new CompileProcessor(); + case RELOAD: + return new ReloadProcessor(); + case CRYPTO: + try { + return new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(), conf); + } catch (HiveException e) { + throw new SQLException("Fail to start the command processor due to the exception: ", e); + } + case ERASURE: + try { + return new ErasureProcessor(conf); + } catch (IOException e) { + throw new SQLException("Fail to start the erasure command processor due to the exception: ", e); + } + default: + throw new AssertionError("Unknown HiveCommand " + hiveCommand); } } http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java new file mode 100644 index 0000000..46114f5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.processors; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import com.google.common.base.Joiner; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileErasureCodingPolicy; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class processes hadoop commands used for Erasure Coding. + * It is meant to be run only by Hive unit tests. + * + * The âErasureâ commands implemented by this class allow test writers to use Erasure Coding in Hive. + * Hdfs determines whether to use Erasure Coding for a file based on the presence of an Erasure + * Coding Policy on the directory which contains the file. + * These âErasureâ commands can be used to manipulate Erasure Coding Policies. 
+ * These commands are similar to the user level commands provided by the âhdfs ecâ command as + * documented at: + * https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html + * + * <ul> + * <li>getPolicy: Get an erasure coding policy for a Path. + * <li>enablePolicy: Enable an erasure coding policy. + * <li>removePolicy: Remove an erasure coding policy. + * <li>disablePolicy: Disable an erasure coding policy. + * <li>setPolicy: Sets an erasure coding policy on a directory at the specified path + * <li>unsetPolicy: Unsets an erasure coding policy on a directory at the specified path + * <li>echo: Echo the parameters given to the command (not an ec command) + * </ul> + */ +public class ErasureProcessor implements CommandProcessor { + private static final Logger LOG = LoggerFactory.getLogger(ErasureProcessor.class.getName()); + + private HadoopShims.HdfsErasureCodingShim erasureCodingShim; + + ErasureProcessor(HiveConf config) throws IOException { + HadoopShims hadoopShims = ShimLoader.getHadoopShims(); + FileSystem fileSystem = FileSystem.get(config); + this.erasureCodingShim = hadoopShims.createHdfsErasureCodingShim(fileSystem, config); + } + + private CommandLine parseCommandArgs(final Options opts, String[] args) throws ParseException { + CommandLineParser parser = new GnuParser(); + return parser.parse(opts, args); + } + + private CommandProcessorResponse returnErrorResponse(final String errmsg) { + return new CommandProcessorResponse(1, "Erasure Processor Helper Failed: " + errmsg, null); + } + + private void writeTestOutput(final String msg) { + SessionState.get().out.println(msg); + } + + @Override + public CommandProcessorResponse run(String command) { + String[] args = command.split("\\s+"); + + if (args.length < 1) { + return returnErrorResponse("Command arguments are empty."); + } + + if (erasureCodingShim == null) { + return returnErrorResponse("Hadoop erasure shim is not initialized."); + } + + String action = 
args[0].toLowerCase(); + String[] params = Arrays.copyOfRange(args, 1, args.length); + + try { + switch (action) { + // note we switch on the lowercase command name + case "disablepolicy": + disablePolicy(params); + break; + case "echo": + echo(params); + break; + case "enablepolicy": + enablePolicy(params); + break; + case "getpolicy": + getPolicy(params); + break; + case "listpolicies": + listPolicies(); + break; + case "setpolicy": + setPolicy(params); + break; + case "removepolicy": + removePolicy(params); + break; + case "unsetpolicy": + unsetPolicy(params); + break; + default: + return returnErrorResponse("Unknown erasure command action: " + action); + } + } catch (Exception e) { + return returnErrorResponse(e.getMessage()); + } + + return new CommandProcessorResponse(0); + } + + /** + * Get an erasure coding policy for a Path. + * @param params Parameters passed to the command. + * @throws Exception if command failed. + */ + private void getPolicy(String[] params) throws Exception { + String command = "getPolicy"; + try { + // getPolicy -path <path> + Options getPolicyOptions = new Options(); + + String pathOptionName = "path"; + Option policyOption = OptionBuilder.hasArg() + .isRequired() + .withLongOpt(pathOptionName) + .withDescription("Path for which Policy should be fetched") + .create(); + getPolicyOptions.addOption(policyOption); + + CommandLine args = parseCommandArgs(getPolicyOptions, params); + String path = args.getOptionValue(pathOptionName); + + HdfsFileErasureCodingPolicy policy = erasureCodingShim.getErasureCodingPolicy(new Path(path)); + writeTestOutput("EC policy is '" + (policy != null ? 
policy.getName() : "REPLICATED") + "'"); + + } catch (ParseException pe) { + writeTestOutput("Error parsing options for " + command + " " + pe.getMessage()); + } catch (Exception e) { + writeTestOutput("Caught exception running " + command + ": " + e.getMessage()); + throw new Exception("Cannot run " + command + ": " + e.getMessage(), e); + } + } + + /** + * Echo the parameters given to the command. + * @param params parameters which will be echoed + */ + private void echo(String[] params) throws Exception { + String command = "echo"; + try { + writeTestOutput("ECHO " + Joiner.on(" ").join(params)); + } catch (Exception e) { + writeTestOutput("Caught exception running " + command + ": " + e.getMessage()); + throw new Exception("Cannot run " + command + ": " + e.getMessage()); + } + } + + /** + * Enable an erasure coding policy. + * @param params Parameters passed to the command. + * @throws Exception If command failed. + */ + private void enablePolicy(String[] params) throws Exception { + String command = "enablePolicy"; + try { + // enablePolicy -policy <policyName> + Options enablePolicyOptions = new Options(); + + String policyOptionName = "policy"; + Option policyOption = OptionBuilder.hasArg() + .isRequired() + .withLongOpt(policyOptionName) + .withDescription("Policy to enable") + .hasArg() + .create(); + enablePolicyOptions.addOption(policyOption); + + CommandLine args = parseCommandArgs(enablePolicyOptions, params); + String policyName = args.getOptionValue(policyOptionName); + + erasureCodingShim.enableErasureCodingPolicy(policyName); + writeTestOutput("Enabled EC policy '" + policyName + "'"); + } catch (ParseException pe) { + writeTestOutput("Error parsing options for " + command + " " + pe.getMessage()); + } catch (Exception e) { + writeTestOutput("Caught exception running " + command + ": " + e.getMessage()); + throw new Exception("Cannot run " + command + ": " + e.getMessage()); + } + } + + /** + * Remove an erasure coding policy. 
+ * @param params Parameters passed to the command. + * @throws Exception if command failed. + */ + private void removePolicy(String[] params) throws Exception { + String command = "removePolicy"; + try { + // removePolicy -policy <policyName> + Options removePolicyOptions = new Options(); + + String policyOptionName = "policy"; + Option policyOption = OptionBuilder.hasArg() + .isRequired() + .withLongOpt(policyOptionName) + .withDescription("Policy to remove") + .create(); + removePolicyOptions.addOption(policyOption); + + CommandLine args = parseCommandArgs(removePolicyOptions, params); + String policyName = args.getOptionValue(policyOptionName); + + erasureCodingShim.removeErasureCodingPolicy(policyName); + writeTestOutput("Removed EC policy '" + policyName + "'"); + } catch (ParseException pe) { + writeTestOutput("Error parsing options for " + command + " " + pe.getMessage()); + } catch (Exception e) { + writeTestOutput("Caught exception running " + command + ": " + e.getMessage()); + throw new Exception("Cannot run " + command + ": " + e.getMessage()); + } + } + + /** + * Disable an erasure coding policy. + * @param params Parameters passed to the command. + * @throws Exception If command failed. 
+ */ + private void disablePolicy(String[] params) throws Exception { + String command = "disablePolicy"; + try { + // disablePolicy -policy <policyName> + Options disablePolicyOptions = new Options(); + + String policyOptionName = "policy"; + Option policyOption = OptionBuilder.hasArg() + .isRequired() + .withLongOpt(policyOptionName) + .withDescription("Policy to disable") + .create(); + disablePolicyOptions.addOption(policyOption); + + CommandLine args = parseCommandArgs(disablePolicyOptions, params); + String policyName = args.getOptionValue(policyOptionName); + + erasureCodingShim.disableErasureCodingPolicy(policyName); + writeTestOutput("Disabled EC policy '" + policyName + "'"); + } catch (ParseException pe) { + writeTestOutput("Error parsing options for " + command + " " + pe.getMessage()); + } catch (Exception e) { + writeTestOutput("Caught exception running " + command + ": " + e.getMessage()); + throw new Exception("Cannot run " + command + ": " + e.getMessage()); + } + } + + /** + * Sets an erasure coding policy on a directory at the specified path. + * @param params Parameters passed to the command. + * @throws Exception If command failed. 
+ */ + private void setPolicy(String[] params) throws Exception { + String command = "setPolicy"; + try { + // setPolicy -path <path> [-policy <policyName>] + Options setPolicyOptions = new Options(); + + String pathOptionName = "path"; + Option pathOption = OptionBuilder.hasArg() + .isRequired() + .withLongOpt(pathOptionName) + .withDescription("Path to set policy on") + .create(); + setPolicyOptions.addOption(pathOption); + + String policyOptionName = "policy"; + Option policyOption = OptionBuilder.hasArg() + .withLongOpt(policyOptionName) + .withDescription("Policy to set") + .create(); + setPolicyOptions.addOption(policyOption); + + CommandLine args = parseCommandArgs(setPolicyOptions, params); + String path = args.getOptionValue(pathOptionName); + String policy = args.getOptionValue(policyOptionName); + + erasureCodingShim.setErasureCodingPolicy(new Path(path), policy); + writeTestOutput("Set EC policy' " + policy); + } catch (ParseException pe) { + writeTestOutput("Error parsing options for " + command + " " + pe.getMessage()); + } catch (Exception e) { + writeTestOutput("Caught exception running " + command + ": " + e.getMessage()); + throw new Exception("Cannot run " + command + ": " + e.getMessage()); + } + } + + /** + * Unsets an erasure coding policy on a directory at the specified path. + * @param params Parameters passed to the command. + * @throws Exception if command failed. 
+ */ + private void unsetPolicy(String[] params) throws Exception { + String command = "unsetPolicy"; + try { + // unsetPolicy -path <path> + Options unsetPolicyOptions = new Options(); + + String pathOptionName = "path"; + Option pathOption = OptionBuilder.hasArg() + .isRequired() + .withLongOpt(pathOptionName) + .withDescription("Path to unset policy on") + .create(); + unsetPolicyOptions.addOption(pathOption); + + CommandLine args = parseCommandArgs(unsetPolicyOptions, params); + String path = args.getOptionValue(pathOptionName); + + erasureCodingShim.unsetErasureCodingPolicy(new Path(path)); + writeTestOutput("Unset EC policy"); + } catch (ParseException pe) { + writeTestOutput("Error parsing options for " + command + " " + pe.getMessage()); + } catch (Exception e) { + writeTestOutput("Caught exception running " + command + ": " + e.getMessage()); + throw new Exception("Cannot run " + command + ": " + e.getMessage()); + } + } + + /** + * Comparator the compares HdfsFileErasureCodingPolicy by name. 
+ */ + private Comparator<HdfsFileErasureCodingPolicy> nameComparator = + Comparator.comparing(HdfsFileErasureCodingPolicy::getName); + + private void listPolicies() throws Exception { + try { + List<HdfsFileErasureCodingPolicy> erasureCodingPolicies = + erasureCodingShim.getAllErasureCodingPolicies(); + erasureCodingPolicies.sort(nameComparator); + if (erasureCodingPolicies.isEmpty()) { + writeTestOutput("No EC Policies present"); + } + for (HdfsFileErasureCodingPolicy policy : erasureCodingPolicies) { + writeTestOutput("Policy: " + policy.getName() + " " + policy.getStatus()); + } + } catch (Exception e) { + throw new Exception("Cannot do language command: " + e.getMessage()); + } + } + + @Override + public void close() throws Exception { + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java index 56c7516..74e3447 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java @@ -30,6 +30,7 @@ public enum HiveCommand { RESET(), DFS(), CRYPTO(true), + ERASURE(true), ADD(), LIST(), LLAP_CLUSTER(), http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 89129f9..6c56212 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -241,6 +241,11 @@ public class SessionState { */ private Map<URI, 
HadoopShims.HdfsEncryptionShim> hdfsEncryptionShims = Maps.newHashMap(); + /** + * Cache for Erasure Coding shims. + */ + private Map<URI, HadoopShims.HdfsErasureCodingShim> erasureCodingShims; + private final String userName; /** http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/ql/src/test/org/apache/hadoop/hive/ql/processors/TestCommandProcessorFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/processors/TestCommandProcessorFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/processors/TestCommandProcessorFactory.java index de43c28..0c960f2 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/processors/TestCommandProcessorFactory.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/processors/TestCommandProcessorFactory.java @@ -28,7 +28,7 @@ import org.junit.Test; public class TestCommandProcessorFactory { - private final String[] testOnlyCommands = new String[]{"crypto"}; + private final String[] testOnlyCommands = new String[]{"crypto", "erasure"}; private HiveConf conf; http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/ql/src/test/queries/clientpositive/erasure_commands.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/erasure_commands.q b/ql/src/test/queries/clientpositive/erasure_commands.q new file mode 100644 index 0000000..c8e09b8 --- /dev/null +++ b/ql/src/test/queries/clientpositive/erasure_commands.q @@ -0,0 +1,10 @@ +--! qt:dataset:src +-- simple test to show ERASURE commands running on local fs (TestCliDriver) or hdfs (TestErasureCodingHDFSCliDriver). + +ERASURE echo listOfPolicies output is:; +ERASURE listPolicies; + +-- what is the policy on the root of the fs? 
+ERASURE echo original policy on /; +ERASURE getPolicy --path /; + http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/ql/src/test/queries/clientpositive/erasure_simple.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/erasure_simple.q b/ql/src/test/queries/clientpositive/erasure_simple.q new file mode 100644 index 0000000..c08409c --- /dev/null +++ b/ql/src/test/queries/clientpositive/erasure_simple.q @@ -0,0 +1,51 @@ +--! qt:dataset:src + +-- Test Erasure Coding Policies + +ERASURE echo listPolicies originally was; +ERASURE listPolicies; +ERASURE enablePolicy --policy RS-10-4-1024k; +ERASURE echo listPolicies after enablePolicy; +ERASURE listPolicies; + +dfs ${system:test.dfs.mkdir} hdfs:///tmp/erasure_coding1; + +ERASURE echo original policy on erasure_coding1; +ERASURE getPolicy --path hdfs:///tmp/erasure_coding1; + +ERASURE echo set the default policy on erasure_coding1; +ERASURE setPolicy --path hdfs:///tmp/erasure_coding1 --policy RS-10-4-1024k; + +ERASURE echo new policy on erasure_coding1; +ERASURE getPolicy --path hdfs:///tmp/erasure_coding1; + +ERASURE echo unset the default policy on erasure_coding1; +ERASURE unsetPolicy --path hdfs:///tmp/erasure_coding1; +ERASURE getPolicy --path hdfs:///tmp/erasure_coding1; + +create table erasure_table (a int) location 'hdfs:///tmp/erasure_coding1/location1'; + +insert into erasure_table values(4); + +select * from erasure_table; + +drop table if exists erasure_table2; +create table erasure_table2 like src location 'hdfs:///tmp/erasure_coding1/location2'; +insert overwrite table erasure_table2 +select key, value from src; + +ERASURE echo show table extended like erasure_table2; +show table extended like erasure_table2; + +ERASURE echo SHOW TBLPROPERTIES erasure_table2; +SHOW TBLPROPERTIES erasure_table2; + +ERASURE echo unset the default policy on erasure_coding1; +ERASURE unsetPolicy --path hdfs:///tmp/erasure_coding1; + +dfs -rmr 
hdfs:///tmp/erasure_coding1; + +ERASURE echo disablePolicy RS-10-4-1024k; +ERASURE disablePolicy --policy RS-10-4-1024k; + + http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/ql/src/test/results/clientpositive/erasure_commands.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/erasure_commands.q.out b/ql/src/test/results/clientpositive/erasure_commands.q.out new file mode 100644 index 0000000..92d2f6a --- /dev/null +++ b/ql/src/test/results/clientpositive/erasure_commands.q.out @@ -0,0 +1,4 @@ +ECHO listOfPolicies output is: +No EC Policies present +ECHO original policy on / +EC policy is 'REPLICATED' http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/ql/src/test/results/clientpositive/erasurecoding/erasure_commands.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/erasurecoding/erasure_commands.q.out b/ql/src/test/results/clientpositive/erasurecoding/erasure_commands.q.out new file mode 100644 index 0000000..e927e05 --- /dev/null +++ b/ql/src/test/results/clientpositive/erasurecoding/erasure_commands.q.out @@ -0,0 +1,8 @@ +ECHO listOfPolicies output is: +Policy: RS-10-4-1024k DISABLED +Policy: RS-3-2-1024k ENABLED +Policy: RS-6-3-1024k ENABLED +Policy: RS-LEGACY-6-3-1024k DISABLED +Policy: XOR-2-1-1024k DISABLED +ECHO original policy on / +EC policy is 'RS-3-2-1024k' http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/ql/src/test/results/clientpositive/erasurecoding/erasure_simple.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/erasurecoding/erasure_simple.q.out b/ql/src/test/results/clientpositive/erasurecoding/erasure_simple.q.out new file mode 100644 index 0000000..01f6015 --- /dev/null +++ b/ql/src/test/results/clientpositive/erasurecoding/erasure_simple.q.out @@ -0,0 +1,111 @@ +ECHO listPolicies originally was 
+Policy: RS-10-4-1024k DISABLED +Policy: RS-3-2-1024k ENABLED +Policy: RS-6-3-1024k ENABLED +Policy: RS-LEGACY-6-3-1024k DISABLED +Policy: XOR-2-1-1024k DISABLED +Enabled EC policy 'RS-10-4-1024k' +ECHO listPolicies after enablePolicy +Policy: RS-10-4-1024k ENABLED +Policy: RS-3-2-1024k ENABLED +Policy: RS-6-3-1024k ENABLED +Policy: RS-LEGACY-6-3-1024k DISABLED +Policy: XOR-2-1-1024k DISABLED +ECHO original policy on erasure_coding1 +EC policy is 'RS-3-2-1024k' +ECHO set the default policy on erasure_coding1 +Set EC policy' RS-10-4-1024k +ECHO new policy on erasure_coding1 +EC policy is 'RS-10-4-1024k' +ECHO unset the default policy on erasure_coding1 +Unset EC policy +EC policy is 'RS-3-2-1024k' +PREHOOK: query: create table erasure_table (a int) location 'hdfs://### HDFS PATH ###' +PREHOOK: type: CREATETABLE +PREHOOK: Input: hdfs://### HDFS PATH ### +PREHOOK: Output: database:default +PREHOOK: Output: default@erasure_table +POSTHOOK: query: create table erasure_table (a int) location 'hdfs://### HDFS PATH ###' +POSTHOOK: type: CREATETABLE +POSTHOOK: Input: hdfs://### HDFS PATH ### +POSTHOOK: Output: database:default +POSTHOOK: Output: default@erasure_table +PREHOOK: query: insert into erasure_table values(4) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@erasure_table +POSTHOOK: query: insert into erasure_table values(4) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@erasure_table +POSTHOOK: Lineage: erasure_table.a SCRIPT [] +PREHOOK: query: select * from erasure_table +PREHOOK: type: QUERY +PREHOOK: Input: default@erasure_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select * from erasure_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@erasure_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +4 +PREHOOK: query: drop table if exists erasure_table2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists erasure_table2 +POSTHOOK: 
type: DROPTABLE +PREHOOK: query: create table erasure_table2 like src location 'hdfs://### HDFS PATH ###' +PREHOOK: type: CREATETABLE +PREHOOK: Input: hdfs://### HDFS PATH ### +PREHOOK: Output: database:default +PREHOOK: Output: default@erasure_table2 +POSTHOOK: query: create table erasure_table2 like src location 'hdfs://### HDFS PATH ###' +POSTHOOK: type: CREATETABLE +POSTHOOK: Input: hdfs://### HDFS PATH ### +POSTHOOK: Output: database:default +POSTHOOK: Output: default@erasure_table2 +PREHOOK: query: insert overwrite table erasure_table2 +select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@erasure_table2 +POSTHOOK: query: insert overwrite table erasure_table2 +select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@erasure_table2 +POSTHOOK: Lineage: erasure_table2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: erasure_table2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ECHO show table extended like erasure_table2 +PREHOOK: query: show table extended like erasure_table2 +PREHOOK: type: SHOW_TABLESTATUS +POSTHOOK: query: show table extended like erasure_table2 +POSTHOOK: type: SHOW_TABLESTATUS +tableName:erasure_table2 +#### A masked pattern was here #### +location:hdfs://### HDFS PATH ### +inputformat:org.apache.hadoop.mapred.TextInputFormat +outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +columns:struct columns { string key, string value} +partitioned:false +partitionColumns: +totalNumberFiles:1 +totalFileSize:5812 +maxFileSize:5812 +minFileSize:5812 +#### A masked pattern was here #### + +ECHO SHOW TBLPROPERTIES erasure_table2 +PREHOOK: query: SHOW TBLPROPERTIES erasure_table2 +PREHOOK: type: SHOW_TBLPROPERTIES +POSTHOOK: query: SHOW TBLPROPERTIES erasure_table2 +POSTHOOK: type: SHOW_TBLPROPERTIES +COLUMN_STATS_ACCURATE 
{"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}} +numFiles 1 +numRows 500 +rawDataSize 5312 +totalSize 5812 +#### A masked pattern was here #### +ECHO unset the default policy on erasure_coding1 +Unset EC policy +#### A masked pattern was here #### +ECHO disablePolicy RS-10-4-1024k +Disabled EC policy 'RS-10-4-1024k' http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java ---------------------------------------------------------------------- diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index ec06a88..5a77122 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -65,6 +65,8 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.EncryptionZone; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.io.LongWritable; @@ -1441,4 +1443,166 @@ public class Hadoop23Shims extends HadoopShimsSecure { return set; } + private static Boolean hdfsErasureCodingSupport; + + /** + * @return true if the runtime version of hdfs supports erasure coding + */ + private static synchronized boolean isHdfsErasureCodingSupported() { + if (hdfsErasureCodingSupport == null) { + Method m = null; + + try { + m = HdfsAdmin.class.getMethod("getErasureCodingPolicies"); + } catch (NoSuchMethodException e) { + // This version of Hadoop does not support HdfsAdmin.getErasureCodingPolicies(). + // Hadoop 3.0.0 introduces this new method. 
+ } + hdfsErasureCodingSupport = (m != null); + } + + return hdfsErasureCodingSupport; + } + + /** + * Returns a new instance of the HdfsErasureCoding shim. + * + * @param fs a FileSystem object + * @param conf a Configuration object + * @return a new instance of the HdfsErasureCoding shim. + * @throws IOException If an error occurred while creating the instance. + */ + @Override + public HadoopShims.HdfsErasureCodingShim createHdfsErasureCodingShim(FileSystem fs, + Configuration conf) throws IOException { + if (isHdfsErasureCodingSupported()) { + URI uri = fs.getUri(); + if ("hdfs".equals(uri.getScheme())) { + return new HdfsErasureCodingShim(uri, conf); + } + } + return new HadoopShims.NoopHdfsErasureCodingShim(); + } + + /** + * Information about an Erasure Coding Policy. + */ + private static class HdfsFileErasureCodingPolicyImpl implements HdfsFileErasureCodingPolicy { + private final String name; + private final String status; + + HdfsFileErasureCodingPolicyImpl(String name, String status) { + this.name = name; + this.status = status; + } + + HdfsFileErasureCodingPolicyImpl(String name) { + this(name, null); + } + + @Override + public String getName() { + return name; + } + + @Override + public String getStatus() { + return status; + } + } + + /** + * This class encapsulates methods used to get Erasure Coding information from + * HDFS paths in order to provide commands similar to those provided by the hdfs ec command. + * https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html + */ + public static class HdfsErasureCodingShim implements HadoopShims.HdfsErasureCodingShim { + /** + * HdfsAdmin client used to perform erasure coding operations on HDFS. 
+ */ + private HdfsAdmin hdfsAdmin = null; + + private final Configuration conf; + + HdfsErasureCodingShim(URI uri, Configuration conf) throws IOException { + this.conf = conf; + this.hdfsAdmin = new HdfsAdmin(uri, conf); + } + + /** + * Lists all (enabled, disabled and removed) erasure coding policies registered in HDFS. + * @return a list of erasure coding policies + */ + @Override + public List<HdfsFileErasureCodingPolicy> getAllErasureCodingPolicies() throws IOException { + ErasureCodingPolicyInfo[] erasureCodingPolicies = hdfsAdmin.getErasureCodingPolicies(); + List<HdfsFileErasureCodingPolicy> policies = new ArrayList<>(erasureCodingPolicies.length); + for (ErasureCodingPolicyInfo erasureCodingPolicy : erasureCodingPolicies) { + policies.add(new HdfsFileErasureCodingPolicyImpl(erasureCodingPolicy.getPolicy().getName(), + erasureCodingPolicy.getState().toString())); + } + return policies; + } + + + /** + * Enable an erasure coding policy. + * @param ecPolicyName the name of the erasure coding policy + */ + @Override + public void enableErasureCodingPolicy(String ecPolicyName) throws IOException { + hdfsAdmin.enableErasureCodingPolicy(ecPolicyName); + } + + /** + * Sets an erasure coding policy on a directory at the specified path. + * @param path a directory in HDFS + * @param ecPolicyName the name of the erasure coding policy + */ + @Override + public void setErasureCodingPolicy(Path path, String ecPolicyName) throws IOException { + hdfsAdmin.setErasureCodingPolicy(path, ecPolicyName); + } + + /** + * Get details of the erasure coding policy of a file or directory at the specified path. 
+ * @param path an hdfs file or directory + * @return an erasure coding policy + */ + @Override + public HdfsFileErasureCodingPolicy getErasureCodingPolicy(Path path) throws IOException { + ErasureCodingPolicy erasureCodingPolicy = hdfsAdmin.getErasureCodingPolicy(path); + if (erasureCodingPolicy == null) { + return null; + } + return new HdfsFileErasureCodingPolicyImpl(erasureCodingPolicy.getName()); + } + + /** + * Unset an erasure coding policy set by a previous call to setPolicy on a directory. + * @param path a directory in HDFS + */ + @Override + public void unsetErasureCodingPolicy(Path path) throws IOException { + hdfsAdmin.unsetErasureCodingPolicy(path); + } + + /** + * Remove an erasure coding policy. + * @param ecPolicyName the name of the erasure coding policy + */ + @Override + public void removeErasureCodingPolicy(String ecPolicyName) throws IOException { + hdfsAdmin.removeErasureCodingPolicy(ecPolicyName); + } + + /** + * Disable an erasure coding policy. + * @param ecPolicyName the name of the erasure coding policy + */ + @Override + public void disableErasureCodingPolicy(String ecPolicyName) throws IOException { + hdfsAdmin.disableErasureCodingPolicy(ecPolicyName); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java ---------------------------------------------------------------------- diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index b890817..2e84ca9 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -24,6 +24,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.security.AccessControlException; import java.security.NoSuchAlgorithmException; +import java.util.Collections; import java.util.Comparator; import 
java.util.List; import java.util.Set; @@ -633,6 +634,112 @@ public interface HadoopShims { */ public HdfsEncryptionShim createHdfsEncryptionShim(FileSystem fs, Configuration conf) throws IOException; + /** + * Information about an Erasure Coding Policy. + */ + interface HdfsFileErasureCodingPolicy { + String getName(); + String getStatus(); + } + + /** + * This interface encapsulates methods used to get Erasure Coding information from + * HDFS paths in order to provide commands similar to those provided by the hdfs ec command. + * https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html + */ + interface HdfsErasureCodingShim { + /** + * Lists all (enabled, disabled and removed) erasure coding policies registered in HDFS. + * @return a list of erasure coding policies + */ + List<HdfsFileErasureCodingPolicy> getAllErasureCodingPolicies() throws IOException; + + /** + * Enable an erasure coding policy. + * @param ecPolicyName the name of the erasure coding policy + */ + void enableErasureCodingPolicy(String ecPolicyName) throws IOException; + + /** + * Sets an erasure coding policy on a directory at the specified path. + * @param path a directory in HDFS + * @param ecPolicyName the name of the erasure coding policy + */ + void setErasureCodingPolicy(Path path, String ecPolicyName) throws IOException; + + /** + * Get details of the erasure coding policy of a file or directory at the specified path. + * @param path an hdfs file or directory + * @return an erasure coding policy + */ + HdfsFileErasureCodingPolicy getErasureCodingPolicy(Path path) throws IOException; + + /** + * Unset an erasure coding policy set by a previous call to setPolicy on a directory. + * @param path a directory in HDFS + */ + void unsetErasureCodingPolicy(Path path) throws IOException; + + /** + * Remove an erasure coding policy. 
+ * @param ecPolicyName the name of the erasure coding policy + */ + void removeErasureCodingPolicy(String ecPolicyName) throws IOException; + + /** + * Disable an erasure coding policy. + * @param ecPolicyName the name of the erasure coding policy + */ + void disableErasureCodingPolicy(String ecPolicyName) throws IOException; + } + + /** + * This is a dummy class used when the hadoop version does not support hdfs Erasure Coding. + */ + class NoopHdfsErasureCodingShim implements HadoopShims.HdfsErasureCodingShim { + + @Override + public List<HadoopShims.HdfsFileErasureCodingPolicy> getAllErasureCodingPolicies() { + return Collections.emptyList(); + } + + @Override + public void enableErasureCodingPolicy(String ecPolicyName) throws IOException { + } + + @Override + public void setErasureCodingPolicy(Path path, String ecPolicyName) throws IOException { + } + + @Override + public HdfsFileErasureCodingPolicy getErasureCodingPolicy(Path path) throws IOException { + return null; + } + + @Override + public void unsetErasureCodingPolicy(Path path) throws IOException { + } + + @Override + public void removeErasureCodingPolicy(String ecPolicyName) throws IOException { + } + + @Override + public void disableErasureCodingPolicy(String ecPolicyName) throws IOException { + } + + } + + /** + * Returns a new instance of the HdfsErasureCoding shim. + * + * @param fs a FileSystem object + * @param conf a Configuration object + * @return a new instance of the HdfsErasureCoding shim. + * @throws IOException If an error occurred while creating the instance. 
+ */ + HadoopShims.HdfsErasureCodingShim createHdfsErasureCodingShim(FileSystem fs, Configuration conf) throws IOException; + public Path getPathWithoutSchemeAndAuthority(Path path); /** http://git-wip-us.apache.org/repos/asf/hive/blob/87e8c738/testutils/ptest2/conf/deployed/master-mr2.properties ---------------------------------------------------------------------- diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties b/testutils/ptest2/conf/deployed/master-mr2.properties index 7edc307..f04c0ce 100644 --- a/testutils/ptest2/conf/deployed/master-mr2.properties +++ b/testutils/ptest2/conf/deployed/master-mr2.properties @@ -172,3 +172,10 @@ qFileTest.miniLlapLocal.batchSize = 30 qFileTest.miniLlapLocal.queryFilesProperty = qfile qFileTest.miniLlapLocal.include = normal qFileTest.miniLlapLocal.groups.normal = mainProperties.${minillaplocal.query.files} mainProperties.${minillaplocal.shared.query.files} + +qFileTest.erasurecodingCli.driver = TestErasureCodingHDFSCliDriver +qFileTest.erasurecodingCli.directory = ql/src/test/queries/clientpositive +qFileTest.erasurecodingCli.batchSize = 15 +qFileTest.erasurecodingCli.queryFilesProperty = qfile +qFileTest.erasurecodingCli.include = normal +qFileTest.erasurecodingCli.groups.normal = mainProperties.${erasurecoding.only.query.files} mainProperties.${erasurecoding.shared.query.files}