merge from 1.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c5986871
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c5986871
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c5986871

Branch: refs/heads/trunk
Commit: c5986871c007f8c552ff624d1fcf064ce6a45c92
Parents: 9a842c7 b55ab4f
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Mon Feb 13 15:41:30 2012 -0600
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Mon Feb 13 15:41:30 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                          |    3 --
 .../cassandra/hadoop/pig/CassandraStorage.java       |   14 ++++++-
 .../cassandra/locator/NetworkTopologyStrategy.java   |    2 +-
 .../apache/cassandra/locator/TokenMetadata.java      |   28 +++++++++++----
 .../apache/cassandra/service/StorageService.java     |    6 ++--
 5 files changed, 37 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5986871/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e115a2a,0875da5..359e699
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,83 -1,3 +1,80 @@@
+1.1-dev
+ * add nodetool rebuild_index (CASSANDRA-3583)
+ * add nodetool rangekeysample (CASSANDRA-2917)
+ * Fix streaming too much data during move operations (CASSANDRA-3639)
+ * Nodetool and CLI connect to localhost by default (CASSANDRA-3568)
+ * Reduce memory used by primary index sample (CASSANDRA-3743)
+ * (Hadoop) separate input/output configurations (CASSANDRA-3197, 3765)
+ * avoid returning internal Cassandra classes over JMX (CASSANDRA-2805)
+ * add row-level isolation via SnapTree (CASSANDRA-2893)
+ * Optimize key count estimation when opening sstable on startup
+   (CASSANDRA-2988)
+ * multi-dc replication optimization supporting CL > ONE (CASSANDRA-3577)
+ * add command to stop compactions (CASSANDRA-1740, 3566, 3582)
+ * multithreaded streaming (CASSANDRA-3494)
+ * removed in-tree redhat spec (CASSANDRA-3567)
+ * "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503)
+ * Recycle commitlog segments for improved performance
+   (CASSANDRA-3411, 3543, 3557, 3615)
+ * update size-tiered compaction to prioritize small tiers (CASSANDRA-2407)
+ * add message expiration logic to OutboundTcpConnection (CASSANDRA-3005)
+ * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
+ * EACH_QUORUM is only supported for writes (CASSANDRA-3272)
+ * replace compactionlock use in schema migration by checking CFS.isValid
+   (CASSANDRA-3116)
+ * recognize that "SELECT first ... *" isn't really "SELECT *" (CASSANDRA-3445)
+ * Use faster bytes comparison (CASSANDRA-3434)
+ * Bulk loader is no longer a fat client, (HADOOP) bulk load output format
+   (CASSANDRA-3045)
+ * (Hadoop) add support for KeyRange.filter
+ * remove assumption that keys and token are in bijection
+   (CASSANDRA-1034, 3574, 3604)
+ * always remove endpoints from delevery queue in HH (CASSANDRA-3546)
+ * fix race between cf flush and its 2ndary indexes flush (CASSANDRA-3547)
+ * fix potential race in AES when a repair fails (CASSANDRA-3548)
+ * Remove columns shadowed by a deleted container even when we cannot purge
+   (CASSANDRA-3538)
+ * Improve memtable slice iteration performance (CASSANDRA-3545)
+ * more efficient allocation of small bloom filters (CASSANDRA-3618)
+ * Use separate writer thread in SSTableSimpleUnsortedWriter (CASSANDRA-3619)
+ * fsync the directory after new sstable or commitlog segment are created (CASSANDRA-3250)
+ * fix minor issues reported by FindBugs (CASSANDRA-3658)
+ * global key/row caches (CASSANDRA-3143, 3849)
+ * optimize memtable iteration during range scan (CASSANDRA-3638)
+ * introduce 'crc_check_chance' in CompressionParameters to support
+   a checksum percentage checking chance similarly to read-repair (CASSANDRA-3611)
+ * a way to deactivate global key/row cache on per-CF basis (CASSANDRA-3667)
+ * fix LeveledCompactionStrategy broken because of generation pre-allocation
+   in LeveledManifest (CASSANDRA-3691)
+ * finer-grained control over data directories (CASSANDRA-2749)
+ * Fix ClassCastException during hinted handoff (CASSANDRA-3694)
+ * Upgrade Thrift to 0.7 (CASSANDRA-3213)
+ * Make stress.java insert operation to use microseconds (CASSANDRA-3725)
+ * Allows (internally) doing a range query with a limit of columns instead of
+   rows (CASSANDRA-3742)
+ * Allow rangeSlice queries to be start/end inclusive/exclusive (CASSANDRA-3749)
+ * Fix BulkLoader to support new SSTable layout and add stream
+   throttling to prevent an NPE when there is no yaml config (CASSANDRA-3752)
+ * Allow concurrent schema migrations (CASSANDRA-1391, 3832)
+ * Add SnapshotCommand to trigger snapshot on remote node (CASSANDRA-3721)
+ * Make CFMetaData conversions to/from thrift/native schema inverses
+   (CASSANDRA_3559)
+ * Add initial code for CQL 3.0-beta (CASSANDRA-3781, 3753)
+ * Add wide row support for ColumnFamilyInputFormat (CASSANDRA-3264)
+ * Allow extending CompositeType comparator (CASSANDRA-3657)
+ * Avoids over-paging during get_count (CASSANDRA-3798)
+ * Add new command to rebuild a node without (repair) merkle tree calculations
+   (CASSANDRA-3483)
+ * respect not only row cache capacity but caching mode when
+   trying to read data (CASSANDRA-3812)
+ * fix system tests (CASSANDRA-3827)
+ * CQL support for altering key_validation_class in ALTER TABLE (CASSANDRA-3781)
+ * turn compression on by default (CASSANDRA-3871)
+ * make hexToBytes refuse invalid input (CASSANDRA-2851)
+ * Make secondary indexes CF inherit compression and compaction from their
+   parent CF (CASSANDRA-3877)
- Merged from 1.0:
- * Only snapshot CF being compacted for snapshot_before_compaction
-   (CASSANDRA-3803)
+
+
 1.0.8
 * avoid including non-queried nodes in rangeslice read repair (CASSANDRA-3843)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5986871/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 8f60f4a,0000000..e424c4b mode
100644,000000..100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@@ -1,725 -1,0 +1,735 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.cassandra.hadoop.pig; + +import java.io.IOException; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.db.marshal.TypeParser; +import org.apache.cassandra.thrift.*; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Hex; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.cassandra.db.Column; +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.hadoop.*; +import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.thrift.Deletion; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.utils.ByteBufferUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.*; + +import org.apache.pig.*; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.*; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.impl.util.UDFContext; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; + +/** + * A LoadStoreFunc for retrieving data from and storing data to Cassandra + * + * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))). 
+ */ +public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata +{ + // system environment variables that can be set to configure connection info: + // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper + public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT"; + public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS"; + public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER"; + public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT"; + public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS"; + public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER"; + public final static String PIG_RPC_PORT = "PIG_RPC_PORT"; + public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS"; + public final static String PIG_PARTITIONER = "PIG_PARTITIONER"; + public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT"; + public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT"; + + private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat"; + private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat"; + + private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER; + private static final Log logger = LogFactory.getLog(CassandraStorage.class); + + private ByteBuffer slice_start = BOUND; + private ByteBuffer slice_end = BOUND; + private boolean slice_reverse = false; + private String keyspace; + private String column_family; + private String loadSignature; + private String storeSignature; + + private Configuration conf; + private RecordReader reader; + private RecordWriter writer; + private String inputFormatClass; + private String outputFormatClass; + private int limit; + + public CassandraStorage() + { + this(1024); + } + + /** + * @param limit: number of columns to fetch in a slice + */ + public CassandraStorage(int limit) + { + super(); + this.limit = limit; + } + + public int getLimit() + { + return limit; + } + + @Override + public Tuple getNext() throws IOException + { + try + { + // load the next pair + if (!reader.nextKeyValue()) + return null; + + CfDef cfDef = getCfDef(loadSignature); + ByteBuffer key = (ByteBuffer)reader.getCurrentKey(); + SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); + assert key != null && cf != null; + + // and wrap it in a tuple + Tuple tuple = TupleFactory.getInstance().newTuple(2); + ArrayList<Tuple> columns = new ArrayList<Tuple>(); + tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); + for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet()) + { + columns.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); + } + + tuple.set(1, new DefaultDataBag(columns)); + return tuple; + } + catch (InterruptedException e) + { + throw new IOException(e.getMessage()); + } + } + + private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException + { + Tuple pair = TupleFactory.getInstance().newTuple(2); + List<AbstractType> marshallers = getDefaultMarshallers(cfDef); + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + + setTupleValue(pair, 0, comparator.compose(col.name())); + if (col instanceof Column) + { + // standard + if (validators.get(col.name()) == null) + setTupleValue(pair, 1, 
marshallers.get(1).compose(col.value())); + else + setTupleValue(pair, 1, validators.get(col.name()).compose(col.value())); + return pair; + } + else + { + // super + ArrayList<Tuple> subcols = new ArrayList<Tuple>(); + for (IColumn subcol : col.getSubColumns()) + subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type()))); + + pair.set(1, new DefaultDataBag(subcols)); + } + return pair; + } + + private void setTupleValue(Tuple pair, int position, Object value) throws ExecException + { + if (value instanceof BigInteger) + pair.set(position, ((BigInteger) value).intValue()); + else if (value instanceof ByteBuffer) + pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value))); + else if (value instanceof UUID) + pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value))); + else + pair.set(position, value); + } + + private CfDef getCfDef(String signature) + { + UDFContext context = UDFContext.getUDFContext(); + Properties property = context.getUDFProperties(CassandraStorage.class); + return cfdefFromString(property.getProperty(signature)); + } + + private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException + { + ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>(); + AbstractType comparator; + AbstractType subcomparator; + AbstractType default_validator; + AbstractType key_validator; + try + { + comparator = TypeParser.parse(cfDef.getComparator_type()); + subcomparator = TypeParser.parse(cfDef.getSubcomparator_type()); + default_validator = TypeParser.parse(cfDef.getDefault_validation_class()); + key_validator = TypeParser.parse(cfDef.getKey_validation_class()); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + + marshallers.add(comparator); + marshallers.add(default_validator); + marshallers.add(key_validator); + marshallers.add(subcomparator); + return marshallers; + } + + private Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException + { + Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>(); + for (ColumnDef cd : cfDef.getColumn_metadata()) + { + if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty()) + { + AbstractType validator = null; + try + { + validator = TypeParser.parse(cd.getValidation_class()); + validators.put(cd.name, validator); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + } + } + return validators; + } + + private AbstractType parseType(String type) throws IOException + { + try + { + return TypeParser.parse(type); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + } + + @Override + public InputFormat getInputFormat() + { + try + { + return FBUtilities.construct(inputFormatClass, "inputformat"); + } + catch (ConfigurationException e) + { + throw new RuntimeException(e); + } + } + + @Override + public void prepareToRead(RecordReader reader, PigSplit split) + { + this.reader = reader; + } + + public static Map<String, String> getQueryMap(String query) + { + String[] params = query.split("&"); + Map<String, String> map = new HashMap<String, String>(); + for (String param : params) + { + String[] keyValue = param.split("="); + map.put(keyValue[0], keyValue[1]); + } + return map; + } + + private void setLocationFromUri(String location) throws IOException + { + // parse uri into keyspace and columnfamily + String names[]; + try + { + if (!location.startsWith("cassandra://")) + throw new Exception("Bad scheme."); + String[] 
urlParts = location.split("\\?"); + if (urlParts.length > 1) + { + Map<String, String> urlQuery = getQueryMap(urlParts[1]); + AbstractType comparator = BytesType.instance; + if (urlQuery.containsKey("comparator")) + comparator = TypeParser.parse(urlQuery.get("comparator")); + if (urlQuery.containsKey("slice_start")) + slice_start = comparator.fromString(urlQuery.get("slice_start")); + if (urlQuery.containsKey("slice_end")) + slice_end = comparator.fromString(urlQuery.get("slice_end")); + if (urlQuery.containsKey("reversed")) + slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed")); + if (urlQuery.containsKey("limit")) + limit = Integer.parseInt(urlQuery.get("limit")); + } + String[] parts = urlParts[0].split("/+"); + keyspace = parts[1]; + column_family = parts[2]; + } + catch (Exception e) + { + throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage()); + } + } + + private void setConnectionInformation() throws IOException + { + if (System.getenv(PIG_RPC_PORT) != null) + { + ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT)); + ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT)); + } + + if (System.getenv(PIG_INPUT_RPC_PORT) != null) + ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT)); + if (System.getenv(PIG_OUTPUT_RPC_PORT) != null) + ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT)); + + if (System.getenv(PIG_INITIAL_ADDRESS) != null) + { + ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); + ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); + } + if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null) + ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS)); + if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null) + ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS)); + + if (System.getenv(PIG_PARTITIONER) != null) + { + ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER)); + ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER)); + } + if(System.getenv(PIG_INPUT_PARTITIONER) != null) + ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER)); + if(System.getenv(PIG_OUTPUT_PARTITIONER) != null) + ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER)); + if (System.getenv(PIG_INPUT_FORMAT) != null) + inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT)); + else + inputFormatClass = DEFAULT_INPUT_FORMAT; + if (System.getenv(PIG_OUTPUT_FORMAT) != null) + outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT)); + else + outputFormatClass = DEFAULT_OUTPUT_FORMAT; + } + + private String getFullyQualifiedClassName(String classname) + { + return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." 
+ classname; + } + + @Override + public void setLocation(String location, Job job) throws IOException + { + conf = job.getConfiguration(); + setLocationFromUri(location); + if (ConfigHelper.getInputSlicePredicate(conf) == null) + { + SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit); + SlicePredicate predicate = new SlicePredicate().setSlice_range(range); + ConfigHelper.setInputSlicePredicate(conf, predicate); + } + ConfigHelper.setInputColumnFamily(conf, keyspace, column_family); + setConnectionInformation(); + + if (ConfigHelper.getInputRpcPort(conf) == 0) + throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set"); + if (ConfigHelper.getInputInitialAddress(conf) == null) + throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set"); + if (ConfigHelper.getInputPartitioner(conf) == null) + throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set"); + + initSchema(loadSignature); + } + + public ResourceSchema getSchema(String location, Job job) throws IOException + { + setLocation(location, job); + CfDef cfDef = getCfDef(loadSignature); + + if (cfDef.column_type.equals("Super")) + return null; + // top-level schema, no type + ResourceSchema schema = new ResourceSchema(); + + // get default marshallers and validators + List<AbstractType> marshallers = getDefaultMarshallers(cfDef); + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + + // add key + ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema(); + keyFieldSchema.setName("key"); + keyFieldSchema.setType(getPigType(marshallers.get(2))); + + // will become the bag of tuples + ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema(); + bagFieldSchema.setName("columns"); + bagFieldSchema.setType(DataType.BAG); + ResourceSchema bagSchema = new ResourceSchema(); + + List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>(); + + // default comparator/validator + ResourceSchema innerTupleSchema = new ResourceSchema(); + ResourceFieldSchema tupleField = new ResourceFieldSchema(); + tupleField.setType(DataType.TUPLE); + tupleField.setSchema(innerTupleSchema); + + ResourceFieldSchema colSchema = new ResourceFieldSchema(); + colSchema.setName("name"); + colSchema.setType(getPigType(marshallers.get(0))); + tupleFields.add(colSchema); + + ResourceFieldSchema valSchema = new ResourceFieldSchema(); + AbstractType validator = marshallers.get(1); + valSchema.setName("value"); + valSchema.setType(getPigType(validator)); + tupleFields.add(valSchema); + + // defined validators/indexes + for (ColumnDef cdef : cfDef.column_metadata) + { + colSchema = new ResourceFieldSchema(); + colSchema.setName(new String(cdef.getName())); + colSchema.setType(getPigType(marshallers.get(0))); + tupleFields.add(colSchema); + + valSchema = new ResourceFieldSchema(); + validator = validators.get(ByteBuffer.wrap(cdef.getName())); + if (validator == null) + validator = marshallers.get(1); + valSchema.setName("value"); + valSchema.setType(getPigType(validator)); + tupleFields.add(valSchema); + } + innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()])); + + // a bag can contain only one tuple, but that tuple can contain anything + bagSchema.setFields(new ResourceFieldSchema[] { tupleField }); + bagFieldSchema.setSchema(bagSchema); + // top level schema contains everything + schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema }); + 
return schema; + } + + private byte getPigType(AbstractType type) + { + if (type instanceof LongType) + return DataType.LONG; + else if (type instanceof IntegerType) + return DataType.INTEGER; + else if (type instanceof AsciiType) + return DataType.CHARARRAY; + else if (type instanceof UTF8Type) + return DataType.CHARARRAY; + else if (type instanceof FloatType) + return DataType.FLOAT; + else if (type instanceof DoubleType) + return DataType.DOUBLE; + return DataType.BYTEARRAY; + } + + public ResourceStatistics getStatistics(String location, Job job) + { + return null; + } + + public String[] getPartitionKeys(String location, Job job) + { + return null; + } + + public void setPartitionFilter(Expression partitionFilter) + { + // no-op + } + + @Override + public String relativeToAbsolutePath(String location, Path curDir) throws IOException + { + return location; + } + + @Override + public void setUDFContextSignature(String signature) + { + this.loadSignature = signature; + } + + /* StoreFunc methods */ + public void setStoreFuncUDFContextSignature(String signature) + { + this.storeSignature = signature; + } + + public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException + { + return relativeToAbsolutePath(location, curDir); + } + + public void setStoreLocation(String location, Job job) throws IOException + { + conf = job.getConfiguration(); + setLocationFromUri(location); + ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family); + setConnectionInformation(); + + if (ConfigHelper.getOutputRpcPort(conf) == 0) + throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set"); + if (ConfigHelper.getOutputInitialAddress(conf) == null) + throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set"); + if (ConfigHelper.getOutputPartitioner(conf) == null) + throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set"); + + initSchema(storeSignature); + } + + public OutputFormat getOutputFormat() + { + try + { + return FBUtilities.construct(outputFormatClass, "outputformat"); + } + catch (ConfigurationException e) + { + throw new RuntimeException(e); + } + } + + public void checkSchema(ResourceSchema schema) throws IOException + { + // we don't care about types, they all get casted to ByteBuffers + } + + public void prepareToWrite(RecordWriter writer) + { + this.writer = writer; + } + + private ByteBuffer objToBB(Object o) + { + if (o == null) + return (ByteBuffer)o; + if (o instanceof java.lang.String) - o = new DataByteArray((String)o); - return ByteBuffer.wrap(((DataByteArray) o).get()); ++ return new ByteBuffer.wrap(DataByteArray((String)o).get()); ++ if (o instanceof Integer) ++ return IntegerType.instance.decompose((BigInteger)o); ++ if (o instanceof Long) ++ return LongType.instance.decompose((Long)o); ++ if (o instanceof Float) ++ return FloatType.instance.decompose((Float)o); ++ if (o instanceof Double) ++ return DoubleType.instance.decompose((Double)o); ++ if (o instanceof UUID) ++ return ByteBuffer.wrap(UUIDGen.decompose((UUID) o)); ++ return null; + } + + public void putNext(Tuple t) throws ExecException, IOException + { + ByteBuffer key = objToBB(t.get(0)); + DefaultDataBag pairs = (DefaultDataBag) t.get(1); + ArrayList<Mutation> mutationList = new ArrayList<Mutation>(); + CfDef cfDef = getCfDef(storeSignature); + try + { + for (Tuple pair : pairs) + { + Mutation mutation = new Mutation(); + if (DataType.findType(pair.get(1)) == DataType.BAG) // 
supercolumn + { + org.apache.cassandra.thrift.SuperColumn sc = new org.apache.cassandra.thrift.SuperColumn(); + sc.name = objToBB(pair.get(0)); + ArrayList<org.apache.cassandra.thrift.Column> columns = new ArrayList<org.apache.cassandra.thrift.Column>(); + for (Tuple subcol : (DefaultDataBag) pair.get(1)) + { + org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column(); + column.name = objToBB(subcol.get(0)); + column.value = objToBB(subcol.get(1)); + column.setTimestamp(System.currentTimeMillis() * 1000); + columns.add(column); + } + if (columns.isEmpty()) // a deletion + { + mutation.deletion = new Deletion(); + mutation.deletion.super_column = objToBB(pair.get(0)); + mutation.deletion.setTimestamp(System.currentTimeMillis() * 1000); + } + else + { + sc.columns = columns; + mutation.column_or_supercolumn = new ColumnOrSuperColumn(); + mutation.column_or_supercolumn.super_column = sc; + } + } + else // assume column since it couldn't be anything else + { + if (pair.get(1) == null) + { + mutation.deletion = new Deletion(); + mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate(); + mutation.deletion.predicate.column_names = Arrays.asList(objToBB(pair.get(0))); + mutation.deletion.setTimestamp(System.currentTimeMillis() * 1000); + } + else + { + org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column(); + column.name = objToBB(pair.get(0)); + column.value = objToBB(pair.get(1)); + column.setTimestamp(System.currentTimeMillis() * 1000); + mutation.column_or_supercolumn = new ColumnOrSuperColumn(); + mutation.column_or_supercolumn.column = column; + } + } + mutationList.add(mutation); + } + } + catch (ClassCastException e) + { + throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e); + } + try + { + writer.write(key, mutationList); + } + catch (InterruptedException e) + { + throw new IOException(e); + } + } + + public void cleanupOnFailure(String failure, Job job) + { + } + + /* Methods to get the column family schema from Cassandra */ + + private void initSchema(String signature) + { + UDFContext context = UDFContext.getUDFContext(); + Properties property = context.getUDFProperties(CassandraStorage.class); + + // Only get the schema if we haven't already gotten it + if (!property.containsKey(signature)) + { + Cassandra.Client client = null; + try + { + client = ConfigHelper.getClientFromInputAddressList(conf); + CfDef cfDef = null; + client.set_keyspace(keyspace); + KsDef ksDef = client.describe_keyspace(keyspace); + List<CfDef> defs = ksDef.getCf_defs(); + for (CfDef def : defs) + { + if (column_family.equalsIgnoreCase(def.getName())) + { + cfDef = def; + break; + } + } + if (cfDef != null) + property.setProperty(signature, cfdefToString(cfDef)); + else + throw new RuntimeException("Column family '" + column_family + "' not found in keyspace '" + keyspace + "'"); + } + catch (TException e) + { + throw new RuntimeException(e); + } + catch (InvalidRequestException e) + { + throw new RuntimeException(e); + } + catch (NotFoundException e) + { + throw new RuntimeException(e); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + private static String cfdefToString(CfDef cfDef) + { + assert cfDef != null; + // this is so awful it's kind of cool! 
+ TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); + try + { + return Hex.bytesToHex(serializer.serialize(cfDef)); + } + catch (TException e) + { + throw new RuntimeException(e); + } + } + + private static CfDef cfdefFromString(String st) + { + assert st != null; + TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory()); + CfDef cfDef = new CfDef(); + try + { + deserializer.deserialize(cfDef, Hex.hexToBytes(st)); + } + catch (TException e) + { + throw new RuntimeException(e); + } + return cfDef; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5986871/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java index ffbabd6,b6a99b2..382e224 --- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java +++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java @@@ -87,16 -86,13 +87,16 @@@ public class NetworkTopologyStrategy ex String dcName = dcEntry.getKey(); int dcReplicas = dcEntry.getValue(); - // collect endpoints in this DC - TokenMetadata dcTokens = new TokenMetadata(); + // collect endpoints in this DC; add in bulk to token meta data for computational complexity + // reasons (CASSANDRA-3831). + Set<Pair<Token, InetAddress>> dcTokensToUpdate = new HashSet<Pair<Token, InetAddress>>(); - for (Entry<Token, InetAddress> tokenEntry : tokenMetadata.entrySet()) + for (Entry<Token, InetAddress> tokenEntry : tokenMetadata.getTokenToEndpointMapForReading().entrySet()) { if (snitch.getDatacenter(tokenEntry.getValue()).equals(dcName)) - dcTokens.updateNormalToken(tokenEntry.getKey(), tokenEntry.getValue()); + dcTokensToUpdate.add(Pair.create(tokenEntry.getKey(), tokenEntry.getValue())); } + TokenMetadata dcTokens = new TokenMetadata(); + dcTokens.updateNormalTokens(dcTokensToUpdate); List<InetAddress> dcEndpoints = new ArrayList<InetAddress>(dcReplicas); Set<String> racks = new HashSet<String>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5986871/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5986871/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index c1681b9,f82fe32..9bcd54d --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -906,16 -852,13 +906,16 @@@ public class StorageService implements return ranges; } - public Map<Token, String> getTokenToEndpointMap() + public Map<String, String> getTokenToEndpointMap() { - Map<Token, InetAddress> mapInetAddress = tokenMetadata_.getTokenToEndpointMap(); + Map<Token, InetAddress> mapInetAddress = tokenMetadata_.getNormalAndBootstrappingTokenToEndpointMap(); - Map<Token, String> mapString = new HashMap<Token, String>(mapInetAddress.size()); - for (Map.Entry<Token, InetAddress> entry : mapInetAddress.entrySet()) + // in order to preserve tokens in ascending order, we use LinkedHashMap here + Map<String, String> mapString = new LinkedHashMap<String, String>(mapInetAddress.size()); + List<Token> tokens = new ArrayList<Token>(mapInetAddress.keySet()); + Collections.sort(tokens); + for (Token token : tokens) { - mapString.put(entry.getKey(), 
entry.getValue().getHostAddress()); + mapString.put(token.toString(), mapInetAddress.get(token).getHostAddress()); } return mapString; } @@@ -2603,17 -2575,24 +2603,17 @@@ StorageProxy.truncateBlocking(keyspace, columnFamily); } - public void saveCaches() throws ExecutionException, InterruptedException + public Map<String, Float> getOwnership() { - List<Token> sortedTokens = new ArrayList<Token>(tokenMetadata_.getTokenToEndpointMap().keySet()); - List<Future<?>> futures = new ArrayList<Future<?>>(); - logger_.debug("submitting cache saves"); - for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) ++ List<Token> sortedTokens = new ArrayList<Token>(tokenMetadata_.getTokenToEndpointMapForReading().keySet()); + Collections.sort(sortedTokens); + Map<Token, Float> token_map = partitioner.describeOwnership(sortedTokens); + Map<String, Float> string_map = new HashMap<String, Float>(); + for(Map.Entry<Token, Float> entry : token_map.entrySet()) { - futures.add(cfs.keyCache.submitWrite(-1)); - futures.add(cfs.rowCache.submitWrite(cfs.getRowCacheKeysToSave())); + string_map.put(entry.getKey().toString(), entry.getValue()); } - FBUtilities.waitOnFutures(futures); - logger_.debug("cache saves completed"); - } - - public Map<Token, Float> getOwnership() - { - List<Token> sortedTokens = new ArrayList<Token>(getTokenToEndpointMap().keySet()); - Collections.sort(sortedTokens); - return partitioner.describeOwnership(sortedTokens); + return string_map; } public List<String> getKeyspaces()
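
Note on the CassandraStorage.java hunk: objToBB() dispatches on the runtime type of each Pig value to produce the ByteBuffer written to Cassandra, and returns null-through so putNext() can turn a missing value into a Deletion. The standalone sketch below illustrates that dispatch pattern using only JDK types; the class name and encodings are stand-ins for illustration, not the actual Cassandra marshal types (AbstractType.decompose) and Pig DataByteArray used in the patch.

    import java.math.BigInteger;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public final class ObjToByteBufferSketch
    {
        // Type-dispatched conversion in the spirit of CassandraStorage.objToBB():
        // null stays null (interpreted upstream as a deletion), strings become their
        // UTF-8 bytes, and numeric types are encoded into small fixed-size buffers.
        public static ByteBuffer objToBB(Object o)
        {
            if (o == null)
                return null;
            if (o instanceof ByteBuffer)
                return (ByteBuffer) o;
            if (o instanceof String)
                return ByteBuffer.wrap(((String) o).getBytes(StandardCharsets.UTF_8));
            if (o instanceof Integer)
                return ByteBuffer.wrap(new BigInteger(o.toString()).toByteArray()); // varint-style, like IntegerType
            if (o instanceof Long)
                return (ByteBuffer) ByteBuffer.allocate(8).putLong((Long) o).flip();
            if (o instanceof Float)
                return (ByteBuffer) ByteBuffer.allocate(4).putFloat((Float) o).flip();
            if (o instanceof Double)
                return (ByteBuffer) ByteBuffer.allocate(8).putDouble((Double) o).flip();
            return null; // unrecognized types fall through, mirroring the hunk
        }
    }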
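
Note on the NetworkTopologyStrategy.java hunk: as the in-line comment referencing CASSANDRA-3831 says, the per-DC tokens are now collected into a set and handed to TokenMetadata.updateNormalTokens() in a single call instead of calling updateNormalToken() once per endpoint. The sketch below (hypothetical names, plain java.util types standing in for Token, InetAddress and TokenMetadata) shows the complexity argument: any derived state that must be recomputed after a mutation, such as the sorted token ring, is rebuilt once per batch rather than once per token.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class BulkTokenUpdateSketch
    {
        private final Map<Long, String> tokenToEndpoint = new HashMap<Long, String>();
        private final List<Long> sortedTokens = new ArrayList<Long>(); // derived, ring-ordered view

        // One-at-a-time update: the derived sorted view is rebuilt on every call.
        public void updateNormalToken(Long token, String endpoint)
        {
            tokenToEndpoint.put(token, endpoint);
            rebuildSortedTokens();
        }

        // Bulk update: the sorted view is rebuilt once for the whole batch.
        public void updateNormalTokens(Map<Long, String> updates)
        {
            tokenToEndpoint.putAll(updates);
            rebuildSortedTokens();
        }

        private void rebuildSortedTokens()
        {
            sortedTokens.clear();
            sortedTokens.addAll(tokenToEndpoint.keySet());
            Collections.sort(sortedTokens); // the O(n log n) step the bulk call amortizes
        }

        public static void main(String[] args)
        {
            BulkTokenUpdateSketch dcTokens = new BulkTokenUpdateSketch();
            Map<Long, String> dcTokensToUpdate = new HashMap<Long, String>();
            dcTokensToUpdate.put(10L, "10.0.0.1");
            dcTokensToUpdate.put(20L, "10.0.0.2");
            dcTokensToUpdate.put(30L, "10.0.0.3");
            dcTokens.updateNormalTokens(dcTokensToUpdate); // one rebuild for the whole DC
        }
    }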
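
Note on the StorageService.java hunk: getTokenToEndpointMap() now returns Map<String, String> (so no internal Token objects cross the JMX boundary, in line with the CASSANDRA-2805 entry in CHANGES) and, as its comment says, preserves ascending token order by sorting the tokens first and inserting them into a LinkedHashMap, whose iteration order is insertion order. A minimal self-contained sketch of that ordering trick, with Long and String standing in for Token and InetAddress:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public final class SortedTokenMapSketch
    {
        // Sort the keys, then insert in that order; LinkedHashMap keeps insertion order,
        // so callers iterating the result see the ring in ascending token order.
        public static Map<String, String> toSortedStringMap(Map<Long, String> tokenToEndpoint)
        {
            List<Long> tokens = new ArrayList<Long>(tokenToEndpoint.keySet());
            Collections.sort(tokens);
            Map<String, String> result = new LinkedHashMap<String, String>(tokenToEndpoint.size());
            for (Long token : tokens)
                result.put(token.toString(), tokenToEndpoint.get(token));
            return result;
        }

        public static void main(String[] args)
        {
            Map<Long, String> ring = new HashMap<Long, String>();
            ring.put(30L, "10.0.0.3");
            ring.put(10L, "10.0.0.1");
            ring.put(20L, "10.0.0.2");
            System.out.println(toSortedStringMap(ring)); // {10=10.0.0.1, 20=10.0.0.2, 30=10.0.0.3}
        }
    }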