[ https://issues.apache.org/jira/browse/CASSANDRA-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jonathan Ellis reassigned CASSANDRA-1183: ----------------------------------------- Assignee: Brandon Williams > Column insert via batch_mutate under some conditions sometimes fails to add > the column > -------------------------------------------------------------------------------------- > > Key: CASSANDRA-1183 > URL: https://issues.apache.org/jira/browse/CASSANDRA-1183 > Project: Cassandra > Issue Type: Bug > Components: Core > Affects Versions: 0.6.1, 0.6.2 > Environment: OS (via uname -a): Linux 2.6.27.5-117.fc10.x86_64 #1 > SMP Tue Nov 18 11:58:53 EST 2008 x86_64 x86_64 x86_64 GNU/Linux > Java build 1.6.0_20-b02 > Cassandra 0.6.2 & 0.6.1 > hector-0.6.0-14.jar > Reporter: Scott McCarty > Assignee: Brandon Williams > > Under heavy load batch_mutate sometimes fails to add a column but doesn't > report an error to the client. Details: > * client is running a tight loop that does: write to a randomly named column; > do a get_slice that should return that newly created column; delete the > column if it was returned in get_slice > * the mutation map for batch_mutate has exactly one mutation in it: insert > the column > * if the column insert is done with the insert() API it doesn't ever fail > (for as long as I've run it) > * the column really isn't inserted: doing a 'get' on the entire rowkey in > the command line interface doesn't show it, so it's not a timing issue > * read consistency level is ONE, write consistency level is ALL > * it fails on a single-node cluster and on a 4-node cluster (replication > factor = 3) > I am including a Java cmdline program (below) that demonstrates the problem; > it uses the 'hector' java client (I hope that's okay--I verified that no > errors or warnings were being masked by hector). I didn't see a place to > upload a file in bug reports so this program is inline. 
> To show the failure using the program, just run it with the command line: > bash> java BatchMutateBug localhost:9160 true keyspace columnfamily rowkey > substituting in the appropriate Cassandra server connection info, a valid > keyspace name, and a valid column family name. The second parameter ("true" > in the example) tells the program to use batch_mutate(). If you pass in > "false" it will use insert() and the program won't fail. > I've tried the above on two different setups and each one fails within 10-15 > seconds generally. It seems to fail quickest if you use a non-existing > rowkey. > Here's the program: > import java.util.*; > import org.apache.cassandra.thrift.*; > import me.prettyprint.cassandra.service.*; > public class BatchMutateBug > { > private static String[] cHosts; > private static String cKeyspace; > public static void main( > String[] args > ) > { > BatchMutateBug c = new BatchMutateBug(); > if (args.length != 5) > System.out.println("Usage: BatchMutateBug host:port useBatchFlag > keyspace columnfamily rowkey"); > else > c.contend(args[0], args[1], args[2], args[3], args[4]); > } > private void contend( > String hostPort, > final String batchFlag, > String keyspace, > final String cfname, > final String rowkey > ) > { > final boolean useBatch = Boolean.valueOf(batchFlag); > Thread wr1; > cHosts = new String[] {hostPort}; > cKeyspace = keyspace; > if (useBatch) > System.out.println("Using batchMutate: you should see an ERROR > eventually"); > else > System.out.println("NOT using batchMutate: you should NOT see an > ERROR"); > wr1 = new Thread() > { > public void run() > { > try > { > writeRead(useBatch, cfname, rowkey); > } > catch (Exception x) > { > x.printStackTrace(); > } > } > }; > wr1.start(); > } > /** > * > */ > private void writeRead( > boolean useBatch, > String cfname, > String rowkey > ) > throws Exception > { > Random rand = new Random(); > ColumnParent par = new ColumnParent(cfname); > ColumnPath cp = new ColumnPath(cfname); > 
Keyspace ks; > String colname; > String rpart; > while (true) > { > boolean pass = false; > ks = getWriteKeyspace(); > rpart = Integer.toString(rand.nextInt()); > colname = "%reader:" + rpart; > try > { > if (useBatch) > { > Map<String, Map<String, List<Mutation>>> muts = new > HashMap<String, Map<String, List<Mutation>>>(); > Map<String, List<Mutation>> cfm = new > HashMap<String, List<Mutation>>(); > List<Mutation> mutl = new > ArrayList<Mutation>(); > Mutation mut = new > Mutation(); > mutl.add(mut); > cfm.put(cfname, mutl); > muts.put(rowkey, cfm); > mut.column_or_supercolumn = new ColumnOrSuperColumn(); > mut.column_or_supercolumn.column = new Column(); > mut.column_or_supercolumn.column.name = > colname.getBytes(); > mut.column_or_supercolumn.column.value = > colname.getBytes(); > mut.column_or_supercolumn.column.timestamp = > System.currentTimeMillis() * 1000L; > ks.batchMutate(muts); > } > else > { > cp.column = colname.getBytes(); > ks.insert(rowkey, cp, colname.getBytes()); > } > } > finally > { > releaseKeyspace(ks); > } > ks = getReadKeyspace(); > try > { > List<Column> cols = readSlice(rowkey, cfname, "%", "&"); > if (cols.size() == 0) > { > System.out.println("ERROR: column '" + colname + "' was > inserted but getSlice returned 0 columns"); > } > else > { > boolean found = false; > for (Column c : cols) > { > String fetched = new String(c.name); > if (colname.equals(fetched)) > { > found = true; > pass = true; > break; > } > } > if (!found) > System.out.println("ERROR: column '" + colname + "' > was inserted but getSlice did NOT return it"); > } > } > finally > { > releaseKeyspace(ks); > } > ks = getWriteKeyspace(); > if (pass) > { > try > { > ks.remove(rowkey, cp); > } > finally > { > releaseKeyspace(ks); > } > } > } > } > private List<Column> readSlice( > String rowkey, > String cfname, > String start, > String finish > ) > throws Exception > { > Keyspace ks = getReadKeyspace(); > List<Column> cols = null; > ColumnParent par = new 
ColumnParent(cfname); > try > { > SlicePredicate sp; > sp = new SlicePredicate(); > sp.slice_range = new SliceRange(); > sp.slice_range.count = 100; > sp.slice_range.start = start.getBytes(); > sp.slice_range.finish = finish.getBytes(); > cols = ks.getSlice(rowkey, par, sp); > } > catch (NotFoundException nfe) > { > System.out.println("ERROR: got NotFoundException"); > } > finally > { > releaseKeyspace(ks); > } > return cols; > } > private Keyspace getReadKeyspace() > { > try > { > CassandraClientPool ccp = > CassandraClientPoolFactory.INSTANCE.get(); > CassandraClient c = ccp.borrowClient(cHosts); > me.prettyprint.cassandra.service.Keyspace ks; > ks = c.getKeyspace(cKeyspace, ConsistencyLevel.ONE); > return ks; > } > catch (Exception x) > { > x.printStackTrace(); > } > return null; > } > private Keyspace getWriteKeyspace() > { > try > { > CassandraClientPool ccp = > CassandraClientPoolFactory.INSTANCE.get(); > CassandraClient c = ccp.borrowClient(cHosts); > me.prettyprint.cassandra.service.Keyspace ks; > ks = c.getKeyspace(cKeyspace, ConsistencyLevel.ALL); > return ks; > } > catch (Exception x) > { > x.printStackTrace(); > } > return null; > } > protected static void releaseKeyspace( > Keyspace ks > ) > { > if (ks != null) > { > try > { > CassandraClientPool ccp = > CassandraClientPoolFactory.INSTANCE.get(); > ccp.releaseClient(ks.getClient()); > } > catch (Exception x) > { > x.printStackTrace(); > } > } > } > } -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.