Column insert via batch_mutate under some conditions sometimes fails to add the 
column
--------------------------------------------------------------------------------------

                 Key: CASSANDRA-1183
                 URL: https://issues.apache.org/jira/browse/CASSANDRA-1183
             Project: Cassandra
          Issue Type: Bug
          Components: Core
    Affects Versions: 0.6.1, 0.6.2
         Environment: OS (via uname -a):  Linux 2.6.27.5-117.fc10.x86_64 #1 SMP 
Tue Nov 18 11:58:53 EST 2008 x86_64 x86_64 x86_64 GNU/Linux
Java build 1.6.0_20-b02
Cassandra 0.6.2 & 0.6.1
hector-0.6.0-14.jar

            Reporter: Scott McCarty


Under heavy load batch_mutate sometimes fails to add a column but doesn't 
report an error to the client.  Details:

* client is running a tight loop that does:  write to randomly named column; do 
a get_slice that should return that newly created column; delete the column if 
it was returned in get_slice
* the mutation map for batch_mutate has exactly one mutation in it:  insert the 
column
* if the column insert is done with the insert() API it doesn't ever fail (for 
as long as I've run it)
* the column really isn't inserted as doing a 'get' on the entire rowkey in the 
command line interface doesn't show it, so it's not a timing issue
* read consistency level is ONE, write consistency level is ALL
* it fails on a single node cluster and on a 4 node cluster (replication factor 
= 3)

I am including a Java cmdline program (below) that demonstrates the problem; it 
uses the 'hector' java client (I hope that's okay--I verified that no errors or 
warnings were being masked by hector).  I didn't see a place to upload a file in 
bug reports so this program is inline.

To show the failure using the program, just run it with the command line:

  bash>   java BatchMutateBug localhost:9160 true keyspace columnfamily rowkey

substituting in the appropriate Cassandra server connection info, a valid 
keyspace name, and a valid column family name.  The second parameter ("true" in 
the example) tells the program to use batch_mutate().  If you pass in "false" 
it will use insert() and the program won't fail.

I've tried the above on two different setups and each one fails within 10-15 
seconds generally.  It seems to fail quickest if you use a non-existing rowkey.

Here's the program:


import java.util.*;
import org.apache.cassandra.thrift.*;
import me.prettyprint.cassandra.service.*;

public class BatchMutateBug
{
    private static String[]  cHosts;
    private static String    cKeyspace;

    public static void main(
        String[] args
        )
    {
        BatchMutateBug c = new BatchMutateBug();

        if (args.length != 5)
            System.out.println("Usage:  BatchMutateBug host:port useBatchFlag 
keyspace columnfamily rowkey");
        else
            c.contend(args[0], args[1], args[2], args[3], args[4]);
    }

    private void contend(
        String        hostPort,
        final String  batchFlag,
        String        keyspace,
        final String  cfname,
        final String  rowkey
        )
    {
        final  boolean  useBatch = Boolean.valueOf(batchFlag);
        Thread          wr1;

        cHosts = new String[] {hostPort};
        cKeyspace = keyspace;

        if (useBatch)
            System.out.println("Using batchMutate:  you should see an ERROR 
eventually");
        else
            System.out.println("NOT using batchMutate:  you should NOT see an 
ERROR");

        wr1 = new Thread()
            {
                public void run()
                {
                    try
                    {
                        writeRead(useBatch, cfname, rowkey);
                    }
                    catch (Exception x)
                    {
                        x.printStackTrace();
                    }
                }
            };

        wr1.start();
    }

    /**
     *
     */
    private void writeRead(
        boolean useBatch,
        String  cfname,
        String  rowkey
        )
        throws Exception
    {
        Random       rand   = new Random();
        ColumnParent par    = new ColumnParent(cfname);
        ColumnPath   cp     = new ColumnPath(cfname);
        Keyspace     ks;
        String       colname;
        String       rpart;

        while (true)
        {
            boolean  pass = false;

            ks      = getWriteKeyspace();
            rpart   = Integer.toString(rand.nextInt());
            colname = "%reader:" + rpart;

            try
            {
                if (useBatch)
                {
                    Map<String, Map<String, List<Mutation>>> muts = new 
HashMap<String, Map<String, List<Mutation>>>();
                    Map<String, List<Mutation>>              cfm  = new 
HashMap<String, List<Mutation>>();
                    List<Mutation>                           mutl = new 
ArrayList<Mutation>();
                    Mutation                                 mut  = new 
Mutation();

                    mutl.add(mut);
                    cfm.put(cfname, mutl);
                    muts.put(rowkey, cfm);

                    mut.column_or_supercolumn = new ColumnOrSuperColumn();
                    mut.column_or_supercolumn.column = new Column();
                    mut.column_or_supercolumn.column.name = colname.getBytes();
                    mut.column_or_supercolumn.column.value = colname.getBytes();
                    mut.column_or_supercolumn.column.timestamp = 
System.currentTimeMillis() * 1000L;

                    ks.batchMutate(muts);
                }
                else
                {
                    cp.column = colname.getBytes();
                    ks.insert(rowkey, cp, colname.getBytes());
                }
            }
            finally
            {
                releaseKeyspace(ks);
            }

            ks = getReadKeyspace();

            try
            {
                List<Column>   cols = readSlice(rowkey, cfname, "%", "&");

                if (cols.size() == 0)
                {
                    System.out.println("ERROR:  column '" + colname + "' was 
inserted but getSlice returned 0 columns");
                }
                else
                {
                    boolean found = false;

                    for (Column c : cols)
                    {
                        String  fetched = new String(c.name);

                        if (colname.equals(fetched))
                        {
                            found = true;
                            pass = true;
                            break;
                        }
                    }

                    if (!found)
                        System.out.println("ERROR:  column '" + colname + "' 
was inserted but getSlice did NOT return it");
                }
            }
            finally
            {
                releaseKeyspace(ks);
            }

            ks = getWriteKeyspace();

            if (pass)
            {
                try
                {
                    ks.remove(rowkey, cp);
                }
                finally
                {
                    releaseKeyspace(ks);
                }
            }
        }
    }

    private List<Column> readSlice(
        String    rowkey,
        String    cfname,
        String    start,
        String    finish
        )
        throws Exception
    {
        Keyspace      ks   = getReadKeyspace();
        List<Column>  cols = null;
        ColumnParent  par  = new ColumnParent(cfname);

        try
        {
            SlicePredicate sp;

            sp = new SlicePredicate();
            sp.slice_range = new SliceRange();
            sp.slice_range.count = 100;
            sp.slice_range.start = start.getBytes();
            sp.slice_range.finish = finish.getBytes();

            cols = ks.getSlice(rowkey, par, sp);
        }
        catch (NotFoundException nfe)
        {
            System.out.println("ERROR:  got NotFoundException");
        }
        finally
        {
            releaseKeyspace(ks);
        }

        return cols;
    }

    private Keyspace getReadKeyspace()
    {
        try
        {
            CassandraClientPool ccp = CassandraClientPoolFactory.INSTANCE.get();
            CassandraClient     c   = ccp.borrowClient(cHosts);
            me.prettyprint.cassandra.service.Keyspace ks;

            ks = c.getKeyspace(cKeyspace, ConsistencyLevel.ONE);

            return ks;
        }
        catch (Exception x)
        {
            x.printStackTrace();
        }

        return null;
    }

    private Keyspace getWriteKeyspace()
    {
        try
        {
            CassandraClientPool ccp = CassandraClientPoolFactory.INSTANCE.get();
            CassandraClient     c   = ccp.borrowClient(cHosts);
            me.prettyprint.cassandra.service.Keyspace ks;

            ks = c.getKeyspace(cKeyspace, ConsistencyLevel.ALL);

            return ks;
        }
        catch (Exception x)
        {
            x.printStackTrace();
        }

        return null;
    }

    protected static void releaseKeyspace(
        Keyspace  ks
        )
    {
        if (ks != null)
        {
            try
            {
                CassandraClientPool ccp = 
CassandraClientPoolFactory.INSTANCE.get();

                ccp.releaseClient(ks.getClient());
            }
            catch (Exception x)
            {
                x.printStackTrace();
            }
        }
    }

}


-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to