http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 0a64c87..1ad80b7 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -18,30 +18,46 @@ package org.apache.cassandra.hadoop.pig; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.*; -import org.apache.cassandra.hadoop.HadoopCompat; -import org.apache.cassandra.db.Cell; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.auth.PasswordAuthenticator; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.hadoop.*; +import org.apache.cassandra.exceptions.SyntaxException; +import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.hadoop.HadoopCompat; +import org.apache.cassandra.schema.LegacySchemaTables; +import org.apache.cassandra.serializers.CollectionSerializer; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Hex; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.*; import org.apache.pig.Expression; +import org.apache.pig.LoadFunc; +import org.apache.pig.LoadMetadata; import org.apache.pig.ResourceSchema; -import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.ResourceStatistics; +import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.*; import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; @@ -52,7 +68,8 @@ import org.apache.thrift.protocol.TBinaryProtocol; * * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))). 
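 *
 * For example (illustrative values only), a row keyed 'k1' holding columns a=1 and b=2
 * loads as the tuple (k1, ((a, 1), (b, 2))).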
*/ -public class CassandraStorage extends AbstractCassandraStorage +@Deprecated +public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata { public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES"; public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT"; @@ -71,6 +88,28 @@ public class CassandraStorage extends AbstractCassandraStorage private boolean widerows = false; private int limit; + + protected String DEFAULT_INPUT_FORMAT; + protected String DEFAULT_OUTPUT_FORMAT; + + protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR }; + + protected String username; + protected String password; + protected String keyspace; + protected String column_family; + protected String loadSignature; + protected String storeSignature; + + protected Configuration conf; + protected String inputFormatClass; + protected String outputFormatClass; + protected int splitSize = 64 * 1024; + protected String partitionerClass; + protected boolean usePartitionFilter = false; + protected String initHostAddress; + protected String rpcPort; + protected int nativeProtocolVersion = 1; // wide row hacks private ByteBuffer lastKey; @@ -104,8 +143,7 @@ public class CassandraStorage extends AbstractCassandraStorage /** read wide row*/ public Tuple getNextWide() throws IOException { - CfInfo cfInfo = getCfInfo(loadSignature); - CfDef cfDef = cfInfo.cfDef; + CfDef cfDef = getCfDef(loadSignature); ByteBuffer key = null; Tuple tuple = null; DefaultDataBag bag = new DefaultDataBag(); @@ -128,7 +166,7 @@ public class CassandraStorage extends AbstractCassandraStorage } for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet()) { - bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type()))); + bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); } lastKey = null; lastRow = null; @@ -166,7 +204,7 @@ public class CassandraStorage extends AbstractCassandraStorage addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet()) { - bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type()))); + bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); } tuple.append(bag); lastKey = key; @@ -183,14 +221,14 @@ public class CassandraStorage extends AbstractCassandraStorage { for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet()) { - bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type()))); + bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); } lastKey = null; lastRow = null; } for (Map.Entry<ByteBuffer, Cell> entry : row.entrySet()) { - bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type()))); + bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); } } } @@ -200,7 +238,6 @@ public class CassandraStorage extends AbstractCassandraStorage } } - @Override /** read next row */ public Tuple getNext() throws IOException { @@ -212,8 +249,7 @@ public class CassandraStorage extends AbstractCassandraStorage if (!reader.nextKeyValue()) return null; - CfInfo cfInfo = getCfInfo(loadSignature); - CfDef cfDef = cfInfo.cfDef; + CfDef cfDef = getCfDef(loadSignature); ByteBuffer key = reader.getCurrentKey(); Map<ByteBuffer, Cell> cf = reader.getCurrentValue(); assert key != null && cf != null; @@ -240,7 +276,7 @@ public class 
CassandraStorage extends AbstractCassandraStorage } if (hasColumn) { - tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()))); + tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()))); } else if (!cql3Table) { // otherwise, we need to add an empty tuple to take its place @@ -252,7 +288,7 @@ public class CassandraStorage extends AbstractCassandraStorage for (Map.Entry<ByteBuffer, Cell> entry : cf.entrySet()) { if (!added.containsKey(entry.getKey())) - bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type()))); + bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); } tuple.append(bag); // finally, special top-level indexes if needed @@ -260,7 +296,7 @@ public class CassandraStorage extends AbstractCassandraStorage { for (ColumnDef cdef : getIndexes()) { - Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())); + Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())); tuple.append(throwaway.get(1)); } } @@ -272,14 +308,57 @@ public class CassandraStorage extends AbstractCassandraStorage } } + /** write next row */ + public void putNext(Tuple t) throws IOException + { + /* + We support two cases for output: + First, the original output: + (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional) + For supers, we only accept the original output. + */ + + if (t.size() < 1) + { + // simply nothing here, we can't even delete without a key + logger.warn("Empty output skipped, filter empty tuples to suppress this warning"); + return; + } + ByteBuffer key = objToBB(t.get(0)); + if (t.getType(1) == DataType.TUPLE) + writeColumnsFromTuple(key, t, 1); + else if (t.getType(1) == DataType.BAG) + { + if (t.size() > 2) + throw new IOException("No arguments allowed after bag"); + writeColumnsFromBag(key, (DataBag) t.get(1)); + } + else + throw new IOException("Second argument in output must be a tuple or bag"); + } + /** set hadoop cassandra connection settings */ protected void setConnectionInformation() throws IOException { - super.setConnectionInformation(); + StorageHelper.setConnectionInformation(conf); + if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null) + inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT)); + else + inputFormatClass = DEFAULT_INPUT_FORMAT; + if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null) + outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT)); + else + outputFormatClass = DEFAULT_OUTPUT_FORMAT; if (System.getenv(PIG_ALLOW_DELETES) != null) allow_deletes = Boolean.parseBoolean(System.getenv(PIG_ALLOW_DELETES)); } + /** get the full class name */ + protected String getFullyQualifiedClassName(String classname) + { + return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." 
+ classname; + } + /** set read configuration settings */ public void setLocation(String location, Job job) throws IOException { @@ -296,11 +375,11 @@ public class CassandraStorage extends AbstractCassandraStorage widerows = Boolean.parseBoolean(System.getenv(PIG_WIDEROW_INPUT)); if (System.getenv(PIG_USE_SECONDARY) != null) usePartitionFilter = Boolean.parseBoolean(System.getenv(PIG_USE_SECONDARY)); - if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null) + if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null) { try { - ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE))); + ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE))); } catch (NumberFormatException e) { @@ -380,12 +459,67 @@ public class CassandraStorage extends AbstractCassandraStorage initSchema(storeSignature); } + /** Methods to get the column family schema from Cassandra */ + protected void initSchema(String signature) throws IOException + { + Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class); + + // Only get the schema if we haven't already gotten it + if (!properties.containsKey(signature)) + { + try + { + Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf); + client.set_keyspace(keyspace); + + if (username != null && password != null) + { + Map<String, String> credentials = new HashMap<String, String>(2); + credentials.put(PasswordAuthenticator.USERNAME_KEY, username); + credentials.put(PasswordAuthenticator.PASSWORD_KEY, password); + + try + { + client.login(new AuthenticationRequest(credentials)); + } + catch (AuthenticationException e) + { + logger.error("Authentication exception: invalid username and/or password"); + throw new IOException(e); + } + } + + // compose the CfDef for the columfamily + CfDef cfDef = getCfDef(client); + + if (cfDef != null) + { + StringBuilder sb = new StringBuilder(); + sb.append(cfdefToString(cfDef)); + properties.setProperty(signature, sb.toString()); + } + else + throw new IOException(String.format("Table '%s' not found in keyspace '%s'", + column_family, + keyspace)); + } + catch (Exception e) + { + throw new IOException(e); + } + } + } + + public void checkSchema(ResourceSchema schema) throws IOException + { + // we don't care about types, they all get casted to ByteBuffers + } + /** define the schema */ public ResourceSchema getSchema(String location, Job job) throws IOException { setLocation(location, job); - CfInfo cfInfo = getCfInfo(loadSignature); - CfDef cfDef = cfInfo.cfDef; + CfDef cfDef = getCfDef(loadSignature); if (cfDef.column_type.equals("Super")) return null; /* @@ -405,7 +539,7 @@ public class CassandraStorage extends AbstractCassandraStorage // add key ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema(); keyFieldSchema.setName("key"); - keyFieldSchema.setType(getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR))); + keyFieldSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR))); ResourceSchema bagSchema = new ResourceSchema(); ResourceFieldSchema bagField = new ResourceFieldSchema(); @@ -419,8 +553,8 @@ public class CassandraStorage extends AbstractCassandraStorage ResourceFieldSchema bagvalSchema = new ResourceFieldSchema(); bagcolSchema.setName("name"); bagvalSchema.setName("value"); - bagcolSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR))); - bagvalSchema.setType(getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR))); + 
bagcolSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR))); + bagvalSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR))); bagTupleSchema.setFields(new ResourceFieldSchema[] { bagcolSchema, bagvalSchema }); bagTupleField.setSchema(bagTupleSchema); bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField }); @@ -431,7 +565,7 @@ public class CassandraStorage extends AbstractCassandraStorage // add the key first, then the indexed columns, and finally the bag allSchemaFields.add(keyFieldSchema); - if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table)) + if (!widerows) { // defined validators/indexes for (ColumnDef cdef : cfDef.column_metadata) @@ -445,14 +579,14 @@ public class CassandraStorage extends AbstractCassandraStorage ResourceFieldSchema idxColSchema = new ResourceFieldSchema(); idxColSchema.setName("name"); - idxColSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR))); + idxColSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR))); ResourceFieldSchema valSchema = new ResourceFieldSchema(); AbstractType validator = validators.get(cdef.name); if (validator == null) validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR); valSchema.setName("value"); - valSchema.setType(getPigType(validator)); + valSchema.setType(StorageHelper.getPigType(validator)); innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema }); allSchemaFields.add(innerTupleField); @@ -472,7 +606,7 @@ public class CassandraStorage extends AbstractCassandraStorage AbstractType validator = validators.get(cdef.name); if (validator == null) validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR); - idxSchema.setType(getPigType(validator)); + idxSchema.setType(StorageHelper.getPigType(validator)); allSchemaFields.add(idxSchema); } } @@ -485,8 +619,8 @@ public class CassandraStorage extends AbstractCassandraStorage public void setPartitionFilter(Expression partitionFilter) throws IOException { UDFContext context = UDFContext.getUDFContext(); - Properties property = context.getUDFProperties(AbstractCassandraStorage.class); - property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter))); + Properties property = context.getUDFProperties(CassandraStorage.class); + property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter))); } /** prepare writer */ @@ -495,33 +629,93 @@ public class CassandraStorage extends AbstractCassandraStorage this.writer = writer; } - /** write next row */ - public void putNext(Tuple t) throws IOException + /** convert object to ByteBuffer */ + protected ByteBuffer objToBB(Object o) { - /* - We support two cases for output: - First, the original output: - (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional) - For supers, we only accept the original output. 
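 (for example: (rowkey, (c1, v1), {(c2, v2)}) stores columns c1 and c2, with their values, under the row keyed rowkey)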
- */ + if (o == null) + return nullToBB(); + if (o instanceof java.lang.String) + return ByteBuffer.wrap(new DataByteArray((String)o).get()); + if (o instanceof Integer) + return Int32Type.instance.decompose((Integer)o); + if (o instanceof Long) + return LongType.instance.decompose((Long)o); + if (o instanceof Float) + return FloatType.instance.decompose((Float)o); + if (o instanceof Double) + return DoubleType.instance.decompose((Double)o); + if (o instanceof UUID) + return ByteBuffer.wrap(UUIDGen.decompose((UUID) o)); + if(o instanceof Tuple) { + List<Object> objects = ((Tuple)o).getAll(); + //collections + if (objects.size() > 0 && objects.get(0) instanceof String) + { + String collectionType = (String) objects.get(0); + if ("set".equalsIgnoreCase(collectionType) || + "list".equalsIgnoreCase(collectionType)) + return objToListOrSetBB(objects.subList(1, objects.size())); + else if ("map".equalsIgnoreCase(collectionType)) + return objToMapBB(objects.subList(1, objects.size())); - if (t.size() < 1) + } + return objToCompositeBB(objects); + } + + return ByteBuffer.wrap(((DataByteArray) o).get()); + } + + private ByteBuffer objToListOrSetBB(List<Object> objects) + { + List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); + for(Object sub : objects) { - // simply nothing here, we can't even delete without a key - logger.warn("Empty output skipped, filter empty tuples to suppress this warning"); - return; + ByteBuffer buffer = objToBB(sub); + serialized.add(buffer); } - ByteBuffer key = objToBB(t.get(0)); - if (t.getType(1) == DataType.TUPLE) - writeColumnsFromTuple(key, t, 1); - else if (t.getType(1) == DataType.BAG) + // NOTE: using protocol v1 serialization format for collections so as to not break + // compatibility. Not sure if that's the right thing. + return CollectionSerializer.pack(serialized, objects.size(), 1); + } + + private ByteBuffer objToMapBB(List<Object> objects) + { + List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2); + for(Object sub : objects) { - if (t.size() > 2) - throw new IOException("No arguments allowed after bag"); - writeColumnsFromBag(key, (DataBag) t.get(1)); + List<Object> keyValue = ((Tuple)sub).getAll(); + for (Object entry: keyValue) + { + ByteBuffer buffer = objToBB(entry); + serialized.add(buffer); + } } - else - throw new IOException("Second argument in output must be a tuple or bag"); + // NOTE: using protocol v1 serialization format for collections so as to not break + // compatibility. Not sure if that's the right thing. 
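 // (concretely: the pre-v3 collection format encodes the element count and each
 // element's byte length as unsigned 16-bit shorts, while protocol v3 widens both
 // to 32-bit ints, so the two encodings are not interchangeable on the wire)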
+ return CollectionSerializer.pack(serialized, objects.size(), 1); + } + + private ByteBuffer objToCompositeBB(List<Object> objects) + { + List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); + int totalLength = 0; + for(Object sub : objects) + { + ByteBuffer buffer = objToBB(sub); + serialized.add(buffer); + totalLength += 2 + buffer.remaining() + 1; + } + ByteBuffer out = ByteBuffer.allocate(totalLength); + for (ByteBuffer bb : serialized) + { + int length = bb.remaining(); + out.put((byte) ((length >> 8) & 0xFF)); + out.put((byte) (length & 0xFF)); + out.put(bb); + out.put((byte) 0); + } + out.flip(); + return out; } /** write tuple data to cassandra */ @@ -643,6 +837,19 @@ public class CassandraStorage extends AbstractCassandraStorage } } + /** get a list of columns with defined index*/ + protected List<ColumnDef> getIndexes() throws IOException + { + CfDef cfdef = getCfDef(loadSignature); + List<ColumnDef> indexes = new ArrayList<ColumnDef>(); + for (ColumnDef cdef : cfdef.column_metadata) + { + if (cdef.index_type != null) + indexes.add(cdef); + } + return indexes; + } + /** get a list of Cassandra IndexExpression from Pig expression */ private List<IndexExpression> filterToIndexExpressions(Expression expression) throws IOException { @@ -713,13 +920,64 @@ public class CassandraStorage extends AbstractCassandraStorage return indexClause.getExpressions(); } + public ResourceStatistics getStatistics(String location, Job job) + { + return null; + } + + public void cleanupOnFailure(String failure, Job job) + { + } + + public void cleanupOnSuccess(String location, Job job) throws IOException { + } + + + /** StoreFunc methods */ + public void setStoreFuncUDFContextSignature(String signature) + { + this.storeSignature = signature; + } + + public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException + { + return relativeToAbsolutePath(location, curDir); + } + + /** output format */ + public OutputFormat getOutputFormat() throws IOException + { + try + { + return FBUtilities.construct(outputFormatClass, "outputformat"); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + } + + + @Override + public InputFormat getInputFormat() throws IOException + { + try + { + return FBUtilities.construct(inputFormatClass, "inputformat"); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + } + /** get a list of index expression */ private List<IndexExpression> getIndexExpressions() throws IOException { UDFContext context = UDFContext.getUDFContext(); - Properties property = context.getUDFProperties(AbstractCassandraStorage.class); - if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null) - return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE)); + Properties property = context.getUDFProperties(CassandraStorage.class); + if (property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE) != null) + return indexExpressionsFromString(property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE)); else return null; } @@ -731,6 +989,129 @@ public class CassandraStorage extends AbstractCassandraStorage return getColumnMeta(client, true, true); } + + /** get column meta data */ + protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn) + throws org.apache.cassandra.thrift.InvalidRequestException, + UnavailableException, + TimedOutException, + SchemaDisagreementException, + TException, + CharacterCodingException, + 
org.apache.cassandra.exceptions.InvalidRequestException, + ConfigurationException, + NotFoundException + { + String query = String.format("SELECT column_name, validator, index_type, type " + + "FROM %s.%s " + + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", + SystemKeyspace.NAME, + LegacySchemaTables.COLUMNS, + keyspace, + column_family); + + CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); + + List<CqlRow> rows = result.rows; + List<ColumnDef> columnDefs = new ArrayList<ColumnDef>(); + if (rows == null || rows.isEmpty()) + { + // if CassandraStorage, just return the empty list + if (cassandraStorage) + return columnDefs; + + // otherwise for CqlNativeStorage, check metadata for classic thrift tables + CFMetaData cfm = getCFMetaData(keyspace, column_family, client); + for (ColumnDefinition def : cfm.regularAndStaticColumns()) + { + ColumnDef cDef = new ColumnDef(); + String columnName = def.name.toString(); + String type = def.type.toString(); + logger.debug("name: {}, type: {} ", columnName, type); + cDef.name = ByteBufferUtil.bytes(columnName); + cDef.validation_class = type; + columnDefs.add(cDef); + } + // we may not need to include the value column for compact tables as we + // could have already processed it as schema_columnfamilies.value_alias + if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null) + { + ColumnDefinition def = cfm.compactValueColumn(); + if ("value".equals(def.name.toString())) + { + ColumnDef cDef = new ColumnDef(); + cDef.name = def.name.bytes; + cDef.validation_class = def.type.toString(); + columnDefs.add(cDef); + } + } + return columnDefs; + } + + Iterator<CqlRow> iterator = rows.iterator(); + while (iterator.hasNext()) + { + CqlRow row = iterator.next(); + ColumnDef cDef = new ColumnDef(); + String type = ByteBufferUtil.string(row.getColumns().get(3).value); + if (!type.equals("regular")) + continue; + cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value)); + cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value); + ByteBuffer indexType = row.getColumns().get(2).value; + if (indexType != null) + cDef.index_type = getIndexType(ByteBufferUtil.string(indexType)); + columnDefs.add(cDef); + } + return columnDefs; + } + + + /** get CFMetaData of a column family */ + protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client) + throws NotFoundException, + org.apache.cassandra.thrift.InvalidRequestException, + TException, + org.apache.cassandra.exceptions.InvalidRequestException, + ConfigurationException + { + KsDef ksDef = client.describe_keyspace(ks); + for (CfDef cfDef : ksDef.cf_defs) + { + if (cfDef.name.equalsIgnoreCase(cf)) + return ThriftConversion.fromThrift(cfDef); + } + return null; + } + + /** get index type from string */ + protected IndexType getIndexType(String type) + { + type = type.toLowerCase(); + if ("keys".equals(type)) + return IndexType.KEYS; + else if("custom".equals(type)) + return IndexType.CUSTOM; + else if("composites".equals(type)) + return IndexType.COMPOSITES; + else + return null; + } + + /** return partition keys */ + public String[] getPartitionKeys(String location, Job job) throws IOException + { + if (!usePartitionFilter) + return null; + List<ColumnDef> indexes = getIndexes(); + String[] partitionKeys = new String[indexes.size()]; + for (int i = 0; i < indexes.size(); i++) + { + partitionKeys[i] = new String(indexes.get(i).getName()); + } + return partitionKeys; 
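 // (note: new String(byte[]) above decodes the column name with the platform
 // default charset; this implicitly assumes a charset that can round-trip the
 // UTF-8 names Cassandra stores)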
+ } + /** convert key to a tuple */ private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException { @@ -744,15 +1125,26 @@ public class CassandraStorage extends AbstractCassandraStorage { if( comparator instanceof AbstractCompositeType ) { - setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key)); + StorageHelper.setTupleValue(tuple, 0, composeComposite((AbstractCompositeType) comparator, key)); } else { - setTupleValue(tuple, 0, cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key)); + StorageHelper.setTupleValue(tuple, 0, StorageHelper.cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key, nativeProtocolVersion)); } } + /** Deconstructs a composite type to a Tuple. */ + protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException + { + List<AbstractCompositeType.CompositeComponent> result = comparator.deconstruct(name); + Tuple t = TupleFactory.getInstance().newTuple(result.size()); + for (int i=0; i<result.size(); i++) + StorageHelper.setTupleValue(t, i, StorageHelper.cassandraToObj(result.get(i).comparator, result.get(i).value, nativeProtocolVersion)); + + return t; + } + /** cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end> * [&reversed=true][&limit=1][&allow_deletes=true][&widerows=true] * [&use_secondary=true][&comparator=<comparator>][&partitioner=<partitioner>]]*/ @@ -817,10 +1209,206 @@ public class CassandraStorage extends AbstractCassandraStorage "[&init_address=<host>][&rpc_port=<port>]]': " + e.getMessage()); } } - + + + /** decompose the query to store the parameters in a map */ + public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException + { + String[] params = query.split("&"); + Map<String, String> map = new HashMap<String, String>(params.length); + for (String param : params) + { + String[] keyValue = param.split("="); + map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8")); + } + return map; + } + public ByteBuffer nullToBB() { return null; } -} + /** return the CfInfo for the column family */ + protected CfDef getCfDef(Cassandra.Client client) + throws org.apache.cassandra.thrift.InvalidRequestException, + UnavailableException, + TimedOutException, + SchemaDisagreementException, + TException, + NotFoundException, + org.apache.cassandra.exceptions.InvalidRequestException, + ConfigurationException, + IOException + { + // get CF meta data + String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator " + + "FROM %s.%s " + + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", + SystemKeyspace.NAME, + LegacySchemaTables.COLUMNFAMILIES, + keyspace, + column_family); + + CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); + + if (result == null || result.rows == null || result.rows.isEmpty()) + return null; + + Iterator<CqlRow> iteraRow = result.rows.iterator(); + CfDef cfDef = new CfDef(); + cfDef.keyspace = keyspace; + cfDef.name = column_family; + if (iteraRow.hasNext()) + { + CqlRow cqlRow = iteraRow.next(); + + cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value); + cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value); + ByteBuffer subComparator = cqlRow.columns.get(2).value; + if (subComparator != null) + cfDef.subcomparator_type = ByteBufferUtil.string(subComparator); + 
cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value); + cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value); + } + cfDef.column_metadata = getColumnMetadata(client); + return cfDef; + } + + /** get the columnfamily definition for the signature */ + protected CfDef getCfDef(String signature) throws IOException + { + UDFContext context = UDFContext.getUDFContext(); + Properties property = context.getUDFProperties(CassandraStorage.class); + String prop = property.getProperty(signature); + return cfdefFromString(prop); + } + + /** convert string back to CfDef */ + protected static CfDef cfdefFromString(String st) throws IOException + { + assert st != null; + TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory()); + CfDef cfDef = new CfDef(); + try + { + deserializer.deserialize(cfDef, Hex.hexToBytes(st)); + } + catch (TException e) + { + throw new IOException(e); + } + return cfDef; + } + + /** convert CfDef to string */ + protected static String cfdefToString(CfDef cfDef) throws IOException + { + assert cfDef != null; + // this is so awful it's kind of cool! + TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); + try + { + return Hex.bytesToHex(serializer.serialize(cfDef)); + } + catch (TException e) + { + throw new IOException(e); + } + } + + /** parse the string to a cassandra data type */ + protected AbstractType parseType(String type) throws IOException + { + try + { + // always treat counters like longs, specifically CCT.compose is not what we need + if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType")) + return LongType.instance; + return TypeParser.parse(type); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + catch (SyntaxException e) + { + throw new IOException(e); + } + } + + /** convert a column to a tuple */ + protected Tuple columnToTuple(Cell col, CfDef cfDef, AbstractType comparator) throws IOException + { + Tuple pair = TupleFactory.getInstance().newTuple(2); + + ByteBuffer colName = col.name().toByteBuffer(); + + // name + if(comparator instanceof AbstractCompositeType) + StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, colName)); + else + StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, colName, nativeProtocolVersion)); + + // value + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + if (validators.get(colName) == null) + { + Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); + StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value(), nativeProtocolVersion)); + } + else + StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(colName), col.value(), nativeProtocolVersion)); + return pair; + } + + /** construct a map to store the mashaller type to cassandra data type mapping */ + protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException + { + Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class); + AbstractType comparator; + AbstractType subcomparator; + AbstractType default_validator; + AbstractType key_validator; + + comparator = parseType(cfDef.getComparator_type()); + subcomparator = parseType(cfDef.getSubcomparator_type()); + default_validator = parseType(cfDef.getDefault_validation_class()); + key_validator 
= parseType(cfDef.getKey_validation_class()); + + marshallers.put(MarshallerType.COMPARATOR, comparator); + marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator); + marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator); + marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator); + return marshallers; + } + + /** get the validators */ + protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException + { + Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>(); + for (ColumnDef cd : cfDef.getColumn_metadata()) + { + if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty()) + { + AbstractType validator = null; + try + { + validator = TypeParser.parse(cd.getValidation_class()); + if (validator instanceof CounterColumnType) + validator = LongType.instance; + validators.put(cd.name, validator); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + catch (SyntaxException e) + { + throw new IOException(e); + } + } + } + return validators; + } +}
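For reference, the component layout produced by objToCompositeBB above (and decoded later by composeComposite) is the standard CompositeType encoding: for each component, a 2-byte big-endian length, the component's bytes, then a 0x00 end-of-component octet. Below is a minimal, dependency-free Java sketch of that packing; the class name and the single "ab" component are invented for illustration:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class CompositeEncodingSketch
    {
        public static void main(String[] args)
        {
            byte[] component = "ab".getBytes(StandardCharsets.UTF_8);
            // mirrors objToCompositeBB: 2 length bytes + component bytes + 1 end-of-component byte
            ByteBuffer out = ByteBuffer.allocate(2 + component.length + 1);
            out.put((byte) ((component.length >> 8) & 0xFF)); // length, high byte
            out.put((byte) (component.length & 0xFF));        // length, low byte
            out.put(component);                               // raw component bytes
            out.put((byte) 0);                                // end-of-component marker
            out.flip();
            StringBuilder hex = new StringBuilder();
            while (out.hasRemaining())
                hex.append(String.format("%02x ", out.get() & 0xFF));
            System.out.println(hex.toString().trim());        // prints: 00 02 61 62 00
        }
    }

Multi-component names simply concatenate such triplets, which is why objToCompositeBB sizes its buffer as the sum of (2 + length + 1) per component.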
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java index 7887085..91cdd02 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java @@ -17,48 +17,78 @@ */ package org.apache.cassandra.hadoop.pig; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; import java.util.*; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import org.apache.cassandra.db.BufferCell; import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.composites.CellNames; import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.exceptions.AuthenticationException; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.hadoop.HadoopCompat; import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; import org.apache.cassandra.hadoop.cql3.CqlRecordReader; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.serializers.CollectionSerializer; import org.apache.cassandra.utils.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.*; -import org.apache.pig.Expression; -import org.apache.pig.ResourceSchema; +import org.apache.pig.*; import org.apache.pig.Expression.OpType; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.*; import org.apache.pig.impl.util.UDFContext; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.external.biz.base64Coder.Base64Coder; -import com.datastax.driver.core.Row; -public class CqlNativeStorage extends AbstractCassandraStorage +public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata { + protected String DEFAULT_INPUT_FORMAT; + protected String DEFAULT_OUTPUT_FORMAT; + + protected String username; + protected String password; + protected String keyspace; + protected String column_family; + protected String loadSignature; + protected String storeSignature; + + protected Configuration conf; + protected String inputFormatClass; + protected String outputFormatClass; + protected int splitSize = 64 * 1024; + protected String partitionerClass; + protected boolean usePartitionFilter = false; + protected String initHostAddress; + protected 
String rpcPort; + protected int nativeProtocolVersion = 1; + private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class); private int pageSize = 1000; private String columns; private String outputQuery; private String whereClause; - private boolean hasCompactValueAlias = false; private RecordReader<Long, Row> reader; private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer; @@ -119,21 +149,20 @@ public class CqlNativeStorage extends AbstractCassandraStorage if (!reader.nextKeyValue()) return null; - CfInfo cfInfo = getCfInfo(loadSignature); - CfDef cfDef = cfInfo.cfDef; + TableInfo tableMetadata = getCfInfo(loadSignature); Row row = reader.getCurrentValue(); - Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size()); - Iterator<ColumnDef> itera = cfDef.column_metadata.iterator(); + Tuple tuple = TupleFactory.getInstance().newTuple(tableMetadata.getColumns().size()); + Iterator<ColumnInfo> itera = tableMetadata.getColumns().iterator(); int i = 0; while (itera.hasNext()) { - ColumnDef cdef = itera.next(); - ByteBuffer columnValue = row.getBytesUnsafe(ByteBufferUtil.string(cdef.name.duplicate())); + ColumnInfo cdef = itera.next(); + ByteBuffer columnValue = row.getBytesUnsafe(cdef.getName()); if (columnValue != null) { - Cell cell = new BufferCell(CellNames.simpleDense(cdef.name), columnValue); - AbstractType<?> validator = getValidatorMap(cfDef).get(cdef.name); - setTupleValue(tuple, i, cqlColumnToObj(cell, cfDef), validator); + Cell cell = new BufferCell(CellNames.simpleDense(ByteBufferUtil.bytes(cdef.getName())), columnValue); + AbstractType<?> validator = getValidatorMap(tableMetadata).get(ByteBufferUtil.bytes(cdef.getName())); + setTupleValue(tuple, i, cqlColumnToObj(cell, tableMetadata), validator); } else tuple.set(i, null); @@ -148,15 +177,12 @@ public class CqlNativeStorage extends AbstractCassandraStorage } /** convert a cql column to an object */ - private Object cqlColumnToObj(Cell col, CfDef cfDef) throws IOException + private Object cqlColumnToObj(Cell col, TableInfo cfDef) throws IOException { // standard Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); ByteBuffer cellName = col.name().toByteBuffer(); - if (validators.get(cellName) == null) - return cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR), col.value()); - else - return cassandraToObj(validators.get(cellName), col.value()); + return StorageHelper.cassandraToObj(validators.get(cellName), col.value(), nativeProtocolVersion); } /** set the value to the position of the tuple */ @@ -165,7 +191,7 @@ public class CqlNativeStorage extends AbstractCassandraStorage if (validator instanceof CollectionType) setCollectionTupleValues(tuple, position, value, validator); else - setTupleValue(tuple, position, value); + StorageHelper.setTupleValue(tuple, position, value); } /** set the values of set/list at and after the position of the tuple */ @@ -220,173 +246,33 @@ public class CqlNativeStorage extends AbstractCassandraStorage return obj; } - /** include key columns */ - protected List<ColumnDef> getColumnMetadata(Cassandra.Client client) - throws InvalidRequestException, - UnavailableException, - TimedOutException, - SchemaDisagreementException, - TException, - CharacterCodingException, - org.apache.cassandra.exceptions.InvalidRequestException, - ConfigurationException, - NotFoundException - { - List<ColumnDef> keyColumns = null; - // get key columns + /** get the columnfamily definition for the signature */ + protected 
TableInfo getCfInfo(String signature) throws IOException + { + UDFContext context = UDFContext.getUDFContext(); + Properties property = context.getUDFProperties(CqlNativeStorage.class); + TableInfo cfInfo; try { - keyColumns = getKeysMeta(client); + cfInfo = cfdefFromString(property.getProperty(signature)); } - catch(Exception e) + catch (ClassNotFoundException e) { - logger.error("Error in retrieving key columns" , e); + throw new IOException(e); } - - // get other columns - List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias); - - // combine all columns in a list - if (keyColumns != null && columns != null) - keyColumns.addAll(columns); - - return keyColumns; + return cfInfo; } - /** get keys meta data */ - private List<ColumnDef> getKeysMeta(Cassandra.Client client) - throws Exception + /** return the CfInfo for the column family */ + protected TableMetadata getCfInfo(Session client) + throws NoHostAvailableException, + AuthenticationException, + IllegalStateException { - String query = "SELECT key_aliases, " + - " column_aliases, " + - " key_validator, " + - " comparator, " + - " keyspace_name, " + - " value_alias, " + - " default_validator " + - "FROM system.schema_columnfamilies " + - "WHERE keyspace_name = '%s'" + - " AND columnfamily_name = '%s' "; - - CqlResult result = client.execute_cql3_query( - ByteBufferUtil.bytes(String.format(query, keyspace, column_family)), - Compression.NONE, - ConsistencyLevel.ONE); - - if (result == null || result.rows == null || result.rows.isEmpty()) - return null; - - Iterator<CqlRow> iteraRow = result.rows.iterator(); - List<ColumnDef> keys = new ArrayList<ColumnDef>(); - if (iteraRow.hasNext()) - { - CqlRow cqlRow = iteraRow.next(); - String name = ByteBufferUtil.string(cqlRow.columns.get(4).value); - logger.debug("Found ksDef name: {}", name); - String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue())); - - logger.debug("partition keys: {}", keyString); - List<String> keyNames = FBUtilities.fromJsonList(keyString); - - Iterator<String> iterator = keyNames.iterator(); - while (iterator.hasNext()) - { - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(iterator.next()); - keys.add(cDef); - } - // classic thrift tables - if (keys.size() == 0) - { - CFMetaData cfm = getCFMetaData(keyspace, column_family, client); - for (ColumnDefinition def : cfm.partitionKeyColumns()) - { - String key = def.name.toString(); - logger.debug("name: {} ", key); - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(key); - keys.add(cDef); - } - for (ColumnDefinition def : cfm.clusteringColumns()) - { - String key = def.name.toString(); - logger.debug("name: {} ", key); - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(key); - keys.add(cDef); - } - } - - keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue())); - - logger.debug("cluster keys: {}", keyString); - keyNames = FBUtilities.fromJsonList(keyString); - - iterator = keyNames.iterator(); - while (iterator.hasNext()) - { - ColumnDef cDef = new ColumnDef(); - cDef.name = ByteBufferUtil.bytes(iterator.next()); - keys.add(cDef); - } - - String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue())); - logger.debug("row key validator: {}", validator); - AbstractType<?> keyValidator = parseType(validator); - - Iterator<ColumnDef> keyItera = keys.iterator(); - if (keyValidator instanceof CompositeType) - { - Iterator<AbstractType<?>> typeItera = 
((CompositeType) keyValidator).types.iterator(); - while (typeItera.hasNext()) - keyItera.next().validation_class = typeItera.next().toString(); - } - else - keyItera.next().validation_class = keyValidator.toString(); - - validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue())); - logger.debug("cluster key validator: {}", validator); - - if (keyItera.hasNext() && validator != null && !validator.isEmpty()) - { - AbstractType<?> clusterKeyValidator = parseType(validator); - - if (clusterKeyValidator instanceof CompositeType) - { - Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator(); - while (keyItera.hasNext()) - keyItera.next().validation_class = typeItera.next().toString(); - } - else - keyItera.next().validation_class = clusterKeyValidator.toString(); - } - - // compact value_alias column - if (cqlRow.columns.get(5).value != null) - { - try - { - String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue())); - logger.debug("default validator: {}", compactValidator); - AbstractType<?> defaultValidator = parseType(compactValidator); - - ColumnDef cDef = new ColumnDef(); - cDef.name = cqlRow.columns.get(5).value; - cDef.validation_class = defaultValidator.toString(); - keys.add(cDef); - hasCompactValueAlias = true; - } - catch (Exception e) - { - // no compact column at value_alias - } - } - - } - return keys; + // get CF meta data + return client.getCluster().getMetadata().getKeyspace(Metadata.quote(keyspace)).getTable(Metadata.quote(column_family)); } - /** output: (((name, value), (name, value)), (value ... value), (value...value)) */ public void putNext(Tuple t) throws IOException { @@ -441,6 +327,91 @@ public class CqlNativeStorage extends AbstractCassandraStorage return keys; } + /** convert object to ByteBuffer */ + protected ByteBuffer objToBB(Object o) + { + if (o == null) + return nullToBB(); + if (o instanceof java.lang.String) + return ByteBuffer.wrap(new DataByteArray((String)o).get()); + if (o instanceof Integer) + return Int32Type.instance.decompose((Integer)o); + if (o instanceof Long) + return LongType.instance.decompose((Long)o); + if (o instanceof Float) + return FloatType.instance.decompose((Float)o); + if (o instanceof Double) + return DoubleType.instance.decompose((Double)o); + if (o instanceof UUID) + return ByteBuffer.wrap(UUIDGen.decompose((UUID) o)); + if(o instanceof Tuple) { + List<Object> objects = ((Tuple)o).getAll(); + //collections + if (objects.size() > 0 && objects.get(0) instanceof String) + { + String collectionType = (String) objects.get(0); + if ("set".equalsIgnoreCase(collectionType) || + "list".equalsIgnoreCase(collectionType)) + return objToListOrSetBB(objects.subList(1, objects.size())); + else if ("map".equalsIgnoreCase(collectionType)) + return objToMapBB(objects.subList(1, objects.size())); + + } + return objToCompositeBB(objects); + } + + return ByteBuffer.wrap(((DataByteArray) o).get()); + } + + private ByteBuffer objToListOrSetBB(List<Object> objects) + { + List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); + for(Object sub : objects) + { + ByteBuffer buffer = objToBB(sub); + serialized.add(buffer); + } + return CollectionSerializer.pack(serialized, objects.size(), 3); + } + + private ByteBuffer objToMapBB(List<Object> objects) + { + List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2); + for(Object sub : objects) + { + List<Object> keyValue = ((Tuple)sub).getAll(); + for (Object entry: keyValue) 
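 // (each map-entry tuple contributes two buffers here, key then value, while
 // pack() below receives objects.size(), the entry count the map wire format expects)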
+ { + ByteBuffer buffer = objToBB(entry); + serialized.add(buffer); + } + } + return CollectionSerializer.pack(serialized, objects.size(), 3); + } + + private ByteBuffer objToCompositeBB(List<Object> objects) + { + List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); + int totalLength = 0; + for(Object sub : objects) + { + ByteBuffer buffer = objToBB(sub); + serialized.add(buffer); + totalLength += 2 + buffer.remaining() + 1; + } + ByteBuffer out = ByteBuffer.allocate(totalLength); + for (ByteBuffer bb : serialized) + { + int length = bb.remaining(); + out.put((byte) ((length >> 8) & 0xFF)); + out.put((byte) (length & 0xFF)); + out.put(bb); + out.put((byte) 0); + } + out.flip(); + return out; + } + /** send CQL query request using data from tuple */ private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException { @@ -487,30 +458,50 @@ public class CqlNativeStorage extends AbstractCassandraStorage } } + /** get the validators */ + protected Map<ByteBuffer, AbstractType> getValidatorMap(TableInfo cfDef) throws IOException + { + Map<ByteBuffer, AbstractType> validators = new HashMap<>(); + for (ColumnInfo cd : cfDef.getColumns()) + { + if (cd.getTypeName() != null) + { + try + { + AbstractType validator = TypeParser.parseCqlName(cd.getTypeName()); + if (validator instanceof CounterColumnType) + validator = LongType.instance; + validators.put(ByteBufferUtil.bytes(cd.getName()), validator); + } + catch (ConfigurationException | SyntaxException e) + { + throw new IOException(e); + } + } + } + return validators; + } + /** schema: (value, value, value) where keys are in the front. */ public ResourceSchema getSchema(String location, Job job) throws IOException { setLocation(location, job); - CfInfo cfInfo = getCfInfo(loadSignature); - CfDef cfDef = cfInfo.cfDef; + TableInfo cfInfo = getCfInfo(loadSignature); // top-level schema, no type ResourceSchema schema = new ResourceSchema(); - // get default marshallers and validators - Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); - Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef); + // get default validators + Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfInfo); // will contain all fields for this schema List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>(); - for (ColumnDef cdef : cfDef.column_metadata) + for (ColumnInfo cdef : cfInfo.getColumns()) { ResourceFieldSchema valSchema = new ResourceFieldSchema(); - AbstractType validator = validators.get(cdef.name); - if (validator == null) - validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR); + AbstractType validator = validators.get(cdef.getName()); valSchema.setName(new String(cdef.getName())); - valSchema.setType(getPigType(validator)); + valSchema.setType(StorageHelper.getPigType(validator)); allSchemaFields.add(valSchema); } @@ -522,8 +513,8 @@ public class CqlNativeStorage extends AbstractCassandraStorage public void setPartitionFilter(Expression partitionFilter) throws IOException { UDFContext context = UDFContext.getUDFContext(); - Properties property = context.getUDFProperties(AbstractCassandraStorage.class); - property.setProperty(PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter)); + Properties property = context.getUDFProperties(CqlNativeStorage.class); + property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter)); } /** @@ -557,8 +548,8 @@ public class 
CqlNativeStorage extends AbstractCassandraStorage private String getWhereClauseForPartitionFilter() { UDFContext context = UDFContext.getUDFContext(); - Properties property = context.getUDFProperties(AbstractCassandraStorage.class); - return property.getProperty(PARTITION_FILTER_SIGNATURE); + Properties property = context.getUDFProperties(CqlNativeStorage.class); + return property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE); } /** set read configuration settings */ @@ -631,7 +622,7 @@ public class CqlNativeStorage extends AbstractCassandraStorage CqlConfigHelper.setInputWhereClauses(conf, whereClause); String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter(); - String wc = whereClause != null && !whereClause.trim().isEmpty() + String wc = whereClause != null && !whereClause.trim().isEmpty() ? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter) : whereClauseForPartitionFilter; @@ -639,17 +630,17 @@ public class CqlNativeStorage extends AbstractCassandraStorage { logger.debug("where clause: {}", wc); CqlConfigHelper.setInputWhereClauses(conf, wc); - } - if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null) + } + if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null) { try { - ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE))); + ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE))); } catch (NumberFormatException e) { throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e); - } + } } if (ConfigHelper.getInputInitialAddress(conf) == null) @@ -700,6 +691,74 @@ public class CqlNativeStorage extends AbstractCassandraStorage initSchema(storeSignature); } + /** Methods to get the column family schema from Cassandra */ + protected void initSchema(String signature) throws IOException + { + Properties properties = UDFContext.getUDFContext().getUDFProperties(CqlNativeStorage.class); + + // Only get the schema if we haven't already gotten it + if (!properties.containsKey(signature)) + { + try + { + Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect(); + client.execute("USE " + keyspace); + + // compose the CfDef for the columfamily + TableMetadata cfInfo = getCfInfo(client); + + if (cfInfo != null) + { + properties.setProperty(signature, cfdefToString(cfInfo)); + } + else + throw new IOException(String.format("Table '%s' not found in keyspace '%s'", + column_family, + keyspace)); + } + catch (Exception e) + { + throw new IOException(e); + } + } + } + + + /** convert CfDef to string */ + protected static String cfdefToString(TableMetadata cfDef) throws IOException + { + TableInfo tableInfo = new TableInfo(cfDef); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream( baos ); + oos.writeObject( tableInfo ); + oos.close(); + return new String( Base64Coder.encode(baos.toByteArray()) ); + } + + /** convert string back to CfDef */ + protected static TableInfo cfdefFromString(String st) throws IOException, ClassNotFoundException + { + byte [] data = Base64Coder.decode( st ); + ObjectInputStream ois = new ObjectInputStream( + new ByteArrayInputStream( data ) ); + Object o = ois.readObject(); + ois.close(); + return (TableInfo)o; + } + + /** decompose the query to store the parameters in a map */ + public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException + 
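 // (note: each parameter is assumed to be of the form key=value; a parameter with
 // nothing after '=' throws ArrayIndexOutOfBoundsException, and anything after a
 // second '=' in a value is silently dropped)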
{ + String[] params = query.split("&"); + Map<String, String> map = new HashMap<String, String>(params.length); + for (String param : params) + { + String[] keyValue = param.split("="); + map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8")); + } + return map; + } + private void setLocationFromUri(String location) throws IOException { try @@ -808,11 +867,171 @@ public class CqlNativeStorage extends AbstractCassandraStorage } } - /** - * Thrift API can't handle null, so use empty byte array - */ public ByteBuffer nullToBB() { return ByteBuffer.wrap(new byte[0]); } + + /** output format */ + public OutputFormat getOutputFormat() throws IOException + { + try + { + return FBUtilities.construct(outputFormatClass, "outputformat"); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + } + + public void cleanupOnFailure(String failure, Job job) + { + } + + public void cleanupOnSuccess(String location, Job job) throws IOException { + } + + /** return partition keys */ + public String[] getPartitionKeys(String location, Job job) throws IOException + { + if (!usePartitionFilter) + return null; + TableInfo tableMetadata = getCfInfo(loadSignature); + String[] partitionKeys = new String[tableMetadata.getPartitionKey().size()]; + for (int i = 0; i < tableMetadata.getPartitionKey().size(); i++) + { + partitionKeys[i] = new String(tableMetadata.getPartitionKey().get(i).getName()); + } + return partitionKeys; + } + + public void checkSchema(ResourceSchema schema) throws IOException + { + // we don't care about types, they all get casted to ByteBuffers + } + + public ResourceStatistics getStatistics(String location, Job job) + { + return null; + } + + @Override + public InputFormat getInputFormat() throws IOException + { + try + { + return FBUtilities.construct(inputFormatClass, "inputformat"); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + } + + public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException + { + return relativeToAbsolutePath(location, curDir); + } + + @Override + public String relativeToAbsolutePath(String location, Path curDir) throws IOException + { + return location; + } + + @Override + public void setUDFContextSignature(String signature) + { + this.loadSignature = signature; + } + + /** StoreFunc methods */ + public void setStoreFuncUDFContextSignature(String signature) + { + this.storeSignature = signature; + } + + /** set hadoop cassandra connection settings */ + protected void setConnectionInformation() throws IOException + { + StorageHelper.setConnectionInformation(conf); + if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null) + inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT)); + else + inputFormatClass = DEFAULT_INPUT_FORMAT; + if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null) + outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT)); + else + outputFormatClass = DEFAULT_OUTPUT_FORMAT; + } + + /** get the full class name */ + protected String getFullyQualifiedClassName(String classname) + { + return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." 
+    }
+}
+
+class TableInfo implements Serializable
+{
+    private final List<ColumnInfo> columns;
+    private final List<ColumnInfo> partitionKey;
+    private final String name;
+
+    public TableInfo(TableMetadata tableMetadata)
+    {
+        List<ColumnMetadata> cmColumns = tableMetadata.getColumns();
+        columns = new ArrayList<>(cmColumns.size());
+        for (ColumnMetadata cm : cmColumns)
+        {
+            columns.add(new ColumnInfo(this, cm));
+        }
+        List<ColumnMetadata> cmPartitionKey = tableMetadata.getPartitionKey();
+        partitionKey = new ArrayList<>(cmPartitionKey.size());
+        for (ColumnMetadata cm : cmPartitionKey)
+        {
+            partitionKey.add(new ColumnInfo(this, cm));
+        }
+        name = tableMetadata.getName();
+    }
+
+    public List<ColumnInfo> getPartitionKey()
+    {
+        return partitionKey;
+    }
+
+    public List<ColumnInfo> getColumns()
+    {
+        return columns;
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+}
+
+class ColumnInfo implements Serializable
+{
+    private final TableInfo table;
+    private final String name;
+    private final String typeName;
+
+    public ColumnInfo(TableInfo tableInfo, ColumnMetadata columnMetadata)
+    {
+        table = tableInfo;
+        name = columnMetadata.getName();
+        typeName = columnMetadata.getType().toString();
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    public String getTypeName()
+    {
+        return typeName;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
new file mode 100644
index 0000000..66836b2
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
@@ -0,0 +1,121 @@
+package org.apache.cassandra.hadoop.pig;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.serializers.CollectionSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class StorageHelper
+{
+    // system environment variables that can be set to configure connection info:
+    // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
+    public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
+    public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
+    public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
+    public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
+    public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
+    public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
+    public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
+    public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
+    public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+    public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
+    public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
+    public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
"PIG_INPUT_SPLIT_SIZE"; + + + public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter"; + + protected static void setConnectionInformation(Configuration conf) + { + if (System.getenv(PIG_RPC_PORT) != null) + { + ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT)); + ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT)); + } + + if (System.getenv(PIG_INPUT_RPC_PORT) != null) + ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT)); + if (System.getenv(PIG_OUTPUT_RPC_PORT) != null) + ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT)); + + if (System.getenv(PIG_INITIAL_ADDRESS) != null) + { + ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); + ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); + } + if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null) + ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS)); + if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null) + ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS)); + + if (System.getenv(PIG_PARTITIONER) != null) + { + ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER)); + ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER)); + } + if(System.getenv(PIG_INPUT_PARTITIONER) != null) + ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER)); + if(System.getenv(PIG_OUTPUT_PARTITIONER) != null) + ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER)); + } + + protected static Object cassandraToObj(AbstractType validator, ByteBuffer value, int nativeProtocolVersion) + { + if (validator instanceof DecimalType || validator instanceof InetAddressType) + return validator.getString(value); + + if (validator instanceof CollectionType) + { + // For CollectionType, the compose() method assumes the v3 protocol format of collection, which + // is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format + return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion); + } + + return validator.compose(value); + } + + /** set the value to the position of the tuple */ + protected static void setTupleValue(Tuple pair, int position, Object value) throws ExecException + { + if (value instanceof BigInteger) + pair.set(position, ((BigInteger) value).intValue()); + else if (value instanceof ByteBuffer) + pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value))); + else if (value instanceof UUID) + pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value))); + else if (value instanceof Date) + pair.set(position, TimestampType.instance.decompose((Date) value).getLong()); + else + pair.set(position, value); + } + + /** get pig type for the cassandra data type*/ + protected static byte getPigType(AbstractType type) + { + if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad + return DataType.LONG; + else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger + return DataType.INTEGER; + else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType) + return DataType.CHARARRAY; + else if (type instanceof 
+            return DataType.FLOAT;
+        else if (type instanceof DoubleType)
+            return DataType.DOUBLE;
+        else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
+            return DataType.TUPLE;
+
+        return DataType.BYTEARRAY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 06d83dd..6991958 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -92,7 +92,7 @@ public class SSTableLoader implements StreamEventHandler
                         return false;
                     }
 
-                    CFMetaData metadata = client.getCFMetaData(keyspace, desc.cfname);
+                    CFMetaData metadata = client.getTableMetadata(desc.cfname);
                     if (metadata == null)
                     {
                         outputHandler.output(String.format("Skipping file %s: table %s.%s doesn't exist", name, keyspace, desc.cfname));
@@ -251,7 +251,9 @@ public class SSTableLoader implements StreamEventHandler
     /**
      * Stop the client.
      */
-    public void stop() {}
+    public void stop()
+    {
+    }
 
     /**
      * Provides connection factory.
      */
@@ -268,7 +270,12 @@ public class SSTableLoader implements StreamEventHandler
      * Validate that {@code keyspace} is an existing keyspace and {@code
      * cfName} one of its existing column families.
      */
-    public abstract CFMetaData getCFMetaData(String keyspace, String cfName);
+    public abstract CFMetaData getTableMetadata(String tableName);
+
+    public void setTableMetadata(CFMetaData cfm)
+    {
+        throw new RuntimeException();
+    }
 
     public Map<InetAddress, Collection<Range<Token>>> getEndpointToRangesMap()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d6ce46e..c17d2d7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4117,8 +4117,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         SSTableLoader.Client client = new SSTableLoader.Client()
         {
+            private String keyspace;
+
             public void init(String keyspace)
             {
+                this.keyspace = keyspace;
                 try
                 {
                     setPartitioner(DatabaseDescriptor.getPartitioner());
@@ -4135,14 +4138,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 }
             }
 
-            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            public CFMetaData getTableMetadata(String tableName)
             {
-                return Schema.instance.getCFMetaData(keyspace, cfName);
+                return Schema.instance.getCFMetaData(keyspace, tableName);
             }
         };
 
-        SSTableLoader loader = new SSTableLoader(dir, client, new OutputHandler.LogOutput());
-        return loader.stream();
+        return new SSTableLoader(dir, client, new OutputHandler.LogOutput()).stream();
    }
 
     public void rescheduleFailedDeletions()
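
A note on the getQueryMap() helper added to CqlNativeStorage above: it splits the query portion of the cql:// location URI on '&', then each parameter on '=', URL-decoding only the value. Below is a minimal, self-contained sketch of that contract (the class name and sample parameters are illustrative, not part of the commit); note the assumption that every parameter contains an '=', so a key without a value would fail on keyValue[1].

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;

public class QueryMapSketch
{
    // split "k1=v1&k2=v2" into a map, URL-decoding only the values
    public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
    {
        String[] params = query.split("&");
        Map<String, String> map = new HashMap<String, String>(params.length);
        for (String param : params)
        {
            String[] keyValue = param.split("=");
            map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
        }
        return map;
    }

    public static void main(String[] args) throws UnsupportedEncodingException
    {
        // e.g. the query part of cql://ks/cf?init_address=localhost&rpc_port=9160
        Map<String, String> map = getQueryMap("init_address=localhost&rpc_port=9160");
        System.out.println(map.get("init_address")); // prints "localhost"
        System.out.println(map.get("rpc_port"));     // prints "9160"
    }
}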
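
The cfdefToString()/cfdefFromString() pair above hands the table schema through the UDFContext by Java-serializing a TableInfo and Base64-encoding the bytes, which is why TableInfo and ColumnInfo must implement Serializable. A sketch of the same round-trip, with java.util.Base64 standing in for the Base64Coder class the commit uses (class and method names here are illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

public class SchemaStringSketch
{
    // serialize any Serializable (TableInfo in the commit) and Base64-encode the bytes
    static String toBase64(Serializable obj) throws IOException
    {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos))
        {
            oos.writeObject(obj);
        }
        return Base64.getEncoder().encodeToString(baos.toByteArray());
    }

    // decode the Base64 string and deserialize the object back
    static Object fromBase64(String st) throws IOException, ClassNotFoundException
    {
        byte[] data = Base64.getDecoder().decode(st);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)))
        {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception
    {
        String encoded = toBase64("ks.cf schema");
        System.out.println(fromBase64(encoded)); // prints "ks.cf schema"
    }
}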
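
StorageHelper.setConnectionInformation() above applies the generic variables (PIG_RPC_PORT, PIG_INITIAL_ADDRESS, PIG_PARTITIONER) before the direction-specific ones, so an input- or output-specific variable wins whenever both are set. A small sketch of that precedence rule, using a plain map in place of real environment variables (names and values are illustrative):

import java.util.HashMap;
import java.util.Map;

public class EnvPrecedenceSketch
{
    public static void main(String[] args)
    {
        // pretend environment: a generic port plus an input-specific override
        Map<String, String> env = new HashMap<String, String>();
        env.put("PIG_RPC_PORT", "9160");
        env.put("PIG_INPUT_RPC_PORT", "9161");

        // the generic value is applied to both directions first...
        String inputPort = env.get("PIG_RPC_PORT");
        String outputPort = env.get("PIG_RPC_PORT");

        // ...then the direction-specific variables overwrite it when present
        if (env.get("PIG_INPUT_RPC_PORT") != null)
            inputPort = env.get("PIG_INPUT_RPC_PORT");
        if (env.get("PIG_OUTPUT_RPC_PORT") != null)
            outputPort = env.get("PIG_OUTPUT_RPC_PORT");

        System.out.println(inputPort);  // 9161, the specific variable wins
        System.out.println(outputPort); // 9160, falls back to the generic value
    }
}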