Hi Pig team,

We'd like to get your feedback on a set of queries we implemented in Pig.

We've attached the Hadoop configuration and Pig queries to this email. We start 
the queries by issuing "pig xxx.pig". The queries are from the SIGMOD 2009 
paper. More details are at https://issues.apache.org/jira/browse/HIVE-396 
(Shall we open a JIRA on Pig for this?)


One upcoming improvement: we are going to switch Hadoop to LZO as the 
intermediate compression codec very soon. Previously we used gzip for all 
performance tests, including Hadoop, Hive, and Pig.
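
For the intermediate (map output) compression, that amounts to swapping the 
codec in the attached Hadoop config, roughly as below (assuming an LZO codec 
such as com.hadoop.compression.lzo.LzoCodec from the hadoop-lzo package is on 
the classpath; the exact class name depends on which LZO package is installed):

  <property>
    <name>mapred.map.output.compression.codec</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
  </property>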

We specify the number of reducers in each query to match the number of 
reducers that Hive automatically suggests. Please let us know the best way to 
set the number of reducers in Pig.
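
For concreteness, the per-operator PARALLEL clause is how the reducer count 
can be set inside a Pig query; a minimal sketch, with made-up relation and 
field names:

  raw     = LOAD '/data/input' AS (k:chararray, v:long);
  grouped = GROUP raw BY k PARALLEL 60;  -- run this GROUP's reduce phase with 60 tasks
  sums    = FOREACH grouped GENERATE group, SUM(raw.v);
  STORE sums INTO '/data/output';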

Are there any other improvements we can make to the Pig query and the hadoop 
configuration?

Thanks,
Zheng

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

	<property>
		<name>dfs.balance.bandwidthPerSec</name>
		<value>10485760</value>
		<description>
		Specifies the maximum bandwidth, in bytes per second, that each
		datanode can use for balancing.
		</description>
	</property>

	<property>
		<name>dfs.name.dir</name>
		<value>/dfs.metadata</value>
	</property>

	<property>
		<name>fs.default.name</name>
		<value>hdfs://namenode.example.com:8020</value>
	</property>

	<property>
		<name>mapred.job.tracker</name>
		<value>jobtracker.example.com:50029</value>
	</property>

	<property>
		<name>mapred.min.split.size</name>
		<value>65536</value>
	</property>

	<property>
		<name>dfs.replication</name>
		<value>3</value>
	</property>

	<property>
		<name>mapred.reduce.copy.backoff</name>
		<value>5</value>
	</property>
    <property>
        <name>io.sort.factor</name>
        <value>100</value>
    </property>
    <property>
        <name>mapred.reduce.parallel.copies</name>
        <value>25</value>
    </property>
	<property>
		<name>io.sort.mb</name>
		<value>200</value>
	</property>
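	<!-- Shuffle/sort tuning: up to io.sort.factor (100) streams are merged at
	     once, and the 200 MB io.sort.mb buffer must fit inside the task heap
	     set by mapred.child.java.opts (-Xmx1024m below). -->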
    <property>
		<name>dfs.data.dir</name>
		<value>/hdfs</value>
    </property>
    <property>
        <name>mapred.local.dir</name>
        <value>/mapred/local</value>
    </property>
    <property>
		<name>dfs.namenode.handler.count</name>
		<value>40</value>
    </property>
    <property>
        <name>io.file.buffer.size</name>
        <value>32768</value>
    </property>
    <property>
        <name>dfs.datanode.du.reserved</name>
        <value>1024000000</value>
    </property>
    <property>
        <name>fs.trash.root</name>
        <value>/Trash</value>
    </property>
    <property>
		<name>fs.trash.interval</name>
		<value>1440</value>
    </property>
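    <!-- fs.trash.interval is in minutes: 1440 minutes = 24 hours between
         trash checkpoints. -->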

    <property>
       <name>mapred.linerecordreader.maxlength</name>
       <value>1000000</value>
    </property>
    <property>
       <name>dfs.block.size</name>
       <value>134217728</value>
    </property>
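    <!-- 134217728 bytes = 128 MB HDFS block size. -->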
    <property>
        <name>mapred.tasktracker.dns.interface</name>
        <value>eth0</value>
    </property>
    <property>
        <name>dfs.datanode.dns.interface</name>
        <value>eth0</value>
    </property>
    <property>
        <name>webinterface.private.actions</name>
        <value>true</value>
    </property>
    <!-- Properties that are expected to be overridden by users -->
    <property>
        <name>mapred.reduce.tasks.speculative.execution</name>
        <value>false</value>
    </property>
    <property>
        <name>mapred.speculative.map.gap</name>
        <value>0.9</value>
    </property>
    <property>
        <name>mapred.child.java.opts</name>
        <value>-Xmx1024m -Djava.net.preferIPv4Stack=true</value>
    </property>
    <!-- Deprecated, but .. -->
    <property>
        <name>mapred.speculative.execution</name>
        <value>false</value>
    </property>

    <property>
      <name>dfs.safemode.threshold.pct</name>
      <value>1</value>
      <description>
        Specifies the percentage of blocks that should satisfy
        the minimal replication requirement defined by dfs.replication.min.
        Values less than or equal to 0 mean not to start in safe mode.
        Values greater than 1 will make safe mode permanent.
      </description>
    </property>
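    <!-- A threshold of 1 means 100% of blocks must meet dfs.replication.min
         before the namenode leaves safe mode. -->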

    <property>
     <name>dfs.permissions</name>
     <value>false</value>
     <description>
       If "true", enable permission checking in HDFS.
       If "false", permission checking is turned off,
       but all other behavior is unchanged.
       Switching from one parameter value to the other does not change the mode,
       owner or group of files or directories.
     </description>
    </property>

<property>  
  <name>mapred.output.compress</name>  
  <value>true</value> 
</property> 

<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
  <description>Should the outputs of the maps be compressed before being
               sent across the network? Uses SequenceFile compression.
  </description>
</property>

<property>
  <name>mapred.map.output.compression.type</name>
  <value>BLOCK</value>
  <description>If the map outputs are to be compressed, how should they
               be compressed? Should be one of NONE, RECORD or BLOCK.
  </description>
</property>

<property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>

<property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
</property>

<property>
  <name>mapred.map.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.GzipCodec</value>
  <description>If the map outputs are compressed, which codec should be used?
  </description>
</property>

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>5</value>
  <description>The maximum number of map tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>5</value>
  <description>The maximum number of reduce tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>
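<!-- Worst case per node: (5 map + 5 reduce slots) x the 1024 MB child heap
     from mapred.child.java.opts above = up to 10 GB of task memory. -->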

<property>
  <name>fs.checkpoint.dir</name>
  <value>/dfs/namesecondary</value>
  <description>Determines where on the local filesystem the DFS secondary
      name node should store the temporary images and edits to merge.
  </description>
</property>

<property>
  <name>mapred.system.dir</name>
  <value>/mapred/system/prod</value>
  <description>The shared HDFS directory where MapReduce stores control files.
  </description>
</property>     
                
<property>
  <name>mapred.temp.dir</name>
  <value>mapred/temp</value>
  <description>A shared HDFS directory for temporary files.
  </description>
</property>

<property>
  <name>mapred.jobtracker.completeuserjobs.maximum</name>
  <value>10</value>
  <description>The maximum number of complete jobs per user to keep around before delegating them to the job history.
  </description>
</property>


  <property>
    <name>hadoop.job.history.user.location</name>
    <value>none</value>
    <description>
    The user can specify a location to store the history files of
    a particular job. If nothing is specified, the logs are stored
    under "_logs/history/" in the job's output directory. Logging can
    be disabled by setting the value to "none".
    </description>
  </property>

  <property>
    <name>mapred.jobtracker.taskScheduler</name>
    <value>org.apache.hadoop.mapred.FairScheduler</value>
  </property>

  <property>
    <name>mapred.fairscheduler.weightadjuster</name>
    <value>org.apache.hadoop.mapred.NewJobWeightBooster</value>
  </property>

  <property>
    <name>mapred.newjobweightbooster.factor</name>
    <value>3</value>
    <description>Priority boost factor for new jobs</description>
  </property>

  <property>
    <name>mapred.newjobweightbooster.duration</name>
    <value>300000</value>
    <description>Priority boost duration (in milliseconds)</description>
  </property>

  <property>
    <name>mapred.fairscheduler.poolnameproperty</name>
    <value>user.name</value>
    <description>Which jobconf property is used to determine a job's pool name</description>
  </property>

  <property>
    <name>mapred.fairscheduler.assignmultiple</name>
    <value>true</value>
    <description>Allow the scheduler to assign both a map and a reduce in the same heartbeat</description>
  </property>
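  <!-- Fair scheduler summary: jobs are pooled per user (user.name), and each
       new job gets a 3x weight boost for 300000 ms = 5 minutes after
       submission. -->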

  <property>
    <name>dfs.http.address</name>
    <value>namenode.example.com:50070</value>
    <description>
      The address and the base port on which the dfs namenode web UI will listen.
      If the port is 0 then the server will start on a free port.
      This is used by the secondary namenode to communicate with the primary namenode.
    </description>
  </property>

  <property>
    <name>mapred.jobtracker.mintime.before.retirejob</name>
    <value>60000</value>
    <description>
      The minimum time (in ms) before a completed job can be retired.
    </description>
  </property>

 <property>
  <name>mapred.jobtracker.maxtasks.per.job</name>
  <value>30000</value>
  <description>The maximum number of tasks for a single job.
  A value of -1 indicates that there is no maximum.  </description>
</property>

 <property>
  <name>mapred.throttle.threshold.percent</name>
  <value>90</value>
  <description>Throttle job submissions if the heap size on the JobTracker
  exceeds this specified percentage.  A value of 100 indicates that
  job submissions are not throttled.  </description>
</property>
<property>
  <name>ipc.client.timeout</name>
  <value>600000</value>
  <description>Defines the timeout for IPC calls in milliseconds.</description>
</property>

</configuration>
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files  -->
<!-- that are implied by Hadoop setup variables.                                                -->
<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive    -->
<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
<!-- resource).                                                                                 -->

<!-- Hive Execution Parameters -->
<property>
  <name>hive.exec.scratchdir</name>
  <value>/tmp/hive-${user.name}</value>
  <description>Scratch space for Hive jobs</description>
</property>

<property>
  <name>hive.metastore.local</name>
  <value>true</value>
  <description>Controls whether to connect to a remote metastore server or open a new metastore server in the Hive client JVM</description>
</property>

<property>
  <name>mapred.reduce.tasks</name>
  <value>-1</value>
</property>
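<!-- -1 lets Hive pick the number of reducers per job automatically; this is
     the reducer count that the Pig queries are matched against. -->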

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:derby:;databaseName=metastore_db;create=true</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>org.apache.derby.jdbc.EmbeddedDriver</value>
  <description>Driver class name for a JDBC metastore</description>
</property>

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/user/hive/warehouse</value>
  <description>location of default database for the warehouse</description>
</property>

<property>
  <name>hive.metastore.connect.retries</name>
  <value>5</value>
  <description>Number of retries while opening a connection to metastore</description>
</property>

<property>
  <name>hive.metastore.rawstore.impl</name>
  <value>org.apache.hadoop.hive.metastore.ObjectStore</value>
  <description>Name of the class that implements the org.apache.hadoop.hive.metastore.rawstore interface. This class is used for storage and retrieval of raw metadata objects such as tables and databases</description>
</property>

<property>
  <name>hive.default.fileformat</name>
  <value>TextFile</value>
  <description>Default file format for CREATE TABLE statement. Options are TextFile and SequenceFile. Users can explicitly say CREATE TABLE ... STORED AS &lt;TEXTFILE|SEQUENCEFILE&gt; to override</description>
</property>

<property>
  <name>hive.map.aggr</name>
  <value>true</value>
  <description>Whether to use map-side aggregation in Hive Group By queries</description>
</property>

<property>
  <name>hive.groupby.skewindata</name>
  <value>false</value>
  <description>Whether the data is skewed, in which case Hive optimizes group by queries to handle it</description>
</property>

<property>
  <name>hive.groupby.mapaggr.checkinterval</name>
  <value>100000</value>
  <description>Number of rows after which the size of the grouping keys/aggregation classes is checked</description>
</property>

<property>
  <name>hive.mapred.local.mem</name>
  <value>0</value>
  <description>For local mode, memory of the mappers/reducers</description>
</property>

<property>
  <name>hive.map.aggr.hash.percentmemory</name>
  <value>0.8</value>
  <description>Portion of total memory to be used by the map-side group aggregation hash table</description>
</property>

<property>
  <name>hive.optimize.ppd</name>
  <value>false</value>
  <description>Whether to enable predicate pushdown</description>
</property>

<property>
  <name>hive.join.emit.interval</name>
  <value>1000</value>
  <description>How many rows in the right-most join operand Hive should buffer before emitting the join result. </description>
</property>

<property>
  <name>hive.mapred.mode</name>
  <value>nonstrict</value>
  <description>The mode in which the hive operations are being performed. In strict mode, some risky queries are not allowed to run</description>
</property>

<property>
  <name>hive.exec.script.maxerrsize</name>
  <value>100000</value>
  <description>Maximum number of bytes a script is allowed to emit to standard error (per map-reduce task). This prevents runaway scripts from filling log partitions to capacity.</description>
</property>

<property>
  <name>hive.exec.compress.output</name>
  <value>true</value>
  <description>This controls whether the final outputs of a query (to a local/HDFS file or a Hive table) are compressed. The compression codec and other options are determined from the hadoop config variables mapred.output.compress*</description>
</property>

<property>
  <name>hive.exec.compress.intermediate</name>
  <value>true</value>
  <description> This controls whether intermediate files produced by hive between multiple map-reduce jobs are compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
</property>

<property>
  <name>hive.hwi.listen.host</name>
  <value>0.0.0.0</value>
  <description>This is the host address the Hive Web Interface will listen on</description>
</property>
 
<property>
  <name>hive.hwi.listen.port</name>
  <value>9999</value>
  <description>This is the port the Hive Web Interface will listen on</description>
</property>

<property>
  <name>hive.hwi.war.file</name>
  <value>${HIVE_HOME}/lib/hive-hwi.war</value>
  <description>This is the WAR file with the jsp content for Hive Web Interface</description>
</property>

<property>
  <name>hive.exec.pre.hooks</name>
  <value></value>
  <description>Pre Execute Hook for Tests</description>
</property>

</configuration>
# Set Hadoop-specific environment variables here.

# The only required environment variable is JAVA_HOME.  All others are
# optional.  When running a distributed configuration it is best to
# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.

# The java implementation to use.  Required.
# export JAVA_HOME=/usr/lib/j2sdk1.5-sun

export JAVA_HOME=${JAVA_HOME:-/usr/local/jdk-6u7-32}
export JAVA_HOME_64=/usr/local/jdk-6u7-64
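# ${JAVA_HOME:-...} keeps a JAVA_HOME that is already set in the environment
# and falls back to the local 32-bit JDK otherwise.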

# Extra Java CLASSPATH elements.  Optional.
# export HADOOP_CLASSPATH=

# The maximum amount of heap to use, in MB. Default is 1000.
export HADOOP_HEAPSIZE=1000

# Extra Java runtime options.  Empty by default.
# export HADOOP_OPTS=-server
export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true"

# Command specific options appended to HADOOP_OPTS when specified
#export HADOOP_SECONDARYNAMENODE_OPTS=-Xmx2048m
#export HADOOP_JOBTRACKER_OPTS="-Xmx3000m -verbose:gc -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:/var/hadoop/logs/jobtracker1.gc.log -Dcom.sun.management.jmxremote.port=8997 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
#export HADOOP_NAMENODE_OPTS="-Xmx2048m -verbose:gc -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:/var/hadoop/logs/namenode.gc.log -Dcom.sun.management.jmxremote.port=8998 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
# export HADOOP_TASKTRACKER_OPTS=
# The following applies to multiple commands (fs, dfs, fsck, distcp etc)
# export HADOOP_CLIENT_OPTS

# Extra ssh options.  Empty by default.
# export HADOOP_SSH_OPTS="-o ConnectTimeout=1 -o SendEnv=HADOOP_CONF_DIR"
export HADOOP_SSH_OPTS="-o ConnectTimeout=1"
# Where log files are stored.  $HADOOP_HOME/logs by default.
# export HADOOP_LOG_DIR=${HADOOP_HOME}/logs
export HADOOP_LOG_DIR=/var/hadoop.benchmark/logs

# File naming remote slave hosts.  $HADOOP_HOME/conf/slaves by default.
# export HADOOP_SLAVES=${HADOOP_HOME}/conf/slaves

# host:path where hadoop code should be rsync'd from.  Unset by default.
# export HADOOP_MASTER=master:/home/$USER/src/hadoop

# Seconds to sleep between slave commands.  Unset by default.  This
# can be useful in large clusters, where, e.g., slave rsyncs can
# otherwise arrive faster than the master can service them.
# export HADOOP_SLAVE_SLEEP=0.1

# The directory where pid files are stored. /tmp by default.
# export HADOOP_PID_DIR=/var/hadoop/pids
export HADOOP_PID_DIR=/var/hadoop.benchmark/pids

# A string representing this instance of hadoop. $USER by default.
# export HADOOP_IDENT_STRING=$USER

# The scheduling priority for daemon processes.  See 'man nice'.
# export HADOOP_NICENESS=10
