Updated Branches: refs/heads/trunk 6de2fd9bf -> f007a3535
Use EB HadoopCompat for compat with Hadoop 0.2.x Patch by Ben Coverston, reviewed by brandonwilliams for CASSANDRA-5201 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f007a353 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f007a353 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f007a353 Branch: refs/heads/trunk Commit: f007a35357da582e928dc1ac872e4ebb4c09b708 Parents: 6de2fd9 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Tue Feb 11 15:52:07 2014 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Tue Feb 11 15:53:43 2014 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + build.xml | 3 +++ .../hadoop/AbstractColumnFamilyInputFormat.java | 11 ++++++----- .../hadoop/AbstractColumnFamilyOutputFormat.java | 3 ++- .../apache/cassandra/hadoop/BulkOutputFormat.java | 3 ++- .../apache/cassandra/hadoop/BulkRecordWriter.java | 3 ++- .../cassandra/hadoop/ColumnFamilyInputFormat.java | 17 +++++++++-------- .../hadoop/ColumnFamilyRecordReader.java | 3 ++- .../hadoop/ColumnFamilyRecordWriter.java | 3 ++- .../hadoop/cql3/CqlPagingInputFormat.java | 18 ++++++++++-------- .../hadoop/cql3/CqlPagingRecordReader.java | 5 +++-- .../cassandra/hadoop/cql3/CqlRecordWriter.java | 3 ++- .../cassandra/hadoop/pig/CassandraStorage.java | 5 +++-- .../apache/cassandra/hadoop/pig/CqlStorage.java | 5 +++-- 14 files changed, 50 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d8478e9..eec6296 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -31,6 +31,7 @@ * Avoid repairing already repaired data (CASSANDRA-5351) 2.0.6 + * Add compatibility for Hadoop 0.2.x (CASSANDRA-5201) * 
Fix EstimatedHistogram races (CASSANDRA-6682) * Failure detector correctly converts initial value to nanos (CASSANDRA-6658) * Add nodetool taketoken to relocate vnodes (CASSANDRA-4445) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index d91e0e0..1494585 100644 --- a/build.xml +++ b/build.xml @@ -374,6 +374,7 @@ <exclusion groupId="org.mortbay.jetty" artifactId="servlet-api"/> </dependency> <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" version="1.0.3"/> + <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat" version="4.3"/> <dependency groupId="org.apache.pig" artifactId="pig" version="0.11.1"/> <dependency groupId="net.java.dev.jna" artifactId="jna" version="4.0.0"/> @@ -417,6 +418,7 @@ <dependency groupId="org.apache.rat" artifactId="apache-rat"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/> + <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat"/> <dependency groupId="org.apache.pig" artifactId="pig"/> <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/> </artifact:pom> @@ -482,6 +484,7 @@ <!-- don't need hadoop classes to run, but if you use the hadoop stuff --> <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/> + <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat" optional="true"/> <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/> <!-- don't need jna to run, but nice to have --> http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java index 746666b..760193f 100644 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.twitter.elephantbird.util.HadoopCompat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,16 +118,16 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< public List<InputSplit> getSplits(JobContext context) throws IOException { - Configuration conf = context.getConfiguration(); + Configuration conf = HadoopCompat.getConfiguration(context);; validateConfiguration(conf); // cannonical ranges and nodes holding replicas List<TokenRange> masterRangeNodes = getRangeMap(conf); - keyspace = ConfigHelper.getInputKeyspace(context.getConfiguration()); - cfName = ConfigHelper.getInputColumnFamily(context.getConfiguration()); - partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration()); + keyspace = ConfigHelper.getInputKeyspace(conf); + cfName = ConfigHelper.getInputColumnFamily(conf); + partitioner = ConfigHelper.getInputPartitioner(conf); logger.debug("partitioner is {}", partitioner); @@ -344,7 +345,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< // public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { - TaskAttemptContext tac = new TaskAttemptContext(jobConf, new TaskAttemptID()); + TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID()); List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = 
this.getSplits(tac); org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()]; for (int i = 0; i < newInputSplits.size(); i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java index 2040f61..a3c4234 100644 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import com.twitter.elephantbird.util.HadoopCompat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +73,7 @@ public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputForma */ public void checkOutputSpecs(JobContext context) { - checkOutputSpecs(context.getConfiguration()); + checkOutputSpecs(HadoopCompat.getConfiguration(context)); } protected void checkOutputSpecs(Configuration conf) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java index f1c5f39..566d5ee 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import com.twitter.elephantbird.util.HadoopCompat; import org.apache.cassandra.thrift.Mutation; import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.mapreduce.*; @@ -32,7 +33,7 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>> @Override public void checkOutputSpecs(JobContext context) { - checkOutputSpecs(context.getConfiguration()); + checkOutputSpecs(HadoopCompat.getConfiguration(context)); } private void checkOutputSpecs(Configuration conf) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java index f761a8c..a8e2e13 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java @@ -28,6 +28,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.twitter.elephantbird.util.HadoopCompat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,7 +86,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> BulkRecordWriter(TaskAttemptContext context) { - this(context.getConfiguration()); + this(HadoopCompat.getConfiguration(context)); this.progress = new Progressable(context); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java index fbd5bf2..362cd70 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import 
com.twitter.elephantbird.util.HadoopCompat; import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.composites.CellName; import org.apache.hadoop.conf.Configuration; @@ -55,14 +56,14 @@ public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<Byt public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<CellName, Cell>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException { - TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID))) - { - @Override - public void progress() - { - reporter.progress(); - } - }; + TaskAttemptContext tac = HadoopCompat.newMapContext( + jobConf, + TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)), + null, + null, + null, + new ReporterWrapper(reporter), + null); ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader(jobConf.getInt(CASSANDRA_HADOOP_MAX_KEY_SIZE, CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT)); recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index be18f5f..ef883fd 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.*; +import com.twitter.elephantbird.util.HadoopCompat; import org.apache.cassandra.db.Cell; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,7 +133,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap public void 
initialize(InputSplit split, TaskAttemptContext context) throws IOException { this.split = (ColumnFamilySplit) split; - Configuration conf = context.getConfiguration(); + Configuration conf = HadoopCompat.getConfiguration(context); KeyRange jobRange = ConfigHelper.getInputKeyRange(conf); filter = jobRange == null ? null : jobRange.row_filter; predicate = ConfigHelper.getInputSlicePredicate(conf); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java index 6823342..0ae2a67 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java @@ -23,6 +23,7 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; +import com.twitter.elephantbird.util.HadoopCompat; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.thrift.*; @@ -60,7 +61,7 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By */ ColumnFamilyRecordWriter(TaskAttemptContext context) { - this(context.getConfiguration()); + this(HadoopCompat.getConfiguration(context)); this.progressable = new Progressable(context); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java index 0e1509e..6f4478e 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java +++ 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; +import com.twitter.elephantbird.util.HadoopCompat; import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat; +import org.apache.cassandra.hadoop.ReporterWrapper; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -58,14 +60,14 @@ public class CqlPagingInputFormat extends AbstractColumnFamilyInputFormat<Map<St public RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException { - TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID))) - { - @Override - public void progress() - { - reporter.progress(); - } - }; + TaskAttemptContext tac = HadoopCompat.newMapContext( + jobConf, + TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)), + null, + null, + null, + new ReporterWrapper(reporter), + null); CqlPagingRecordReader recordReader = new CqlPagingRecordReader(); recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java index 002992f..f712584 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java @@ -26,6 +26,7 @@ import java.util.*; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterables; +import com.twitter.elephantbird.util.HadoopCompat; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,7 +105,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, public void initialize(InputSplit split, TaskAttemptContext context) throws IOException { this.split = (ColumnFamilySplit) split; - Configuration conf = context.getConfiguration(); + Configuration conf = HadoopCompat.getConfiguration(context); totalRowCount = (this.split.getLength() < Long.MAX_VALUE) ? (int) this.split.getLength() : ConfigHelper.getInputSplitSize(conf); @@ -123,7 +124,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, pageRowSize = DEFAULT_CQL_PAGE_LIMIT; } - partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration()); + partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context)); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java index e354ad6..9742762 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import com.twitter.elephantbird.util.HadoopCompat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +85,7 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB */ CqlRecordWriter(TaskAttemptContext context) throws IOException { - this(context.getConfiguration()); + this(HadoopCompat.getConfiguration(context)); this.progressable = new Progressable(context); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index ae18d20..56f66bb 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.*; +import com.twitter.elephantbird.util.HadoopCompat; import org.apache.cassandra.db.Cell; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -282,7 +283,7 @@ public class CassandraStorage extends AbstractCassandraStorage /** set read configuration settings */ public void setLocation(String location, Job job) throws IOException { - conf = job.getConfiguration(); + conf = HadoopCompat.getConfiguration(job); setLocationFromUri(location); if (ConfigHelper.getInputSlicePredicate(conf) == null) @@ -339,7 +340,7 @@ public class CassandraStorage extends AbstractCassandraStorage /** set store configuration settings */ public void setStoreLocation(String location, Job job) throws IOException { - conf = job.getConfiguration(); + conf = HadoopCompat.getConfiguration(job); // don't combine mappers to a single mapper per node conf.setBoolean("pig.noSplitCombination", true); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f007a353/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index 76d8026..b349cf7 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.*; +import 
com.twitter.elephantbird.util.HadoopCompat; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.composites.CellNames; @@ -197,7 +198,7 @@ public class CqlStorage extends AbstractCassandraStorage /** set read configuration settings */ public void setLocation(String location, Job job) throws IOException { - conf = job.getConfiguration(); + conf = HadoopCompat.getConfiguration(job); setLocationFromUri(location); if (username != null && password != null) @@ -256,7 +257,7 @@ public class CqlStorage extends AbstractCassandraStorage /** set store configuration settings */ public void setStoreLocation(String location, Job job) throws IOException { - conf = job.getConfiguration(); + conf = HadoopCompat.getConfiguration(job); setLocationFromUri(location); if (username != null && password != null)