Column insert via batch_mutate under some conditions sometimes fails to add the column --------------------------------------------------------------------------------------
Key: CASSANDRA-1183 URL: https://issues.apache.org/jira/browse/CASSANDRA-1183 Project: Cassandra Issue Type: Bug Components: Core Affects Versions: 0.6.1, 0.6.2 Environment: OS (via uname -a): Linux 2.6.27.5-117.fc10.x86_64 #1 SMP Tue Nov 18 11:58:53 EST 2008 x86_64 x86_64 x86_64 GNU/Linux Java build 1.6.0_20-b02 Cassandra 0.6.2 & 0.6.1 hector-0.6.0-14.jar Reporter: Scott McCarty Under heavy load batch_mutate sometimes fails to add a column but doesn't report an error to the client. Details: * client is running a tight loop that does: write to randomly named column; do a get_slice that should return that newly created column; delete the column if it was returned in get_slice * the mutation map for batch_mutate has exactly one mutation in it: insert the column * if the column insert is done with the insert() API it doesn't ever fail (for as long as I've run it) * the column really isn't inserted, as doing a 'get' on the entire rowkey in the command line interface doesn't show it, so it's not a timing issue * read consistency level is ONE, write consistency level is ALL * it fails on a single node cluster and on a 4 node cluster (replication factor = 3) I am including a Java cmdline program (below) that demonstrates the problem; it uses the 'hector' java client (I hope that's okay--I verified that no errors or warnings were being masked by hector). I didn't see a place to upload a file in bug reports so this program is inline. To show the failure using the program, just run it with the command line: bash> java BatchMutateBug localhost:9160 true keyspace columnfamily rowkey substituting in the appropriate Cassandra server connection info, a valid keyspace name, and a valid column family name. The second parameter ("true" in the example) tells the program to use batch_mutate(). If you pass in "false" it will use insert() and the program won't fail. I've tried the above on two different setups and each one fails within 10-15 seconds generally. 
It seems to fail quickest if you use a non-existing rowkey. Here's the program:

import java.util.*;
import org.apache.cassandra.thrift.*;
import me.prettyprint.cassandra.service.*;

/**
 * Command-line reproduction program for this report.
 *
 * In an endless loop it writes one randomly named column (through
 * batchMutate() or insert(), depending on the second argument), reads the
 * row back with a slice query, prints an ERROR line when the column just
 * written is not among the results, and removes the column again when it is.
 *
 * Usage: BatchMutateBug host:port useBatchFlag keyspace columnfamily rowkey
 */
public class BatchMutateBug
{
    /** Host list handed to the Hector client pool; set once by contend(). */
    private static String[] cHosts;

    /** Keyspace name used for every borrowed Keyspace; set once by contend(). */
    private static String cKeyspace;

    /**
     * Entry point: prints the usage line unless exactly five arguments are
     * given, otherwise starts the write/read loop.
     */
    public static void main( String[] args )
    {
        BatchMutateBug c = new BatchMutateBug();

        if (args.length != 5)
        {
            System.out.println("Usage: BatchMutateBug host:port useBatchFlag keyspace columnfamily rowkey");
        }
        else
        {
            c.contend(args[0], args[1], args[2], args[3], args[4]);
        }
    }

    /**
     * Records the connection settings, announces which write API is under
     * test, and runs writeRead() on a new thread.
     *
     * @param hostPort  server address in host:port form
     * @param batchFlag "true" to write with batchMutate(), anything else for insert()
     * @param keyspace  keyspace name
     * @param cfname    column family name
     * @param rowkey    row key that every iteration writes to
     */
    private void contend( String hostPort, final String batchFlag, String keyspace,
                          final String cfname, final String rowkey )
    {
        final boolean useBatch = Boolean.parseBoolean(batchFlag);

        cHosts = new String[] {hostPort};
        cKeyspace = keyspace;

        if (useBatch)
        {
            System.out.println("Using batchMutate: you should see an ERROR eventually");
        }
        else
        {
            System.out.println("NOT using batchMutate: you should NOT see an ERROR");
        }

        Thread wr1 = new Thread()
            {
                @Override
                public void run()
                {
                    try
                    {
                        writeRead(useBatch, cfname, rowkey);
                    }
                    catch (Exception x)
                    {
                        x.printStackTrace();
                    }
                }
            };

        wr1.start();
    }

    /**
     * The write / read-back / delete loop. Never returns normally; it ends
     * only when one of the client calls throws.
     *
     * @param useBatch true to write with batchMutate(), false to write with insert()
     * @param cfname   column family name
     * @param rowkey   row key to write to
     * @throws Exception whatever the client library throws
     */
    private void writeRead( boolean useBatch, String cfname, String rowkey ) throws Exception
    {
        Random rand = new Random();
        ColumnPath cp = new ColumnPath(cfname);

        while (true)
        {
            String colname = "%reader:" + Integer.toString(rand.nextInt());

            // Step 1: write the column.
            Keyspace ks = getWriteKeyspace();
            try
            {
                if (useBatch)
                {
                    ks.batchMutate(singleInsert(cfname, rowkey, colname));
                }
                else
                {
                    cp.column = colname.getBytes();
                    ks.insert(rowkey, cp, colname.getBytes());
                }
            }
            finally
            {
                releaseKeyspace(ks);
            }

            // Step 2: read it back. readSlice() borrows and releases its own
            // client, so no client is borrowed here.
            boolean pass = false;
            List<Column> cols = readSlice(rowkey, cfname, "%", "&");

            if (cols.size() == 0)
            {
                System.out.println("ERROR: column '" + colname + "' was inserted but getSlice returned 0 columns");
            }
            else
            {
                for (Column c : cols)
                {
                    if (colname.equals(new String(c.name)))
                    {
                        pass = true;
                        break;
                    }
                }

                if (!pass)
                {
                    System.out.println("ERROR: column '" + colname + "' was inserted but getSlice did NOT return it");
                }
            }

            // Step 3: clean up. The client is borrowed only when it will be
            // used, so an iteration that failed the check no longer leaks one.
            if (pass)
            {
                ks = getWriteKeyspace();
                try
                {
                    // NOTE(review): in batch mode cp.column is never assigned, so
                    // this remove() is issued with a ColumnPath that names only the
                    // column family. Presumably that deletes more than the single
                    // column just written, and it may itself explain the reported
                    // symptom -- verify against the Thrift remove() contract. Left
                    // unchanged so the program still behaves as described in the
                    // report.
                    ks.remove(rowkey, cp);
                }
                finally
                {
                    releaseKeyspace(ks);
                }
            }
        }
    }

    /**
     * Builds the batchMutate() argument: row key -> column family -> a list
     * holding one Mutation that inserts a column whose name and value are both
     * colname, timestamped with the current time in microseconds.
     */
    private static Map<String, Map<String, List<Mutation>>> singleInsert( String cfname, String rowkey, String colname )
    {
        Mutation mut = new Mutation();

        mut.column_or_supercolumn = new ColumnOrSuperColumn();
        mut.column_or_supercolumn.column = new Column();
        mut.column_or_supercolumn.column.name = colname.getBytes();
        mut.column_or_supercolumn.column.value = colname.getBytes();
        mut.column_or_supercolumn.column.timestamp = System.currentTimeMillis() * 1000L;

        List<Mutation> mutl = new ArrayList<Mutation>();
        mutl.add(mut);

        Map<String, List<Mutation>> cfm = new HashMap<String, List<Mutation>>();
        cfm.put(cfname, mutl);

        Map<String, Map<String, List<Mutation>>> muts = new HashMap<String, Map<String, List<Mutation>>>();
        muts.put(rowkey, cfm);

        return muts;
    }

    /**
     * Fetches up to 100 columns of the row whose names fall between start and
     * finish.
     *
     * @return the columns found; an empty list (never null) when the lookup
     *         raises NotFoundException or yields no list
     * @throws Exception any other failure reported by the client library
     */
    private List<Column> readSlice( String rowkey, String cfname, String start, String finish ) throws Exception
    {
        ColumnParent par = new ColumnParent(cfname);
        SlicePredicate sp = new SlicePredicate();

        sp.slice_range = new SliceRange();
        sp.slice_range.count = 100;
        sp.slice_range.start = start.getBytes();
        sp.slice_range.finish = finish.getBytes();

        Keyspace ks = getReadKeyspace();

        try
        {
            List<Column> cols = ks.getSlice(rowkey, par, sp);

            return (cols != null) ? cols : Collections.<Column>emptyList();
        }
        catch (NotFoundException nfe)
        {
            System.out.println("ERROR: got NotFoundException");

            // An empty list keeps the caller's size() check from hitting a null.
            return Collections.<Column>emptyList();
        }
        finally
        {
            releaseKeyspace(ks);
        }
    }

    /** Borrows a pooled client and returns its Keyspace at consistency level ONE. */
    private Keyspace getReadKeyspace() throws Exception
    {
        return borrowKeyspace(ConsistencyLevel.ONE);
    }

    /** Borrows a pooled client and returns its Keyspace at consistency level ALL. */
    private Keyspace getWriteKeyspace() throws Exception
    {
        return borrowKeyspace(ConsistencyLevel.ALL);
    }

    /**
     * Shared implementation of the two getters above. Failures propagate to the
     * caller instead of being printed and turned into a null return, so the
     * worker thread reports the real cause rather than a later
     * NullPointerException.
     */
    private Keyspace borrowKeyspace( ConsistencyLevel level ) throws Exception
    {
        CassandraClientPool ccp = CassandraClientPoolFactory.INSTANCE.get();
        CassandraClient c = ccp.borrowClient(cHosts);

        try
        {
            return c.getKeyspace(cKeyspace, level);
        }
        catch (Exception x)
        {
            // Hand the client back before reporting the failure so it is not leaked.
            try
            {
                ccp.releaseClient(c);
            }
            catch (Exception releaseFailure)
            {
                releaseFailure.printStackTrace();
            }

            throw x;
        }
    }

    /**
     * Returns the client behind ks to the pool. A null ks is ignored, and a
     * failure to release is printed rather than propagated.
     */
    protected static void releaseKeyspace( Keyspace ks )
    {
        if (ks == null)
        {
            return;
        }

        try
        {
            CassandraClientPool ccp = CassandraClientPoolFactory.INSTANCE.get();

            ccp.releaseClient(ks.getClient());
        }
        catch (Exception x)
        {
            x.printStackTrace();
        }
    }
}

-- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.