Author: szetszwo
Date: Thu Aug 21 05:22:10 2014
New Revision: 1619293
URL: http://svn.apache.org/r1619293
Log:
Merge r1609845 through r1619277 from trunk.
Added:
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
- copied unchanged from r1619277, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
- copied unchanged from r1619277, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
Modified:
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred-config.sh
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/mapred-env.sh
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed)
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project:r1594376-1619194
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1618764-1619277
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt Thu Aug 21 05:22:10 2014
@@ -86,6 +86,9 @@ Trunk (Unreleased)
MAPREDUCE-6019. MapReduce changes for exposing YARN/MR endpoints on multiple
interfaces. (Craig Welch, Milan Potocnik, Arpit Agarwal via xgong)
+ MAPREDUCE-6013. [post-HADOOP-9902] mapred version is missing (Akira AJISAKA
+ via aw)
+
BUG FIXES
MAPREDUCE-5714. Removed forceful JVM exit in shutDownJob.
@@ -151,6 +154,16 @@ Trunk (Unreleased)
MAPREDUCE-5867. Fix NPE in KillAMPreemptionPolicy related to
ProportionalCapacityPreemptionPolicy (Sunil G via devaraj)
+ BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
+
+ MAPREDUCE-5890. Support for encrypting Intermediate
+ data and spills in local filesystem. (asuresh via tucu)
+
+ MAPREDUCE-6007. Add support to distcp to preserve raw.* namespace
+ extended attributes. (clamb)
+
+ MAPREDUCE-6041. Fix TestOptionsParser. (clamb)
+
Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -236,7 +249,7 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6012. DBInputSplit creates invalid ranges on Oracle.
(Wei Yan via kasha)
-Release 2.5.0 - UNRELEASED
+Release 2.5.0 - 2014-08-11
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt:r1594376-1619194
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1618764-1619277
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred Thu Aug 21 05:22:10 2014
@@ -15,138 +15,134 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-bin=`which $0`
-bin=`dirname ${bin}`
-bin=`cd "$bin" > /dev/null; pwd`
-
-DEFAULT_LIBEXEC_DIR="$bin"/../libexec
-HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
-if [ -e ${HADOOP_LIBEXEC_DIR}/mapred-config.sh ]; then
- . ${HADOOP_LIBEXEC_DIR}/mapred-config.sh
-else
- . "$bin/mapred-config.sh"
-fi
-
-function print_usage(){
- echo "Usage: mapred [--config confdir] COMMAND"
+function hadoop_usage
+{
+ echo "Usage: mapred [--config confdir] [--daemon (start|stop|status)]
COMMAND"
echo " where COMMAND is one of:"
- echo " pipes run a Pipes job"
- echo " job manipulate MapReduce jobs"
- echo " queue get information regarding JobQueues"
+
+ echo " archive -archiveName NAME -p <parent path> <src>* <dest> create a
hadoop archive"
echo " classpath prints the class path needed for running"
echo " mapreduce subcommands"
- echo " historyserver run job history servers as a standalone daemon"
echo " distcp <srcurl> <desturl> copy file or directories recursively"
- echo " archive -archiveName NAME -p <parent path> <src>* <dest> create a
hadoop archive"
- echo " hsadmin job history server admin interface"
+ echo " job manipulate MapReduce jobs"
+ echo " historyserver run job history servers as a standalone daemon"
+ echo " pipes run a Pipes job"
+ echo " queue get information regarding JobQueues"
+ echo " sampler sampler"
+ echo " version print the version"
echo ""
echo "Most commands print help when invoked w/o parameters."
}
+this="${BASH_SOURCE-$0}"
+bin=$(cd -P -- "$(dirname -- "${this}")" >/dev/null && pwd -P)
+
+# let's locate libexec...
+if [[ -n "${HADOOP_PREFIX}" ]]; then
+ DEFAULT_LIBEXEC_DIR="${HADOOP_PREFIX}/libexec"
+else
+ DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
+fi
+
+HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}"
+# shellcheck disable=SC2034
+HADOOP_NEW_CONFIG=true
+if [[ -f "${HADOOP_LIBEXEC_DIR}/mapred-config.sh" ]]; then
+ . "${HADOOP_LIBEXEC_DIR}/mapred-config.sh"
+else
+ echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/mapred-config.sh." 2>&1
+ exit 1
+fi
+
+
if [ $# = 0 ]; then
- print_usage
- exit
+ hadoop_exit_with_usage 1
fi
COMMAND=$1
shift
-case $COMMAND in
- # usage flags
- --help|-help|-h)
- print_usage
- exit
- ;;
+case ${COMMAND} in
+ mradmin|jobtracker|tasktracker|groups)
+ echo "Sorry, the ${COMMAND} command is no longer supported."
+ echo "You may find similar functionality with the \"yarn\" shell command."
+ hadoop_exit_with_usage 1
+ ;;
+ archive)
+ CLASS=org.apache.hadoop.tools.HadoopArchives
+ hadoop_add_classpath "${TOOL_PATH}"
+ HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+ ;;
+ classpath)
+ hadoop_finalize
+ echo "${CLASSPATH}"
+ exit 0
+ ;;
+ distcp)
+ CLASS=org.apache.hadoop.tools.DistCp
+ hadoop_add_classpath "${TOOL_PATH}"
+ HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+ ;;
+ historyserver)
+ daemon="true"
+ CLASS=org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
+ HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_JOB_HISTORYSERVER_OPTS}"
+ if [ -n "${HADOOP_JOB_HISTORYSERVER_HEAPSIZE}" ]; then
+ JAVA_HEAP_MAX="-Xmx${HADOOP_JOB_HISTORYSERVER_HEAPSIZE}m"
+ fi
+ HADOOP_DAEMON_ROOT_LOGGER=${HADOOP_JHS_LOGGER:-$HADOOP_DAEMON_ROOT_LOGGER}
+ ;;
+ job)
+ CLASS=org.apache.hadoop.mapred.JobClient
+ ;;
+ pipes)
+ CLASS=org.apache.hadoop.mapred.pipes.Submitter
+ HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+ ;;
+ queue)
+ CLASS=org.apache.hadoop.mapred.JobQueueClient
+ ;;
+ sampler)
+ CLASS=org.apache.hadoop.mapred.lib.InputSampler
+ HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+ ;;
+ version)
+ CLASS=org.apache.hadoop.util.VersionInfo
+ HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+ ;;
+ -*|*)
+ hadoop_exit_with_usage 1
+ ;;
esac
-if [ "$COMMAND" = "job" ] ; then
- CLASS=org.apache.hadoop.mapred.JobClient
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "queue" ] ; then
- CLASS=org.apache.hadoop.mapred.JobQueueClient
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "pipes" ] ; then
- CLASS=org.apache.hadoop.mapred.pipes.Submitter
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "sampler" ] ; then
- CLASS=org.apache.hadoop.mapred.lib.InputSampler
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "classpath" ] ; then
- echo -n
-elif [ "$COMMAND" = "historyserver" ] ; then
- CLASS=org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
- HADOOP_OPTS="$HADOOP_OPTS
-Dmapred.jobsummary.logger=${HADOOP_JHS_LOGGER:-INFO,console}
$HADOOP_JOB_HISTORYSERVER_OPTS"
- if [ "$HADOOP_JOB_HISTORYSERVER_HEAPSIZE" != "" ]; then
- JAVA_HEAP_MAX="-Xmx""$HADOOP_JOB_HISTORYSERVER_HEAPSIZE""m"
- fi
-elif [ "$COMMAND" = "mradmin" ] \
- || [ "$COMMAND" = "jobtracker" ] \
- || [ "$COMMAND" = "tasktracker" ] \
- || [ "$COMMAND" = "groups" ] ; then
- echo "Sorry, the $COMMAND command is no longer supported."
- echo "You may find similar functionality with the \"yarn\" shell command."
- print_usage
- exit 1
-elif [ "$COMMAND" = "distcp" ] ; then
- CLASS=org.apache.hadoop.tools.DistCp
- CLASSPATH=${CLASSPATH}:${TOOL_PATH}
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "archive" ] ; then
- CLASS=org.apache.hadoop.tools.HadoopArchives
- CLASSPATH=${CLASSPATH}:${TOOL_PATH}
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-elif [ "$COMMAND" = "hsadmin" ] ; then
- CLASS=org.apache.hadoop.mapreduce.v2.hs.client.HSAdmin
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
-else
- echo $COMMAND - invalid command
- print_usage
- exit 1
-fi
+daemon_outfile="${HADOOP_LOG_DIR}/hadoop-${HADOOP_IDENT_STRING}-${COMMAND}-${HOSTNAME}.out"
+daemon_pidfile="${HADOOP_PID_DIR}/hadoop-${HADOOP_IDENT_STRING}-${COMMAND}.pid"
-# for developers, add mapred classes to CLASSPATH
-if [ -d "$HADOOP_MAPRED_HOME/build/classes" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/classes
-fi
-if [ -d "$HADOOP_MAPRED_HOME/build/webapps" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build
-fi
-if [ -d "$HADOOP_MAPRED_HOME/build/test/classes" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/test/classes
-fi
-if [ -d "$HADOOP_MAPRED_HOME/build/tools" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/tools
-fi
-# for releases, add core mapred jar & webapps to CLASSPATH
-if [ -d "$HADOOP_PREFIX/${MAPRED_DIR}/webapps" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/${MAPRED_DIR}
-fi
-for f in $HADOOP_MAPRED_HOME/${MAPRED_DIR}/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-# Need YARN jars also
-for f in $HADOOP_YARN_HOME/${YARN_DIR}/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-# add libs to CLASSPATH
-for f in $HADOOP_MAPRED_HOME/${MAPRED_LIB_JARS_DIR}/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-# add modules to CLASSPATH
-for f in $HADOOP_MAPRED_HOME/modules/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-if [ "$COMMAND" = "classpath" ] ; then
- echo $CLASSPATH
- exit
+if [[ "${HADOOP_DAEMON_MODE}" != "default" ]]; then
+ # shellcheck disable=SC2034
+ HADOOP_ROOT_LOGGER="${HADOOP_DAEMON_ROOT_LOGGER}"
+ hadoop_add_param HADOOP_OPTS mapred.jobsummary.logger "-Dmapred.jobsummary.logger=${HADOOP_ROOT_LOGGER}"
+ # shellcheck disable=SC2034
+ HADOOP_LOGFILE="hadoop-${HADOOP_IDENT_STRING}-${COMMAND}-${HOSTNAME}.log"
fi
-HADOOP_OPTS="$HADOOP_OPTS
-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
+hadoop_add_param HADOOP_OPTS Xmx "${JAVA_HEAP_MAX}"
+hadoop_finalize
export CLASSPATH
-exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
+
+if [[ -n "${daemon}" ]]; then
+ if [[ -n "${secure_service}" ]]; then
+ hadoop_secure_daemon_handler "${HADOOP_DAEMON_MODE}" "${COMMAND}"\
+ "${CLASS}" "${daemon_pidfile}" "${daemon_outfile}" \
+ "${priv_pidfile}" "${priv_outfile}" "${priv_errfile}" "$@"
+ else
+ hadoop_daemon_handler "${HADOOP_DAEMON_MODE}" "${COMMAND}" "${CLASS}" \
+ "${daemon_pidfile}" "${daemon_outfile}" "$@"
+ fi
+ exit $?
+else
+ hadoop_java_exec "${COMMAND}" "${CLASS}" "$@"
+fi
+
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred-config.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred-config.sh?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred-config.sh (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mapred-config.sh Thu Aug 21 05:22:10 2014
@@ -18,35 +18,55 @@
# included in all the mapred scripts with source command
# should not be executed directly
-bin=`which "$0"`
-bin=`dirname "${bin}"`
-bin=`cd "$bin"; pwd`
+function hadoop_subproject_init
+{
+ if [ -e "${HADOOP_CONF_DIR}/mapred-env.sh" ]; then
+ . "${HADOOP_CONF_DIR}/mapred-env.sh"
+ fi
+
+ # at some point in time, someone thought it would be a good idea to
+ # create separate vars for every subproject. *sigh*
+ # let's perform some overrides and setup some defaults for bw compat
+ # this way the common hadoop var's == subproject vars and can be
+ # used interchangeable from here on out
+ # ...
+ # this should get deprecated at some point.
+ HADOOP_LOG_DIR="${HADOOP_MAPRED_LOG_DIR:-$HADOOP_LOG_DIR}"
+ HADOOP_MAPRED_LOG_DIR="${HADOOP_LOG_DIR}"
+
+ HADOOP_LOGFILE="${HADOOP_MAPRED_LOGFILE:-$HADOOP_LOGFILE}"
+ HADOOP_MAPRED_LOGFILE="${HADOOP_LOGFILE}"
+
+ HADOOP_NICENESS="${HADOOP_MAPRED_NICENESS:-$HADOOP_NICENESS}"
+ HADOOP_MAPRED_NICENESS="${HADOOP_NICENESS}"
+
+ HADOOP_STOP_TIMEOUT="${HADOOP_MAPRED_STOP_TIMEOUT:-$HADOOP_STOP_TIMEOUT}"
+ HADOOP_MAPRED_STOP_TIMEOUT="${HADOOP_STOP_TIMEOUT}"
+
+ HADOOP_PID_DIR="${HADOOP_MAPRED_PID_DIR:-$HADOOP_PID_DIR}"
+ HADOOP_MAPRED_PID_DIR="${HADOOP_PID_DIR}"
+
+ HADOOP_ROOT_LOGGER="${HADOOP_MAPRED_ROOT_LOGGER:-INFO,console}"
+ HADOOP_MAPRED_ROOT_LOGGER="${HADOOP_ROOT_LOGGER}"
+
+ HADOOP_MAPRED_HOME="${HADOOP_MAPRED_HOME:-$HADOOP_HOME_DIR}"
+
+ HADOOP_IDENT_STRING="${HADOOP_MAPRED_IDENT_STRING:-$HADOOP_IDENT_STRING}"
+ HADOOP_MAPRED_IDENT_STRING="${HADOOP_IDENT_STRING}"
+}
+
+if [[ -z "${HADOOP_LIBEXEC_DIR}" ]]; then
+ _mc_this="${BASH_SOURCE-$0}"
+ HADOOP_LIBEXEC_DIR=$(cd -P -- "$(dirname -- "${_mc_this}")" >/dev/null && pwd -P)
+fi
-DEFAULT_LIBEXEC_DIR="$bin"/../libexec
-HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
-if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then
+if [[ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]]; then
. "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh"
-elif [ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]; then
- . "$HADOOP_COMMON_HOME"/libexec/hadoop-config.sh
-elif [ -e "${HADOOP_COMMON_HOME}/bin/hadoop-config.sh" ]; then
- . "$HADOOP_COMMON_HOME"/bin/hadoop-config.sh
-elif [ -e "${HADOOP_HOME}/bin/hadoop-config.sh" ]; then
- . "$HADOOP_HOME"/bin/hadoop-config.sh
-elif [ -e "${HADOOP_MAPRED_HOME}/bin/hadoop-config.sh" ]; then
- . "$HADOOP_MAPRED_HOME"/bin/hadoop-config.sh
+elif [[ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]]; then
+ . "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh"
+elif [[ -e "${HADOOP_HOME}/libexec/hadoop-config.sh" ]]; then
+ . "${HADOOP_HOME}/libexec/hadoop-config.sh"
else
echo "Hadoop common not found."
exit
fi
-
-# Only set locally to use in HADOOP_OPTS. No need to export.
-# The following defaults are useful when somebody directly invokes bin/mapred.
-HADOOP_MAPRED_LOG_DIR=${HADOOP_MAPRED_LOG_DIR:-${HADOOP_MAPRED_HOME}/logs}
-HADOOP_MAPRED_LOGFILE=${HADOOP_MAPRED_LOGFILE:-hadoop.log}
-HADOOP_MAPRED_ROOT_LOGGER=${HADOOP_MAPRED_ROOT_LOGGER:-INFO,console}
-
-HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.dir=$HADOOP_MAPRED_LOG_DIR"
-HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.file=$HADOOP_MAPRED_LOGFILE"
-export HADOOP_OPTS="$HADOOP_OPTS
-Dhadoop.root.logger=${HADOOP_MAPRED_ROOT_LOGGER}"
-
-
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh Thu Aug 21 05:22:10 2014
@@ -15,133 +15,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-#
-# Environment Variables
-#
-# HADOOP_JHS_LOGGER Hadoop JobSummary logger.
-# HADOOP_CONF_DIR Alternate conf dir. Default is ${HADOOP_MAPRED_HOME}/conf.
-# HADOOP_MAPRED_PID_DIR The pid files are stored. /tmp by default.
-# HADOOP_MAPRED_NICENESS The scheduling priority for daemons. Defaults to 0.
-##
-
-usage="Usage: mr-jobhistory-daemon.sh [--config <conf-dir>] (start|stop)
<mapred-command> "
-
-# if no args specified, show usage
-if [ $# -le 1 ]; then
- echo $usage
- exit 1
-fi
-
-bin=`dirname "${BASH_SOURCE-$0}"`
-bin=`cd "$bin"; pwd`
-
-DEFAULT_LIBEXEC_DIR="$bin"/../libexec
-HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
-if [ -e ${HADOOP_LIBEXEC_DIR}/mapred-config.sh ]; then
- . $HADOOP_LIBEXEC_DIR/mapred-config.sh
-fi
-
-# get arguments
-startStop=$1
-shift
-command=$1
-shift
-
-hadoop_rotate_log ()
+function hadoop_usage
{
- log=$1;
- num=5;
- if [ -n "$2" ]; then
- num=$2
- fi
- if [ -f "$log" ]; then # rotate logs
- while [ $num -gt 1 ]; do
- prev=`expr $num - 1`
- [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
- num=$prev
- done
- mv "$log" "$log.$num";
- fi
+ echo "Usage: mr-jobhistory-daemon.sh [--config confdir] (start|stop|status)
<hadoop-command> <args...>"
}
-if [ "$HADOOP_MAPRED_IDENT_STRING" = "" ]; then
- export HADOOP_MAPRED_IDENT_STRING="$USER"
-fi
-
-export HADOOP_MAPRED_HOME=${HADOOP_MAPRED_HOME:-${HADOOP_PREFIX}}
-export HADOOP_MAPRED_LOGFILE=mapred-$HADOOP_MAPRED_IDENT_STRING-$command-$HOSTNAME.log
-export HADOOP_MAPRED_ROOT_LOGGER=${HADOOP_MAPRED_ROOT_LOGGER:-INFO,RFA}
-export HADOOP_JHS_LOGGER=${HADOOP_JHS_LOGGER:-INFO,JSA}
-
-if [ -f "${HADOOP_CONF_DIR}/mapred-env.sh" ]; then
- . "${HADOOP_CONF_DIR}/mapred-env.sh"
-fi
-
-mkdir -p "$HADOOP_MAPRED_LOG_DIR"
-chown $HADOOP_MAPRED_IDENT_STRING $HADOOP_MAPRED_LOG_DIR
-
-if [ "$HADOOP_MAPRED_PID_DIR" = "" ]; then
- HADOOP_MAPRED_PID_DIR=/tmp
-fi
-
-HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.id.str=$HADOOP_MAPRED_IDENT_STRING"
-
-log=$HADOOP_MAPRED_LOG_DIR/mapred-$HADOOP_MAPRED_IDENT_STRING-$command-$HOSTNAME.out
-pid=$HADOOP_MAPRED_PID_DIR/mapred-$HADOOP_MAPRED_IDENT_STRING-$command.pid
-
-HADOOP_MAPRED_STOP_TIMEOUT=${HADOOP_MAPRED_STOP_TIMEOUT:-5}
-
-# Set default scheduling priority
-if [ "$HADOOP_MAPRED_NICENESS" = "" ]; then
- export HADOOP_MAPRED_NICENESS=0
+# let's locate libexec...
+if [[ -n "${HADOOP_PREFIX}" ]]; then
+ DEFAULT_LIBEXEC_DIR="${HADOOP_PREFIX}/libexec"
+else
+ this="${BASH_SOURCE-$0}"
+ bin=$(cd -P -- "$(dirname -- "${this}")" >/dev/null && pwd -P)
+ DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
+fi
+
+HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}"
+# shellcheck disable=SC2034
+HADOOP_NEW_CONFIG=true
+if [[ -f "${HADOOP_LIBEXEC_DIR}/yarn-config.sh" ]]; then
+ . "${HADOOP_LIBEXEC_DIR}/yarn-config.sh"
+else
+ echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/yarn-config.sh." 2>&1
+ exit 1
fi
-case $startStop in
-
- (start)
-
- mkdir -p "$HADOOP_MAPRED_PID_DIR"
-
- if [ -f $pid ]; then
- if kill -0 `cat $pid` > /dev/null 2>&1; then
- echo $command running as process `cat $pid`. Stop it first.
- exit 1
- fi
- fi
-
- hadoop_rotate_log $log
- echo starting $command, logging to $log
- cd "$HADOOP_MAPRED_HOME"
- nohup nice -n $HADOOP_MAPRED_NICENESS "$HADOOP_MAPRED_HOME"/bin/mapred --config $HADOOP_CONF_DIR $command "$@" > "$log" 2>&1 < /dev/null &
- echo $! > $pid
- sleep 1; head "$log"
- ;;
-
- (stop)
-
- if [ -f $pid ]; then
- TARGET_PID=`cat $pid`
- if kill -0 $TARGET_PID > /dev/null 2>&1; then
- echo stopping $command
- kill $TARGET_PID
- sleep $HADOOP_MAPRED_STOP_TIMEOUT
- if kill -0 $TARGET_PID > /dev/null 2>&1; then
- echo "$command did not stop gracefully after
$HADOOP_MAPRED_STOP_TIMEOUT seconds: killing with kill -9"
- kill -9 $TARGET_PID
- fi
- else
- echo no $command to stop
- fi
- rm -f $pid
- else
- echo no $command to stop
- fi
- ;;
-
- (*)
- echo $usage
- exit 1
- ;;
+daemonmode=$1
+shift
-esac
+exec "${HADOOP_MAPRED_HOME}/bin/mapred" \
+--config "${HADOOP_CONF_DIR}" --daemon "${daemonmode}" "$@"
Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/conf:r1594376-1619194
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1609845-1619277
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/mapred-env.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/mapred-env.sh?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/mapred-env.sh (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/conf/mapred-env.sh Thu Aug 21 05:22:10 2014
@@ -13,15 +13,59 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# export JAVA_HOME=/home/y/libexec/jdk1.6.0/
+##
+## THIS FILE ACTS AS AN OVERRIDE FOR hadoop-env.sh FOR ALL
+## WORK DONE BY THE mapred AND RELATED COMMANDS.
+##
+## Precedence rules:
+##
+## mapred-env.sh > hadoop-env.sh > hard-coded defaults
+##
+## MAPRED_xyz > HADOOP_xyz > hard-coded defaults
+##
-export HADOOP_JOB_HISTORYSERVER_HEAPSIZE=1000
+###
+# Generic settings for MapReduce
+###
-export HADOOP_MAPRED_ROOT_LOGGER=INFO,RFA
+#Override the log4j settings for all MR apps
+# export MAPRED_ROOT_LOGGER="INFO,console"
+# Override Hadoop's log directory & file
+# export HADOOP_MAPRED_LOG_DIR=""
+
+# Override Hadoop's pid directory
+# export HADOOP_MAPRED_PID_DIR=
+
+# Override Hadoop's identity string. $USER by default.
+# This is used in writing log and pid files, so keep that in mind!
+# export HADOOP_MAPRED_IDENT_STRING=$USER
+
+# Override Hadoop's process priority
+# Note that sub-processes will also run at this level!
+# export HADOOP_MAPRED_NICENESS=0
+
+###
+# Job History Server specific parameters
+###
+
+# Specify the max heapsize for the Job History Server using a numerical value
+# in the scale of MB. For example, to specify a JVM option of -Xmx1000m, set
+# the value to 1000.
+# This value will be overridden by an Xmx setting specified in either
+# MAPRED_OPTS, HADOOP_OPTS, and/or HADOOP_JOB_HISTORYSERVER_OPTS.
+# If not specified, the default value will be picked from either YARN_HEAPMAX
+# or JAVA_HEAP_MAX with YARN_HEAPMAX as the preferred option of the two.
+#
+#export HADOOP_JOB_HISTORYSERVER_HEAPSIZE=1000
+
+# Specify the JVM options to be used when starting the Job History Server.
+# These options will be appended to the options specified as HADOOP_OPTS
+# and therefore may override any similar flags set in HADOOP_OPTS
#export HADOOP_JOB_HISTORYSERVER_OPTS=
-#export HADOOP_MAPRED_LOG_DIR="" # Where log files are stored.
$HADOOP_MAPRED_HOME/logs by default.
-#export HADOOP_JHS_LOGGER=INFO,RFA # Hadoop JobSummary logger.
-#export HADOOP_MAPRED_PID_DIR= # The pid files are stored. /tmp by default.
-#export HADOOP_MAPRED_IDENT_STRING= #A string representing this instance of hadoop. $USER by default
-#export HADOOP_MAPRED_NICENESS= #The scheduling priority for daemons. Defaults to 0.
+
+# Specify the log4j settings for the JobHistoryServer
+#export HADOOP_JHS_LOGGER=INFO,RFA
+
+
+
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java Thu Aug 21 05:22:10 2014
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapred.Merger.S
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
/**
* <code>BackupStore</code> is an utility class that is used to support
@@ -572,7 +574,9 @@ public class BackupStore<K,V> {
file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(),
-1, conf);
- return new Writer<K, V>(conf, fs, file);
+ FSDataOutputStream out = fs.create(file);
+ out = CryptoUtils.wrapIfNecessary(conf, out);
+ return new Writer<K, V>(conf, out, null, null, null, null, true);
}
}
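
The hunk above shows the write-side pattern this merge applies wherever an IFile.Writer used to be built from a (FileSystem, Path) pair: the caller now opens the stream itself, passes it through CryptoUtils.wrapIfNecessary, and hands ownership of the stream to the writer. A minimal illustrative sketch of that pattern, not part of the commit (the helper name and its generic use are invented for the example):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.IFile;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    // Hypothetical helper; mirrors the BackupStore change above.
    public static <K, V> IFile.Writer<K, V> newSpillWriter(
        Configuration conf, FileSystem fs, Path file,
        Class<K> keyClass, Class<V> valueClass) throws IOException {
      FSDataOutputStream out = fs.create(file);
      // Returns the stream unchanged unless
      // mapreduce.job.encrypted-intermediate-data is true, in which case
      // everything written through it is encrypted.
      out = CryptoUtils.wrapIfNecessary(conf, out);
      // The trailing "true" marks the writer as owning (and closing) the stream.
      return new IFile.Writer<K, V>(conf, out, keyClass, valueClass, null, null, true);
    }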
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java Thu Aug 21 05:22:10 2014
@@ -90,13 +90,11 @@ public class IFile {
DataOutputBuffer buffer = new DataOutputBuffer();
- public Writer(Configuration conf, FileSystem fs, Path file,
- Class<K> keyClass, Class<V> valueClass,
- CompressionCodec codec,
- Counters.Counter writesCounter) throws IOException {
- this(conf, fs.create(file), keyClass, valueClass, codec,
- writesCounter);
- ownOutputStream = true;
+ public Writer(Configuration conf, FSDataOutputStream out,
+ Class<K> keyClass, Class<V> valueClass,
+ CompressionCodec codec, Counters.Counter writesCounter)
+ throws IOException {
+ this(conf, out, keyClass, valueClass, codec, writesCounter, false);
}
protected Writer(Counters.Counter writesCounter) {
@@ -105,7 +103,8 @@ public class IFile {
public Writer(Configuration conf, FSDataOutputStream out,
Class<K> keyClass, Class<V> valueClass,
- CompressionCodec codec, Counters.Counter writesCounter)
+ CompressionCodec codec, Counters.Counter writesCounter,
+ boolean ownOutputStream)
throws IOException {
this.writtenRecordsCounter = writesCounter;
this.checksumOut = new IFileOutputStream(out);
@@ -137,11 +136,7 @@ public class IFile {
this.valueSerializer = serializationFactory.getSerializer(valueClass);
this.valueSerializer.open(buffer);
}
- }
-
- public Writer(Configuration conf, FileSystem fs, Path file)
- throws IOException {
- this(conf, fs, file, null, null, null, null);
+ this.ownOutputStream = ownOutputStream;
}
public void close() throws IOException {
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Thu Aug 21 05:22:10 2014
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.lib.m
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
@@ -1580,7 +1581,8 @@ public class MapTask extends Task {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
- writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
+ FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+ writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null) {
// spill directly
@@ -1617,8 +1619,8 @@ public class MapTask extends Task {
// record offsets
rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
+ rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+ rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, i);
writer = null;
@@ -1668,7 +1670,8 @@ public class MapTask extends Task {
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
- writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
+ FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+ writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (i == partition) {
@@ -1682,8 +1685,8 @@ public class MapTask extends Task {
// record offsets
rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
+ rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+ rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, i);
writer = null;
@@ -1825,12 +1828,13 @@ public class MapTask extends Task {
try {
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
+ FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
Writer<K, V> writer =
- new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
+ new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
writer.close();
rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
+ rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+ rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
sr.putIndex(rec, i);
}
sr.writeToFile(finalIndexFile, job);
@@ -1879,8 +1883,9 @@ public class MapTask extends Task {
//write merged output to disk
long segmentStart = finalOut.getPos();
+ FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
Writer<K, V> writer =
- new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
+ new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
@@ -1896,8 +1901,8 @@ public class MapTask extends Task {
// record offsets
rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
+ rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+ rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, parts);
}
spillRec.writeToFile(finalIndexFile, job);
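
A note on the repeated rawLength/partLength arithmetic in this file: the wrapping stream writes a small crypto header into the spill file, so the physical bytes a partition occupies exceed what the IFile.Writer itself counted, and the index record must describe physical extents for fetchers to slice the file correctly. A hedged sketch of that bookkeeping as a standalone helper (the helper itself is invented; the arithmetic is the commit's):

    import java.io.IOException;
    import org.apache.hadoop.mapred.IFile;
    import org.apache.hadoop.mapred.IndexRecord;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    // cryptoPadding(job) is 0 when encryption is off, so this degenerates
    // to the pre-patch behaviour.
    static void fillIndexRecord(IndexRecord rec, long segmentStart,
        IFile.Writer<?, ?> writer, JobConf job) throws IOException {
      rec.startOffset = segmentStart;
      rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
      rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
    }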
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Thu Aug 21 05:22:10 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapred.IFile.Re
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
@@ -298,8 +300,12 @@ public class Merger {
void init(Counters.Counter readsCounter) throws IOException {
if (reader == null) {
FSDataInputStream in = fs.open(file);
+
in.seek(segmentOffset);
- reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
+ in = CryptoUtils.wrapIfNecessary(conf, in);
+ reader = new Reader<K, V>(conf, in,
+ segmentLength - CryptoUtils.cryptoPadding(conf),
+ codec, readsCounter);
}
if (mapOutputsCounter != null) {
@@ -714,9 +720,10 @@ public class Merger {
tmpFilename.toString(),
approxOutputSize, conf);
- Writer<K, V> writer =
- new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
- writesCounter);
+ FSDataOutputStream out = fs.create(outputFile);
+ out = CryptoUtils.wrapIfNecessary(conf, out);
+ Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,
+ codec, writesCounter, true);
writeFile(this, writer, reporter, conf);
writer.close();
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Thu Aug 21 05:22:10 2014
@@ -291,7 +291,7 @@ class JobSubmitter {
/**
* configure the jobconf of the user with the command line options of
* -libjars, -files, -archives.
- * @param conf
+ * @param job
* @throws IOException
*/
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
@@ -376,8 +376,13 @@ class JobSubmitter {
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
+
+ int keyLen = CryptoUtils.isShuffleEncrypted(conf)
+ ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
+ MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
+ : SHUFFLE_KEY_LENGTH;
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
- keyGen.init(SHUFFLE_KEY_LENGTH);
+ keyGen.init(keyLen);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
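
For context: the shuffle secret generated at submission doubles as key material for intermediate-data encryption, so its bit length must track the configured key size once encryption is on. A minimal sketch of the selection, assuming SHUFFLE_KEY_LENGTH and SHUFFLE_KEYGEN_ALGORITHM are the pre-existing JobSubmitter constants (the wrapper method itself is invented for illustration):

    import java.security.NoSuchAlgorithmException;
    import javax.crypto.KeyGenerator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.CryptoUtils;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    static KeyGenerator makeShuffleKeyGen(Configuration conf,
        int shuffleKeyLength, String keygenAlgorithm)
        throws NoSuchAlgorithmException {
      int keyLen = CryptoUtils.isShuffleEncrypted(conf)
          ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
              MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
          : shuffleKeyLength; // legacy length when encryption is off
      KeyGenerator keyGen = KeyGenerator.getInstance(keygenAlgorithm);
      keyGen.init(keyLen);
      return keyGen;
    }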
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Aug 21 05:22:10 2014
@@ -771,4 +771,18 @@ public interface MRJobConfig {
public static final String TASK_PREEMPTION =
"mapreduce.job.preemption";
+
+ public static final String MR_ENCRYPTED_INTERMEDIATE_DATA =
+ "mapreduce.job.encrypted-intermediate-data";
+ public static final boolean DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA = false;
+
+ public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
+ "mapreduce.job.encrypted-intermediate-data-key-size-bits";
+ public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
+ 128;
+
+ public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
+ "mapreduce.job.encrypted-intermediate-data.buffer.kb";
+ public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
+ 128;
}
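
These three keys are the user-facing switch for the feature added by MAPREDUCE-5890. A hypothetical job-setup snippet; the values shown are just the defaults declared above:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class EncryptedShuffleSetup {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Encrypt map-output spills and shuffled intermediate data at rest:
        job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
        // Optional tuning; 128 is the default for both.
        job.setInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 128);
        job.setInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB, 128);
        System.out.println(job.get(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA));
      }
    }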
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Aug 21 05:22:10 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.task
import java.io.DataInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.MRCon
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.security.ssl.SSLFactory;
import com.google.common.annotations.VisibleForTesting;
@@ -65,6 +67,7 @@ class Fetcher<K,V> extends Thread {
CONNECTION, WRONG_REDUCE}
private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+ private final JobConf jobConf;
private final Counters.Counter connectionErrs;
private final Counters.Counter ioErrs;
private final Counters.Counter wrongLengthErrs;
@@ -104,6 +107,7 @@ class Fetcher<K,V> extends Thread {
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey shuffleKey,
int id) {
+ this.jobConf = job;
this.reporter = reporter;
this.scheduler = scheduler;
this.merger = merger;
@@ -396,7 +400,11 @@ class Fetcher<K,V> extends Thread {
return remaining.toArray(new TaskAttemptID[remaining.size()]);
}
-
+ InputStream is = input;
+ is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
+ compressedLength -= CryptoUtils.cryptoPadding(jobConf);
+ decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
+
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
@@ -433,7 +441,7 @@ class Fetcher<K,V> extends Thread {
LOG.info("fetcher#" + id + " about to shuffle output of map "
+ mapOutput.getMapId() + " decomp: " + decompressedLength
+ " len: " + compressedLength + " to " +
mapOutput.getDescription());
- mapOutput.shuffle(host, input, compressedLength, decompressedLength,
+ mapOutput.shuffle(host, is, compressedLength, decompressedLength,
metrics, reporter);
} catch (java.lang.InternalError e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e);
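
The Fetcher change is the decrypt-side mirror of the writer changes: the lengths in the shuffle header describe physical bytes, crypto header included, so after wrapping the connection stream the fetcher shrinks both counters back to the logical payload before the sanity checks and the merge. Sketched as a self-contained helper (the helper name and the length array are invented for illustration):

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    static InputStream openShuffleInput(JobConf jobConf, InputStream input,
        long[] lengths /* {compressedLength, decompressedLength}, physical */)
        throws IOException {
      InputStream is = CryptoUtils.wrapIfNecessary(jobConf, input, lengths[0]);
      // cryptoPadding() is 0 when encryption is disabled.
      lengths[0] -= CryptoUtils.cryptoPadding(jobConf);
      lengths[1] -= CryptoUtils.cryptoPadding(jobConf);
      return is;
    }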
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java Thu Aug 21 05:22:10 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.MapOutpu
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SpillRecord;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
/**
* LocalFetcher is used by LocalJobRunner to perform a local filesystem
@@ -145,6 +146,9 @@ class LocalFetcher<K,V> extends Fetcher<
// now read the file, seek to the appropriate section, and send it.
FileSystem localFs = FileSystem.getLocal(job).getRaw();
FSDataInputStream inStream = localFs.open(mapOutputFileName);
+
+ inStream = CryptoUtils.wrapIfNecessary(job, inStream);
+
try {
inStream.seek(ir.startOffset);
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Thu Aug 21 05:22:10 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -54,6 +55,7 @@ import org.apache.hadoop.mapred.Task.Com
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
@@ -228,6 +230,10 @@ public class MergeManagerImpl<K, V> impl
return new InMemoryMerger(this);
}
+ protected MergeThread<CompressAwarePath,K,V> createOnDiskMerger() {
+ return new OnDiskMerger(this);
+ }
+
TaskAttemptID getReduceId() {
return reduceId;
}
@@ -453,11 +459,10 @@ public class MergeManagerImpl<K, V> impl
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
- Writer<K,V> writer =
- new Writer<K,V>(jobConf, rfs, outputPath,
- (Class<K>) jobConf.getMapOutputKeyClass(),
- (Class<V>) jobConf.getMapOutputValueClass(),
- codec, null);
+ FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+ Writer<K, V> writer = new Writer<K, V>(jobConf, out,
+ (Class<K>) jobConf.getMapOutputKeyClass(),
+ (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
@@ -537,11 +542,12 @@ public class MergeManagerImpl<K, V> impl
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
- Writer<K,V> writer =
- new Writer<K,V>(jobConf, rfs, outputPath,
- (Class<K>) jobConf.getMapOutputKeyClass(),
- (Class<V>) jobConf.getMapOutputValueClass(),
- codec, null);
+
+ FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+ Writer<K, V> writer = new Writer<K, V>(jobConf, out,
+ (Class<K>) jobConf.getMapOutputKeyClass(),
+ (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
+
RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
@@ -717,8 +723,10 @@ public class MergeManagerImpl<K, V> impl
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null,
mergePhase);
- Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
- keyClass, valueClass, codec, null);
+
+ FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath));
+ Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
+ codec, null, true);
try {
Merger.writeFile(rIter, writer, reporter, job);
writer.close();
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Thu Aug 21 05:22:10 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
import com.google.common.annotations.VisibleForTesting;
@@ -75,7 +76,7 @@ class OnDiskMapOutput<K, V> extends MapO
this.merger = merger;
this.outputPath = outputPath;
tmpOutputPath = getTempPath(outputPath, fetcher);
- disk = fs.create(tmpOutputPath);
+ disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
}
@VisibleForTesting
Propchange: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1594376-1619194
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1618417-1619277
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm Thu Aug 21 05:22:10 2014
@@ -177,6 +177,12 @@ MapReduce Commands Guide
Creates a hadoop archive. More information can be found at
{{{./HadoopArchives.html}Hadoop Archives Guide}}.
+** <<<version>>>
+
+ Prints the version.
+
+ Usage: <<<mapred version>>>
+
* Administration Commands
Commands useful for administrators of a hadoop cluster.
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm Thu Aug 21 05:22:10 2014
@@ -191,6 +191,26 @@ $H3 Update and Overwrite
If `-update` is used, `1` is overwritten as well.
+$H3 raw Namespace Extended Attribute Preservation
+
+ This section only applies to HDFS.
+
+ If the target and all of the source pathnames are in the /.reserved/raw
+ hierarchy, then 'raw' namespace extended attributes will be preserved.
+ 'raw' xattrs are used by the system for internal functions such as encryption
+ meta data. They are only visible to users when accessed through the
+ /.reserved/raw hierarchy.
+
+ raw xattrs are preserved based solely on whether /.reserved/raw prefixes are
+ supplied. The -p (preserve, see below) flag does not impact preservation of
+ raw xattrs.
+
+ To prevent raw xattrs from being preserved, simply do not use the
+ /.reserved/raw prefix on any of the source and target paths.
+
+ If the /.reserved/raw prefix is specified on only a subset of the source and
+ target paths, an error will be displayed and a non-0 exit code returned.
+
Command Line Options
--------------------
Modified: hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java Thu Aug 21 05:22:10 2014
@@ -24,14 +24,16 @@ import static org.mockito.Mockito.doAnsw
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.junit.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -51,10 +53,16 @@ import org.apache.hadoop.mapred.RawKeyVa
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
@@ -63,40 +71,48 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.google.common.collect.Lists;
+
public class TestMerger {
private Configuration conf;
private JobConf jobConf;
private FileSystem fs;
-
+
@Before
public void setup() throws IOException {
conf = new Configuration();
jobConf = new JobConf();
fs = FileSystem.getLocal(conf);
}
-
- @After
- public void cleanup() throws IOException {
- fs.delete(new Path(jobConf.getLocalDirs()[0]), true);
+
+
+ @Test
+ public void testEncryptedMerger() throws Throwable {
+ jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+ conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+ Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+ TokenCache.setShuffleSecretKey(new byte[16], credentials);
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ testInMemoryAndOnDiskMerger();
}
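As an aside for readers tracing the new testEncryptedMerger above: a minimal standalone sketch of the setup it performs, using only the configuration key and calls visible in this diff. The all-zero 16-byte shuffle key mirrors the test fixture and is not meaningful key material.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

public class EncryptedIntermediateSetup {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    // Ask the framework to encrypt intermediate (spill/merge) data.
    jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
    // Register key material with the current user's credentials; the test
    // above uses a dummy all-zero 16-byte key.
    Credentials credentials =
        UserGroupInformation.getCurrentUser().getCredentials();
    TokenCache.setShuffleSecretKey(new byte[16], credentials);
    UserGroupInformation.getCurrentUser().addCredentials(credentials);
  }
}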
-
+
@Test
- public void testInMemoryMerger() throws Throwable {
+ public void testInMemoryAndOnDiskMerger() throws Throwable {
JobID jobId = new JobID("a", 0);
- TaskAttemptID reduceId = new TaskAttemptID(
+ TaskAttemptID reduceId1 = new TaskAttemptID(
new TaskID(jobId, TaskType.REDUCE, 0), 0);
TaskAttemptID mapId1 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 1), 0);
TaskAttemptID mapId2 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 2), 0);
-
+
LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
-
+
MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
- reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
+ reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
null, null, new Progress(), new MROutputFiles());
-
+
// write map outputs
Map<String, String> map1 = new TreeMap<String, String>();
map1.put("apple", "disgusting");
@@ -113,32 +129,88 @@ public class TestMerger {
mapOutputBytes1.length);
System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
mapOutputBytes2.length);
-
+
// create merger and run merge
MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
mergeManager.createInMemoryMerger();
- List<InMemoryMapOutput<Text, Text>> mapOutputs =
+ List<InMemoryMapOutput<Text, Text>> mapOutputs1 =
new ArrayList<InMemoryMapOutput<Text, Text>>();
- mapOutputs.add(mapOutput1);
- mapOutputs.add(mapOutput2);
-
- inMemoryMerger.merge(mapOutputs);
-
+ mapOutputs1.add(mapOutput1);
+ mapOutputs1.add(mapOutput2);
+
+ inMemoryMerger.merge(mapOutputs1);
+
Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
- Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
-
+
+ TaskAttemptID reduceId2 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.REDUCE, 3), 0);
+ TaskAttemptID mapId3 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.MAP, 4), 0);
+ TaskAttemptID mapId4 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.MAP, 5), 0);
+ // write map outputs
+ Map<String, String> map3 = new TreeMap<String, String>();
+ map3.put("apple", "awesome");
+ map3.put("carrot", "amazing");
+ Map<String, String> map4 = new TreeMap<String, String>();
+ map4.put("banana", "bla");
+ byte[] mapOutputBytes3 = writeMapOutput(conf, map3);
+ byte[] mapOutputBytes4 = writeMapOutput(conf, map4);
+ InMemoryMapOutput<Text, Text> mapOutput3 = new InMemoryMapOutput<Text, Text>(
+ conf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
+ InMemoryMapOutput<Text, Text> mapOutput4 = new InMemoryMapOutput<Text, Text>(
+ conf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
+ System.arraycopy(mapOutputBytes3, 0, mapOutput3.getMemory(), 0,
+ mapOutputBytes3.length);
+ System.arraycopy(mapOutputBytes4, 0, mapOutput4.getMemory(), 0,
+ mapOutputBytes4.length);
+
+ // create merger and run merge
+ MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger2 =
+ mergeManager.createInMemoryMerger();
+ List<InMemoryMapOutput<Text, Text>> mapOutputs2 =
+ new ArrayList<InMemoryMapOutput<Text, Text>>();
+ mapOutputs2.add(mapOutput3);
+ mapOutputs2.add(mapOutput4);
+
+ inMemoryMerger2.merge(mapOutputs2);
+
+ Assert.assertEquals(2, mergeManager.onDiskMapOutputs.size());
+
+ List<CompressAwarePath> paths = new ArrayList<CompressAwarePath>();
+ Iterator<CompressAwarePath> iterator = mergeManager.onDiskMapOutputs.iterator();
List<String> keys = new ArrayList<String>();
List<String> values = new ArrayList<String>();
- readOnDiskMapOutput(conf, fs, outPath, keys, values);
- Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
- Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
+ while (iterator.hasNext()) {
+ CompressAwarePath next = iterator.next();
+ readOnDiskMapOutput(conf, fs, next, keys, values);
+ paths.add(next);
+ }
+ Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot", "apple", "banana", "carrot"));
+ Assert.assertEquals(values, Arrays.asList("awesome", "bla", "amazing", "disgusting", "pretty good", "delicious"));
+ mergeManager.close();
+
+ mergeManager = new MergeManagerImpl<Text, Text>(
+ reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
+ null, null, new Progress(), new MROutputFiles());
+
+ MergeThread<CompressAwarePath,Text,Text> onDiskMerger = mergeManager.createOnDiskMerger();
+ onDiskMerger.merge(paths);
+
+ Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+
+ keys = new ArrayList<String>();
+ values = new ArrayList<String>();
+ readOnDiskMapOutput(conf, fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
+ Assert.assertEquals(keys, Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot"));
+ Assert.assertEquals(values, Arrays.asList("awesome", "disgusting", "pretty good", "bla", "amazing", "delicious"));
mergeManager.close();
Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
}
-
+
private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -152,11 +224,13 @@ public class TestMerger {
writer.close();
return baos.toByteArray();
}
-
+
private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
List<String> keys, List<String> values) throws IOException {
- IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
- path, null, null);
+ FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+
+ IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
+ fs.getFileStatus(path).getLen(), null, null);
DataInputBuffer keyBuff = new DataInputBuffer();
DataInputBuffer valueBuff = new DataInputBuffer();
Text key = new Text();
@@ -169,17 +243,17 @@ public class TestMerger {
values.add(value.toString());
}
}
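The reworked readOnDiskMapOutput above pairs the new stream-based IFile.Reader constructor with CryptoUtils.wrapIfNecessary, which, judging by its name and its use here, layers a decrypting stream over the raw input only when encrypted intermediate data is enabled and otherwise passes the stream through. A self-contained sketch of that read path; the local path "part.out" is hypothetical.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapreduce.CryptoUtils;

public class IFileReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path path = new Path("part.out");  // hypothetical merge output
    // Wraps fs.open(path) with decryption only if encryption of
    // intermediate data is enabled in conf.
    FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
    // The reader now takes a stream and an explicit length, not (fs, path).
    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(
        conf, in, fs.getFileStatus(path).getLen(), null, null);
    DataInputBuffer keyBuff = new DataInputBuffer();
    DataInputBuffer valueBuff = new DataInputBuffer();
    Text key = new Text();
    Text value = new Text();
    List<String> keys = new ArrayList<String>();
    while (reader.nextRawKey(keyBuff)) {
      key.readFields(keyBuff);
      keys.add(key.toString());
      reader.nextRawValue(valueBuff);
      value.readFields(valueBuff);
    }
    reader.close();
    System.out.println(keys);
  }
}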
-
+
@Test
public void testCompressed() throws IOException {
testMergeShouldReturnProperProgress(getCompressedSegments());
- }
-
+}
+
@Test
public void testUncompressed() throws IOException {
testMergeShouldReturnProperProgress(getUncompressedSegments());
}
-
+
@SuppressWarnings( { "deprecation", "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
@@ -212,7 +286,7 @@ public class TestMerger {
}
return segments;
}
-
+
private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
for (int i = 1; i < 1; i++) {
@@ -220,7 +294,7 @@ public class TestMerger {
}
return segments;
}
-
+
private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
return new Segment<Text, Text>(getReader(i), false);
}
@@ -258,7 +332,7 @@ public class TestMerger {
}
};
}
-
+
private Answer<?> getValueAnswer(final String segmentName) {
return new Answer<Void>() {
int i = 0;
Modified:
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
(original)
+++
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
Thu Aug 21 05:22:10 2014
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,7 +44,7 @@ public class TestIFile {
DefaultCodec codec = new GzipCodec();
codec.setConf(conf);
IFile.Writer<Text, Text> writer =
- new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+ new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
codec, null);
writer.close();
}
@@ -56,12 +58,15 @@ public class TestIFile {
Path path = new Path(new Path("build/test.ifile"), "data");
DefaultCodec codec = new GzipCodec();
codec.setConf(conf);
+ FSDataOutputStream out = rfs.create(path);
IFile.Writer<Text, Text> writer =
- new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+ new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
codec, null);
writer.close();
+ FSDataInputStream in = rfs.open(path);
IFile.Reader<Text, Text> reader =
- new IFile.Reader<Text, Text>(conf, rfs, path, codec, null);
+ new IFile.Reader<Text, Text>(conf, in, rfs.getFileStatus(path).getLen(),
+ codec, null);
reader.close();
// test check sum
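Taken together, the hunks above show the constructor migration in this change: IFile.Writer and IFile.Reader now accept an already-opened stream (plus, for the reader, the file length) instead of a (FileSystem, Path) pair. A hedged round-trip sketch under the new signatures; the local path is hypothetical and compression is omitted for brevity.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.IFile;

public class IFileStreamRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem rfs = ((LocalFileSystem) FileSystem.getLocal(conf)).getRaw();
    Path path = new Path("build/test.ifile");  // hypothetical path

    // Writer: the caller opens the stream and hands it to the writer.
    FSDataOutputStream out = rfs.create(path);
    IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(
        conf, out, Text.class, Text.class, null, null);
    writer.append(new Text("key"), new Text("value"));
    writer.close();

    // Reader: the caller supplies the stream and the file length.
    FSDataInputStream in = rfs.open(path);
    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(
        conf, in, rfs.getFileStatus(path).getLen(), null, null);
    reader.close();
  }
}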
Modified:
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
(original)
+++
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
Thu Aug 21 05:22:10 2014
@@ -80,7 +80,7 @@ public class TestReduceTask extends Test
FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
Path path = new Path(tmpDir, "data.in");
IFile.Writer<Text, Text> writer =
- new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+ new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
codec, null);
for(Pair p: vals) {
writer.append(new Text(p.key), new Text(p.value));
Modified:
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java?rev=1619293&r1=1619292&r2=1619293&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
(original)
+++
hadoop/common/branches/HDFS-6584/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
Thu Aug 21 05:22:10 2014
@@ -95,9 +95,9 @@ public class TestPipeApplication {
new Counters.Counter(), new Progress());
FileSystem fs = new RawLocalFileSystem();
fs.setConf(conf);
- Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
- new Path(workSpace + File.separator + "outfile"), IntWritable.class,
- Text.class, null, null);
+ Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs.create(
+ new Path(workSpace + File.separator + "outfile")), IntWritable.class,
+ Text.class, null, null, true);
output.setWriter(wr);
// stub for client
File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationRunnableStub");
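One behavioral note on the new Writer calls above: alongside the switch to a caller-opened stream, a trailing boolean true is now passed. That flag is presumably a stream-ownership indicator (the writer closes the stream it was handed when it is itself closed); treat that interpretation as an assumption, since only the call site is visible in this diff. A minimal sketch under that assumption:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.IFile.Writer;

public class OwnStreamWriterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = new RawLocalFileSystem();
    fs.setConf(conf);
    FSDataOutputStream out = fs.create(new Path("outfile"));  // hypothetical
    // Trailing 'true' is assumed to mean: the writer owns 'out' and will
    // close it when the writer itself is closed.
    Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(
        conf, out, IntWritable.class, Text.class, null, null, true);
    wr.close();
  }
}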
@@ -177,9 +177,9 @@ public class TestPipeApplication {
new Progress());
FileSystem fs = new RawLocalFileSystem();
fs.setConf(conf);
- Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
- new Path(workSpace.getAbsolutePath() + File.separator + "outfile"),
- IntWritable.class, Text.class, null, null);
+ Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs.create(
+ new Path(workSpace.getAbsolutePath() + File.separator + "outfile")),
+ IntWritable.class, Text.class, null, null, true);
output.setWriter(wr);
conf.set(Submitter.PRESERVE_COMMANDFILE, "true");