http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index 78080e2..3899f8c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -22,11 +22,14 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
@@ -35,7 +38,7 @@ import org.apache.hadoop.util.Progressable;
  * The <code>CqlBulkOutputFormat</code> acts as a Hadoop-specific
  * OutputFormat that allows reduce tasks to store keys (and corresponding
  * bound variable values) as CQL rows (and respective columns) in a given
- * ColumnFamily.
+ * table.
  *
  * <p>
  * As is the case with the {@link org.apache.cassandra.hadoop.cql3.CqlOutputFormat},
@@ -48,13 +51,14 @@ import org.apache.hadoop.util.Progressable;
  * simple.
  * </p>
  */
-public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<ByteBuffer>>
+public class CqlBulkOutputFormat extends OutputFormat<Object, List<ByteBuffer>>
+        implements org.apache.hadoop.mapred.OutputFormat<Object, List<ByteBuffer>>
 {
 
-    private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
-    private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
+    private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.table.schema.";
+    private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.table.insert.";
     private static final String DELETE_SOURCE = "cassandra.output.delete.source";
-    private static final String COLUMNFAMILY_ALIAS_PREFIX = "cqlbulkoutputformat.columnfamily.alias.";
+    private static final String TABLE_ALIAS_PREFIX = "cqlbulkoutputformat.table.alias.";
 
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
@@ -75,33 +79,60 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
     {
         return new CqlBulkRecordWriter(context);
     }
+
+    @Override
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    private void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace with setTable()");
+        }
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
 
-    public static void setColumnFamilySchema(Configuration conf, String columnFamily, String schema)
+    public static void setTableSchema(Configuration conf, String columnFamily, String schema)
     {
         conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
     }
 
-    public static void setColumnFamilyInsertStatement(Configuration conf, String columnFamily, String insertStatement)
+    public static void setTableInsertStatement(Configuration conf, String columnFamily, String insertStatement)
     {
         conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
     }
 
-    public static String getColumnFamilySchema(Configuration conf, String columnFamily)
+    public static String getTableSchema(Configuration conf, String columnFamily)
     {
         String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
         if (schema == null)
         {
-            throw new UnsupportedOperationException("You must set the ColumnFamily schema using setColumnFamilySchema.");
+            throw new UnsupportedOperationException("You must set the Table schema using setTableSchema.");
         }
         return schema;
     }
 
-    public static String getColumnFamilyInsertStatement(Configuration conf, String columnFamily)
+    public static String getTableInsertStatement(Configuration conf, String columnFamily)
     {
         String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily);
         if (insert == null)
         {
-            throw new UnsupportedOperationException("You must set the ColumnFamily insert statement using setColumnFamilySchema.");
+            throw new UnsupportedOperationException("You must set the Table insert statement using setTableSchema.");
         }
         return insert;
     }
@@ -116,13 +147,31 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
         return conf.getBoolean(DELETE_SOURCE, false);
     }
 
-    public static void setColumnFamilyAlias(Configuration conf, String alias, String columnFamily)
+    public static void setTableAlias(Configuration conf, String alias, String columnFamily)
     {
-        conf.set(COLUMNFAMILY_ALIAS_PREFIX + alias, columnFamily);
+        conf.set(TABLE_ALIAS_PREFIX + alias, columnFamily);
     }
 
-    public static String getColumnFamilyForAlias(Configuration conf, String alias)
+    public static String getTableForAlias(Configuration conf, String alias)
     {
-        return conf.get(COLUMNFAMILY_ALIAS_PREFIX + alias);
+        return conf.get(TABLE_ALIAS_PREFIX + alias);
+    }
+
+    public static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
     }
 }
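
For reference, a minimal job-setup sketch against the renamed helpers above might look like the following. It assumes the pre-existing ConfigHelper setters (setOutputKeyspace, setOutputColumnFamily, setOutputInitialAddress, setOutputPartitioner) from org.apache.cassandra.hadoop and Hadoop's Job API, none of which are part of this patch, and the keyspace/table/CQL strings are hypothetical.

    // Sketch only: wiring a Hadoop job to CqlBulkOutputFormat with the renamed helpers.
    // The keyspace/table/CQL below are hypothetical; the ConfigHelper setters are the
    // pre-existing ones from org.apache.cassandra.hadoop, not part of this patch.
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkLoadJobSetup
    {
        public static Job configure(Configuration conf) throws Exception
        {
            ConfigHelper.setOutputKeyspace(conf, "ks");
            ConfigHelper.setOutputColumnFamily(conf, "ks", "words");   // still named "ColumnFamily" in ConfigHelper
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
            ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner");

            // Renamed in this patch: setColumnFamilySchema -> setTableSchema, etc.
            CqlBulkOutputFormat.setTableSchema(conf, "words",
                    "CREATE TABLE ks.words (word text PRIMARY KEY, count int)");
            CqlBulkOutputFormat.setTableInsertStatement(conf, "words",
                    "INSERT INTO ks.words (word, count) VALUES (?, ?)");
            CqlBulkOutputFormat.setTableAlias(conf, "words_alias", "words");   // optional alias mapping

            Job job = Job.getInstance(conf, "bulk-load-words");
            job.setOutputFormatClass(CqlBulkOutputFormat.class);
            return job;
        }
    }

As the next file in this commit shows, CqlBulkRecordWriter resolves aliases via getTableForAlias(), compiles the schema with CFMetaData.compile(), writes SSTables under mapreduce.output.bulkoutputformat.localdir, and streams them with SSTableLoader.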
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java index 60cd511..e77c4c8 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java @@ -17,17 +17,22 @@ */ package org.apache.cassandra.hadoop.cql3; +import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.hadoop.AbstractBulkRecordWriter; import org.apache.cassandra.hadoop.BulkRecordWriter; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.hadoop.HadoopCompat; @@ -35,11 +40,12 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.utils.NativeSSTableLoaderClient; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.Progressable; - /** * The <code>CqlBulkRecordWriter</code> maps the output <key, value> * pairs to a Cassandra column family. 
In particular, it applies the binded variables @@ -54,10 +60,26 @@ import org.apache.hadoop.util.Progressable; * * @see CqlBulkOutputFormat */ -public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<ByteBuffer>> +public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>> + implements org.apache.hadoop.mapred.RecordWriter<Object, List<ByteBuffer>> { + public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir"; + public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize"; + public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits"; + public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts"; + + private final Logger logger = LoggerFactory.getLogger(CqlBulkRecordWriter.class); + + protected final Configuration conf; + protected final int maxFailures; + protected final int bufferSize; + protected Closeable writer; + protected SSTableLoader loader; + protected Progressable progress; + protected TaskAttemptContext context; + private String keyspace; - private String columnFamily; + private String table; private String schema; private String insertStatement; private File outputDir; @@ -65,19 +87,25 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B CqlBulkRecordWriter(TaskAttemptContext context) throws IOException { - super(context); + this(HadoopCompat.getConfiguration(context)); + this.context = context; setConfigs(); } CqlBulkRecordWriter(Configuration conf, Progressable progress) throws IOException { - super(conf, progress); + this(conf); + this.progress = progress; setConfigs(); } CqlBulkRecordWriter(Configuration conf) throws IOException { - super(conf); + Config.setOutboundBindAny(true); + this.conf = conf; + DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0"))); + maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0")); + bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")); setConfigs(); } @@ -85,54 +113,55 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B { // if anything is missing, exceptions will be thrown here, instead of on write() keyspace = ConfigHelper.getOutputKeyspace(conf); - columnFamily = ConfigHelper.getOutputColumnFamily(conf); + table = ConfigHelper.getOutputColumnFamily(conf); - // check if columnFamily is aliased - String aliasedCf = CqlBulkOutputFormat.getColumnFamilyForAlias(conf, columnFamily); + // check if table is aliased + String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table); if (aliasedCf != null) - columnFamily = aliasedCf; + table = aliasedCf; - schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily); - insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily); - outputDir = getColumnFamilyDirectory(); + schema = CqlBulkOutputFormat.getTableSchema(conf, table); + insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, table); + outputDir = getTableDirectory(); deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf); } - + protected String getOutputLocation() throws IOException + { + String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir")); + if (dir == null) + throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION); + return dir; + } + private void 
prepareWriter() throws IOException { - try + if (writer == null) { - if (writer == null) - { - writer = CQLSSTableWriter.builder() - .forTable(schema) - .using(insertStatement) - .withPartitioner(ConfigHelper.getOutputPartitioner(conf)) - .inDirectory(outputDir) - .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"))) - .build(); - } - if (loader == null) - { - ExternalClient externalClient = new ExternalClient(conf); - - externalClient.addKnownCfs(keyspace, schema); - - this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) { - @Override - public void onSuccess(StreamState finalState) - { - if (deleteSrc) - FileUtils.deleteRecursive(outputDir); - } - }; - } + writer = CQLSSTableWriter.builder() + .forTable(schema) + .using(insertStatement) + .withPartitioner(ConfigHelper.getOutputPartitioner(conf)) + .inDirectory(outputDir) + .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"))) + .build(); } - catch (Exception e) + + if (loader == null) { - throw new IOException(e); - } + ExternalClient externalClient = new ExternalClient(conf); + externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace)); + + loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) + { + @Override + public void onSuccess(StreamState finalState) + { + if (deleteSrc) + FileUtils.deleteRecursive(outputDir); + } + }; + } } /** @@ -168,9 +197,9 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B } } - private File getColumnFamilyDirectory() throws IOException + private File getTableDirectory() throws IOException { - File dir = new File(String.format("%s%s%s%s%s-%s", getOutputLocation(), File.separator, keyspace, File.separator, columnFamily, UUID.randomUUID().toString())); + File dir = new File(String.format("%s%s%s%s%s-%s", getOutputLocation(), File.separator, keyspace, File.separator, table, UUID.randomUUID().toString())); if (!dir.exists() && !dir.mkdirs()) { @@ -179,41 +208,83 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B return dir; } - - public static class ExternalClient extends AbstractBulkRecordWriter.ExternalClient + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { - private Map<String, Map<String, CFMetaData>> knownCqlCfs = new HashMap<>(); - - public ExternalClient(Configuration conf) - { - super(conf); - } + close(); + } + + /** Fills the deprecated RecordWriter interface for streaming. 
*/ + @Deprecated + public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException + { + close(); + } - public void addKnownCfs(String keyspace, String cql) + private void close() throws IOException + { + if (writer != null) { - Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace); - - if (cfs == null) + writer.close(); + Future<StreamState> future = loader.stream(); + while (true) { - cfs = new HashMap<>(); - knownCqlCfs.put(keyspace, cfs); + try + { + future.get(1000, TimeUnit.MILLISECONDS); + break; + } + catch (ExecutionException | TimeoutException te) + { + if (null != progress) + progress.progress(); + if (null != context) + HadoopCompat.progress(context); + } + catch (InterruptedException e) + { + throw new IOException(e); + } + } + if (loader.getFailedHosts().size() > 0) + { + if (loader.getFailedHosts().size() > maxFailures) + throw new IOException("Too many hosts failed: " + loader.getFailedHosts()); + else + logger.warn("Some hosts failed: {}", loader.getFailedHosts()); } - - CFMetaData metadata = CFMetaData.compile(cql, keyspace); - cfs.put(metadata.cfName, metadata); } - - @Override - public CFMetaData getCFMetaData(String keyspace, String cfName) + } + + public static class ExternalClient extends NativeSSTableLoaderClient + { + public ExternalClient(Configuration conf) { - CFMetaData metadata = super.getCFMetaData(keyspace, cfName); - if (metadata != null) + super(resolveHostAddresses(conf), + CqlConfigHelper.getOutputNativePort(conf), + ConfigHelper.getOutputKeyspaceUserName(conf), + ConfigHelper.getOutputKeyspacePassword(conf), + CqlConfigHelper.getSSLOptions(conf).orNull()); + } + + private static Collection<InetAddress> resolveHostAddresses(Configuration conf) + { + Set<InetAddress> addresses = new HashSet<>(); + + for (String host : ConfigHelper.getOutputInitialAddress(conf).split(",")) { - return metadata; + try + { + addresses.add(InetAddress.getByName(host)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } } - - Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace); - return cfs != null ? 
cfs.get(cfName) : null; + + return addresses; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java index ac5a7e5..3033fa6 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java @@ -34,22 +34,23 @@ import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; -import org.apache.cassandra.hadoop.ConfigHelper; -import org.apache.cassandra.io.util.FileUtils; +import com.google.common.base.Optional; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; import com.datastax.driver.core.AuthProvider; -import com.datastax.driver.core.PlainTextAuthProvider; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PlainTextAuthProvider; +import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.PoolingOptions; import com.datastax.driver.core.ProtocolOptions; import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.SSLOptions; import com.datastax.driver.core.SocketOptions; -import com.datastax.driver.core.policies.LoadBalancingPolicy; -import com.google.common.base.Optional; +import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.hadoop.conf.Configuration; + public class CqlConfigHelper { @@ -84,6 +85,7 @@ public class CqlConfigHelper private static final String INPUT_NATIVE_PROTOCOL_VERSION = "cassandra.input.native.protocol.version"; private static final String OUTPUT_CQL = "cassandra.output.cql"; + private static final String OUTPUT_NATIVE_PORT = "cassandra.output.native.port"; /** * Set the CQL columns for the input of this job. 
@@ -176,6 +178,11 @@ public class CqlConfigHelper return Integer.parseInt(conf.get(INPUT_NATIVE_PORT, "9042")); } + public static int getOutputNativePort(Configuration conf) + { + return Integer.parseInt(conf.get(OUTPUT_NATIVE_PORT, "9042")); + } + public static Optional<Integer> getInputMinSimultReqPerConnections(Configuration conf) { return getIntSetting(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, conf); @@ -294,6 +301,22 @@ public class CqlConfigHelper public static Cluster getInputCluster(String[] hosts, Configuration conf) { int port = getInputNativePort(conf); + return getCluster(hosts, conf, port); + } + + public static Cluster getOutputCluster(String host, Configuration conf) + { + return getOutputCluster(new String[]{host}, conf); + } + + public static Cluster getOutputCluster(String[] hosts, Configuration conf) + { + int port = getOutputNativePort(conf); + return getCluster(hosts, conf, port); + } + + public static Cluster getCluster(String[] hosts, Configuration conf, int port) + { Optional<AuthProvider> authProvider = getAuthProvider(conf); Optional<SSLOptions> sslOptions = getSSLOptions(conf); Optional<Integer> protocolVersion = getProtocolVersion(conf); @@ -301,11 +324,11 @@ public class CqlConfigHelper SocketOptions socketOptions = getReadSocketOptions(conf); QueryOptions queryOptions = getReadQueryOptions(conf); PoolingOptions poolingOptions = getReadPoolingOptions(conf); - + Cluster.Builder builder = Cluster.builder() - .addContactPoints(hosts) - .withPort(port) - .withCompression(ProtocolOptions.Compression.NONE); + .addContactPoints(hosts) + .withPort(port) + .withCompression(ProtocolOptions.Compression.NONE); if (authProvider.isPresent()) builder.withAuthProvider(authProvider.get()); @@ -316,14 +339,13 @@ public class CqlConfigHelper builder.withProtocolVersion(protocolVersion.get()); } builder.withLoadBalancingPolicy(loadBalancingPolicy) - .withSocketOptions(socketOptions) - .withQueryOptions(queryOptions) - .withPoolingOptions(poolingOptions); + .withSocketOptions(socketOptions) + .withQueryOptions(queryOptions) + .withPoolingOptions(poolingOptions); return builder.build(); } - public static void setInputCoreConnections(Configuration conf, String connections) { conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections); @@ -502,7 +524,7 @@ public class CqlConfigHelper return Optional.of(getClientAuthProvider(authProvider.get(), conf)); } - private static Optional<SSLOptions> getSSLOptions(Configuration conf) + public static Optional<SSLOptions> getSSLOptions(Configuration conf) { Optional<String> truststorePath = getInputNativeSSLTruststorePath(conf); Optional<String> keystorePath = getInputNativeSSLKeystorePath(conf); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java index 0d09ca2..9a1cda6 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java @@ -23,15 +23,15 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat; -import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.hadoop.*; +import org.apache.hadoop.conf.*; import org.apache.hadoop.mapreduce.*; /** - * The 
<code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific + * The <code>CqlOutputFormat</code> acts as a Hadoop-specific * OutputFormat that allows reduce tasks to store keys (and corresponding * bound variable values) as CQL rows (and respective columns) in a given - * ColumnFamily. + * table. * * <p> * As is the case with the {@link org.apache.cassandra.hadoop.ColumnFamilyInputFormat}, @@ -52,8 +52,51 @@ import org.apache.hadoop.mapreduce.*; * to Cassandra. * </p> */ -public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>> -{ +public class CqlOutputFormat extends OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>> + implements org.apache.hadoop.mapred.OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>> +{ + /** + * Check for validity of the output-specification for the job. + * + * @param context + * information about the job + */ + public void checkOutputSpecs(JobContext context) + { + checkOutputSpecs(HadoopCompat.getConfiguration(context)); + } + + protected void checkOutputSpecs(Configuration conf) + { + if (ConfigHelper.getOutputKeyspace(conf) == null) + throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()"); + if (ConfigHelper.getOutputPartitioner(conf) == null) + throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster"); + if (ConfigHelper.getOutputInitialAddress(conf) == null) + throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node"); + } + + /** Fills the deprecated OutputFormat interface for streaming. */ + @Deprecated + public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException + { + checkOutputSpecs(job); + } + + /** + * The OutputCommitter for this format does not write any data to the DFS. + * + * @param context + * the task context + * @return an output committer + * @throws IOException + * @throws InterruptedException + */ + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException + { + return new NullOutputCommitter(); + } + /** Fills the deprecated OutputFormat interface for streaming. */ @Deprecated public CqlRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException @@ -73,4 +116,25 @@ public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String { return new CqlRecordWriter(context); } + + /** + * An {@link OutputCommitter} that does nothing. 
+ */ + private static class NullOutputCommitter extends OutputCommitter + { + public void abortTask(TaskAttemptContext taskContext) { } + + public void cleanupJob(JobContext jobContext) { } + + public void commitTask(TaskAttemptContext taskContext) { } + + public boolean needsTaskCommit(TaskAttemptContext taskContext) + { + return false; + } + + public void setupJob(JobContext jobContext) { } + + public void setupTask(TaskAttemptContext taskContext) { } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java index 308bdf8..4a7bd59 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java @@ -37,13 +37,15 @@ import org.slf4j.LoggerFactory; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.Token; import com.datastax.driver.core.TupleValue; import com.datastax.driver.core.UDTValue; -import org.apache.cassandra.schema.LegacySchemaTables; -import org.apache.cassandra.db.SystemKeyspace; +import com.google.common.reflect.TypeToken; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.hadoop.ColumnFamilySplit; @@ -493,36 +495,72 @@ public class CqlRecordReader extends RecordReader<Long, Row> } @Override + public <T> List<T> getList(int i, TypeToken<T> typeToken) + { + return row.getList(i, typeToken); + } + + @Override public <T> List<T> getList(String name, Class<T> elementsClass) { return row.getList(name, elementsClass); } @Override + public <T> List<T> getList(String s, TypeToken<T> typeToken) + { + return row.getList(s, typeToken); + } + + @Override public <T> Set<T> getSet(int i, Class<T> elementsClass) { return row.getSet(i, elementsClass); } @Override + public <T> Set<T> getSet(int i, TypeToken<T> typeToken) + { + return row.getSet(i, typeToken); + } + + @Override public <T> Set<T> getSet(String name, Class<T> elementsClass) { return row.getSet(name, elementsClass); } @Override + public <T> Set<T> getSet(String s, TypeToken<T> typeToken) + { + return row.getSet(s, typeToken); + } + + @Override public <K, V> Map<K, V> getMap(int i, Class<K> keysClass, Class<V> valuesClass) { return row.getMap(i, keysClass, valuesClass); } @Override + public <K, V> Map<K, V> getMap(int i, TypeToken<K> typeToken, TypeToken<V> typeToken1) + { + return row.getMap(i, typeToken, typeToken1); + } + + @Override public <K, V> Map<K, V> getMap(String name, Class<K> keysClass, Class<V> valuesClass) { return row.getMap(name, keysClass, valuesClass); } @Override + public <K, V> Map<K, V> getMap(String s, TypeToken<K> typeToken, TypeToken<V> typeToken1) + { + return row.getMap(s, typeToken, typeToken1); + } + + @Override public UDTValue getUDTValue(int i) { return row.getUDTValue(i); @@ -545,6 +583,24 @@ public class CqlRecordReader extends RecordReader<Long, Row> { return row.getTupleValue(name); } + + @Override + public Token getToken(int i) + { + 
return row.getToken(i); + } + + @Override + public Token getToken(String name) + { + return row.getToken(name); + } + + @Override + public Token getPartitionKeyToken() + { + return row.getPartitionKeyToken(); + } } /** @@ -604,36 +660,21 @@ public class CqlRecordReader extends RecordReader<Long, Row> private void fetchKeys() { - String query = String.format("SELECT column_name, component_index, type " + - "FROM %s.%s " + - "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", - SystemKeyspace.NAME, - LegacySchemaTables.COLUMNS, - keyspace, - cfName); - // get CF meta data - List<Row> rows = session.execute(query).all(); - if (rows.isEmpty()) + TableMetadata tableMetadata = session.getCluster() + .getMetadata() + .getKeyspace(Metadata.quote(keyspace)) + .getTable(Metadata.quote(cfName)); + if (tableMetadata == null) { throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName); } - int numberOfPartitionKeys = 0; - for (Row row : rows) - if (row.getString(2).equals("partition_key")) - numberOfPartitionKeys++; - String[] partitionKeyArray = new String[numberOfPartitionKeys]; - for (Row row : rows) + //Here we assume that tableMetadata.getPartitionKey() always + //returns the list of columns in order of component_index + for (ColumnMetadata partitionKey : tableMetadata.getPartitionKey()) { - String type = row.getString(2); - String column = row.getString(0); - if (type.equals("partition_key")) - { - int componentIndex = row.isNull(1) ? 0 : row.getInt(1); - partitionKeyArray[componentIndex] = column; - } + partitionKeys.add(partitionKey.getName()); } - partitionKeys.addAll(Arrays.asList(partitionKeyArray)); } private String quote(String identifier) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java index dbbeb47..1d8436b 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java @@ -21,37 +21,39 @@ import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.schema.LegacySchemaTables; -import org.apache.cassandra.db.SystemKeyspace; + +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.TokenRange; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.TypeParser; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; +import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.ConfigurationException; import 
org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat; -import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter; -import org.apache.cassandra.hadoop.ConfigHelper; -import org.apache.cassandra.hadoop.HadoopCompat; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.hadoop.*; +import org.apache.cassandra.utils.*; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.util.Progressable; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransport; /** - * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value> - * pairs to a Cassandra column family. In particular, it applies the binded variables + * The <code>CqlRecordWriter</code> maps the output <key, value> + * pairs to a Cassandra table. In particular, it applies the binded variables * in the value to the prepared statement, which it associates with the key, and in * turn the responsible endpoint. * @@ -63,21 +65,38 @@ import org.apache.thrift.transport.TTransport; * * @see CqlOutputFormat */ -class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> implements AutoCloseable +class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> implements + org.apache.hadoop.mapred.RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(CqlRecordWriter.class); + // The configuration this writer is associated with. + protected final Configuration conf; + // The number of mutations to buffer per endpoint + protected final int queueSize; + + protected final long batchThreshold; + + protected Progressable progressable; + protected TaskAttemptContext context; + + // The ring cache that describes the token ranges each node in the ring is + // responsible for. This is what allows us to group the mutations by + // the endpoints they should be targeted at. The targeted endpoint + // essentially + // acts as the primary replica for the rows being affected by the mutations. 
+ private final NativeRingCache ringCache; + // handles for clients for each range running in the threadpool protected final Map<InetAddress, RangeClient> clients; // host to prepared statement id mappings - protected final ConcurrentHashMap<Cassandra.Client, Integer> preparedStatements = new ConcurrentHashMap<Cassandra.Client, Integer>(); + protected final ConcurrentHashMap<Session, PreparedStatement> preparedStatements = new ConcurrentHashMap<Session, PreparedStatement>(); protected final String cql; - protected AbstractType<?> keyValidator; - protected String [] partitionKeyColumns; - protected List<String> clusterColumns; + protected List<ColumnMetadata> partitionKeyColumns; + protected List<ColumnMetadata> clusterColumns; /** * Upon construction, obtain the map that this writer will use to collect @@ -100,28 +119,28 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB CqlRecordWriter(Configuration conf) { - super(conf); + this.conf = conf; + this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); + batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); this.clients = new HashMap<>(); try { - Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf); + String keyspace = ConfigHelper.getOutputKeyspace(conf); + Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace); + ringCache = new NativeRingCache(conf); if (client != null) { - client.set_keyspace(ConfigHelper.getOutputKeyspace(conf)); - String user = ConfigHelper.getOutputKeyspaceUserName(conf); - String password = ConfigHelper.getOutputKeyspacePassword(conf); - if ((user != null) && (password != null)) - AbstractColumnFamilyOutputFormat.login(user, password, client); - retrievePartitionKeyValidator(client); + TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf)); + clusterColumns = tableMetadata.getClusteringColumns(); + partitionKeyColumns = tableMetadata.getPartitionKey(); + String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim(); if (cqlQuery.toLowerCase().startsWith("insert")) throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement"); cql = appendKeyWhereClauses(cqlQuery); - TTransport transport = client.getOutputProtocol().getTransport(); - if (transport.isOpen()) - transport.close(); + client.close(); } else { @@ -133,7 +152,26 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB throw new RuntimeException(e); } } - + + /** + * Close this <code>RecordWriter</code> to future operations, but not before + * flushing out the batched mutations. + * + * @param context the context of the task + * @throws IOException + */ + public void close(TaskAttemptContext context) throws IOException, InterruptedException + { + close(); + } + + /** Fills the deprecated RecordWriter interface for streaming. */ + @Deprecated + public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException + { + close(); + } + @Override public void close() throws IOException { @@ -157,7 +195,7 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB /** * If the key is to be associated with a valid value, a mutation is created - * for it with the given column family and columns. 
In the event the value + * for it with the given table and columns. In the event the value * in the column is missing (i.e., null), then it is marked for * {@link Deletion}. Similarly, if the entire value for a key is missing * (i.e., null), then the entire key is marked for {@link Deletion}. @@ -172,25 +210,25 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB @Override public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException { - Range<Token> range = ringCache.getRange(getPartitionKey(keyColumns)); + TokenRange range = ringCache.getRange(getPartitionKey(keyColumns)); // get the client for the given range, or create a new one - final InetAddress address = ringCache.getEndpoint(range).get(0); + final InetAddress address = ringCache.getEndpoints(range).get(0); RangeClient client = clients.get(address); if (client == null) { // haven't seen keys for this range: create new client - client = new RangeClient(ringCache.getEndpoint(range)); + client = new RangeClient(ringCache.getEndpoints(range)); client.start(); clients.put(address, client); } // add primary key columns to the bind variables List<ByteBuffer> allValues = new ArrayList<ByteBuffer>(values); - for (String column : partitionKeyColumns) - allValues.add(keyColumns.get(column)); - for (String column : clusterColumns) - allValues.add(keyColumns.get(column)); + for (ColumnMetadata column : partitionKeyColumns) + allValues.add(keyColumns.get(column.getName())); + for (ColumnMetadata column : clusterColumns) + allValues.add(keyColumns.get(column.getName())); client.put(allValues); @@ -204,16 +242,50 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB * A client that runs in a threadpool and connects to the list of endpoints for a particular * range. Bound variables for keys in that range are sent to this client via a queue. */ - public class RangeClient extends AbstractRangeClient<List<ByteBuffer>> + public class RangeClient extends Thread { + // The list of endpoints for this range + protected final List<InetAddress> endpoints; + protected Session client; + // A bounded queue of incoming mutations for this range + protected final BlockingQueue<List<ByteBuffer>> queue = new ArrayBlockingQueue<List<ByteBuffer>>(queueSize); + + protected volatile boolean run = true; + // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing + // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls, + // when the client is closed. + protected volatile IOException lastException; + /** * Constructs an {@link RangeClient} for the given endpoints. 
* @param endpoints the possible endpoints to execute the mutations on */ public RangeClient(List<InetAddress> endpoints) { - super(endpoints); - } + super("client-" + endpoints); + this.endpoints = endpoints; + } + + /** + * enqueues the given value to Cassandra + */ + public void put(List<ByteBuffer> value) throws IOException + { + while (true) + { + if (lastException != null) + throw lastException; + try + { + if (queue.offer(value, 100, TimeUnit.MILLISECONDS)) + break; + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + } + } /** * Loops collecting cql binded variable values from the queue and sending to Cassandra @@ -234,156 +306,138 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB continue; } - Iterator<InetAddress> iter = endpoints.iterator(); + ListIterator<InetAddress> iter = endpoints.listIterator(); while (true) { // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly. + + // attempt to connect to a different endpoint try { - int i = 0; - int itemId = preparedStatement(client); - while (bindVariables != null) - { - client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE); - i++; - - if (i >= batchThreshold) - break; - - bindVariables = queue.poll(); - } - - break; + InetAddress address = iter.next(); + String host = address.getHostName(); + client = CqlConfigHelper.getOutputCluster(host, conf).connect(); } catch (Exception e) { + //If connection died due to Interrupt, just try connecting to the endpoint again. + if (Thread.interrupted()) { + lastException = new IOException(e); + iter.previous(); + } closeInternal(); - if (!iter.hasNext()) + + // Most exceptions mean something unexpected went wrong to that endpoint, so + // we should try again to another. Other exceptions (auth or invalid request) are fatal. + if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext()) { lastException = new IOException(e); break outer; } } - // attempt to connect to a different endpoint try { - InetAddress address = iter.next(); - String host = address.getHostName(); - int port = ConfigHelper.getOutputRpcPort(conf); - client = CqlOutputFormat.createAuthenticatedClient(host, port, conf); + int i = 0; + PreparedStatement statement = preparedStatement(client); + while (bindVariables != null) + { + BoundStatement boundStatement = new BoundStatement(statement); + for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++) + { + boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition)); + } + client.execute(boundStatement); + i++; + + if (i >= batchThreshold) + break; + bindVariables = queue.poll(); + } + break; } catch (Exception e) { closeInternal(); - // TException means something unexpected went wrong to that endpoint, so - // we should try again to another. Other exceptions (auth or invalid request) are fatal. - if ((!(e instanceof TException)) || !iter.hasNext()) + if (!iter.hasNext()) { lastException = new IOException(e); break outer; } } + } } - // close all our connections once we are done. 
closeInternal(); } /** get prepared statement id from cache, otherwise prepare it from Cassandra server*/ - private int preparedStatement(Cassandra.Client client) + private PreparedStatement preparedStatement(Session client) { - Integer itemId = preparedStatements.get(client); - if (itemId == null) + PreparedStatement statement = preparedStatements.get(client); + if (statement == null) { - CqlPreparedResult result; + PreparedStatement result; try { - result = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE); + result = client.prepare(cql); } - catch (TException e) + catch (NoHostAvailableException e) { throw new RuntimeException("failed to prepare cql query " + cql, e); } - Integer previousId = preparedStatements.putIfAbsent(client, Integer.valueOf(result.itemId)); - itemId = previousId == null ? result.itemId : previousId; + PreparedStatement previousId = preparedStatements.putIfAbsent(client, result); + statement = previousId == null ? result : previousId; } - return itemId; + return statement; } - } - private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns) - { - ByteBuffer partitionKey; - if (keyValidator instanceof CompositeType) + public void close() throws IOException { - ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.length]; - for (int i = 0; i< keys.length; i++) - keys[i] = keyColumns.get(partitionKeyColumns[i]); + // stop the run loop. this will result in closeInternal being called by the time join() finishes. + run = false; + interrupt(); + try + { + this.join(); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } - partitionKey = CompositeType.build(keys); - } - else - { - partitionKey = keyColumns.get(partitionKeyColumns[0]); + if (lastException != null) + throw lastException; } - return partitionKey; - } - // FIXME - /** retrieve the key validator from system.schema_columnfamilies table */ - private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception - { - String keyspace = ConfigHelper.getOutputKeyspace(conf); - String cfName = ConfigHelper.getOutputColumnFamily(conf); - String query = String.format("SELECT key_validator, key_aliases, column_aliases " + - "FROM %s.%s " + - "WHERE keyspace_name = '%s' and columnfamily_name = '%s'", - SystemKeyspace.NAME, - LegacySchemaTables.COLUMNFAMILIES, - keyspace, - cfName); - CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); - - Column rawKeyValidator = result.rows.get(0).columns.get(0); - String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue())); - keyValidator = parseType(validator); - - Column rawPartitionKeys = result.rows.get(0).columns.get(1); - String keyString = ByteBufferUtil.string(ByteBuffer.wrap(rawPartitionKeys.getValue())); - logger.debug("partition keys: {}", keyString); - - List<String> keys = FBUtilities.fromJsonList(keyString); - partitionKeyColumns = new String[keys.size()]; - int i = 0; - for (String key : keys) + + protected void closeInternal() { - partitionKeyColumns[i] = key; - i++; + if (client != null) + { + client.close();; + } } - - Column rawClusterColumns = result.rows.get(0).columns.get(2); - String clusterColumnString = ByteBufferUtil.string(ByteBuffer.wrap(rawClusterColumns.getValue())); - - logger.debug("cluster columns: {}", clusterColumnString); - clusterColumns = FBUtilities.fromJsonList(clusterColumnString); } - private AbstractType<?> parseType(String type) throws ConfigurationException + private ByteBuffer 
getPartitionKey(Map<String, ByteBuffer> keyColumns) { - try + ByteBuffer partitionKey; + if (partitionKeyColumns.size() > 1) { - // always treat counters like longs, specifically CCT.serialize is not what we need - if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType")) - return LongType.instance; - return TypeParser.parse(type); + ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.size()]; + for (int i = 0; i< keys.length; i++) + keys[i] = keyColumns.get(partitionKeyColumns.get(i).getName()); + + partitionKey = CompositeType.build(keys); } - catch (SyntaxException e) + else { - throw new ConfigurationException(e.getMessage(), e); + partitionKey = keyColumns.get(partitionKeyColumns.get(0).getName()); } + return partitionKey; } /** @@ -393,10 +447,10 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB { String keyWhereClause = ""; - for (String partitionKey : partitionKeyColumns) - keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? quote(partitionKey) : (" AND " + quote(partitionKey))); - for (String clusterColumn : clusterColumns) - keyWhereClause += " AND " + quote(clusterColumn) + " = ?"; + for (ColumnMetadata partitionKey : partitionKeyColumns) + keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? quote(partitionKey.getName()) : (" AND " + quote(partitionKey.getName()))); + for (ColumnMetadata clusterColumn : clusterColumns) + keyWhereClause += " AND " + quote(clusterColumn.getName()) + " = ?"; return cqlQuery + " WHERE " + keyWhereClause; } @@ -406,4 +460,60 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB { return "\"" + identifier.replaceAll("\"", "\"\"") + "\""; } + + class NativeRingCache + { + private Map<TokenRange, Set<Host>> rangeMap; + private Metadata metadata; + private final IPartitioner partitioner; + private final Configuration conf; + + public NativeRingCache(Configuration conf) + { + this.conf = conf; + this.partitioner = ConfigHelper.getOutputPartitioner(conf); + refreshEndpointMap(); + } + + + private void refreshEndpointMap() + { + String keyspace = ConfigHelper.getOutputKeyspace(conf); + Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace); + rangeMap = new HashMap<>(); + metadata = session.getCluster().getMetadata(); + Set<TokenRange> ranges = metadata.getTokenRanges(); + for (TokenRange range : ranges) + { + rangeMap.put(range, metadata.getReplicas(keyspace, range)); + } + } + + public TokenRange getRange(ByteBuffer key) + { + Token t = partitioner.getToken(key); + com.datastax.driver.core.Token driverToken = metadata.newToken(partitioner.getTokenFactory().toString(t)); + for (TokenRange range : rangeMap.keySet()) + { + if (range.contains(driverToken)) + { + return range; + } + } + + throw new RuntimeException("Invalid token information returned by describe_ring: " + rangeMap); + } + + public List<InetAddress> getEndpoints(TokenRange range) + { + Set<Host> hostSet = rangeMap.get(range); + List<Host> hosts = Arrays.asList(rangeMap.get(range).toArray(new Host[rangeMap.get(range).size()])); + List<InetAddress> addresses = new ArrayList<>(hosts.size()); + for (Host host: hosts) + { + addresses.add(host.getAddress()); + } + return addresses; + } + } }
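
For reference, a minimal reducer sketch against the CqlRecordWriter contract above might look like the following. It assumes a hypothetical ks.word_counts table and that cassandra.output.cql is set to an UPDATE statement (the writer rejects INSERT and appends the primary-key WHERE clause itself from the driver's table metadata); ByteBufferUtil and the Hadoop Reducer API are outside this patch.

    // Sketch only: a reducer that emits rows through CqlOutputFormat / CqlRecordWriter.
    // The table and column names below are hypothetical; the job is expected to set
    // cassandra.output.cql to something like "UPDATE ks.word_counts SET count = ?" --
    // the writer appends the key WHERE clause and routes each row by token range.
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WordCountReducer extends Reducer<Text, LongWritable, Map<String, ByteBuffer>, List<ByteBuffer>>
    {
        @Override
        protected void reduce(Text word, Iterable<LongWritable> counts, Context context)
            throws IOException, InterruptedException
        {
            long sum = 0;
            for (LongWritable count : counts)
                sum += count.get();

            // Key side: partition/clustering column values by name; CqlRecordWriter binds
            // them to the WHERE clause it appends and picks the endpoint via NativeRingCache.
            Map<String, ByteBuffer> keys = new LinkedHashMap<>();
            keys.put("word", ByteBufferUtil.bytes(word.toString()));

            // Value side: bound variables for the configured UPDATE's SET clause.
            List<ByteBuffer> variables = Collections.singletonList(ByteBufferUtil.bytes(sum));

            context.write(keys, variables);
        }
    }

With this commit the same reducer output now travels over the native protocol (cassandra.output.native.port, default 9042) instead of Thrift, with prepared statements cached per Session in CqlRecordWriter.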