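Pig: require deletes to be explicitly enabled via the PIG_ALLOW_DELETES environment variable (disabled by default; storing a null column value or an empty super-column bag now throws an IOException unless it is set to true). Patch by brandonwilliams; reviewed by xedin for CASSANDRA-3628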
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d6c1bda Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d6c1bda Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d6c1bda Branch: refs/heads/cassandra-1.0 Commit: 5d6c1bdad7107618fc165bec74fc0444078910f8 Parents: 161e052 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Tue Feb 14 12:21:47 2012 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Fri Feb 17 05:17:01 2012 -0600 ---------------------------------------------------------------------- .../cassandra/hadoop/pig/CassandraStorage.java | 31 ++++++++++---- 1 files changed, 22 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d6c1bda/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 7e55ee0..8863975 100644 --- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -67,6 +67,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo public final static String PIG_RPC_PORT = "PIG_RPC_PORT"; public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS"; public final static String PIG_PARTITIONER = "PIG_PARTITIONER"; + public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES"; private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER; private static final Log logger = LogFactory.getLog(CassandraStorage.class); @@ -74,6 +75,7 @@ public class CassandraStorage extends LoadFunc implements 
StoreFuncInterface, Lo private ByteBuffer slice_start = BOUND; private ByteBuffer slice_end = BOUND; private boolean slice_reverse = false; + private boolean allow_deletes = false; private String keyspace; private String column_family; private String loadSignature; @@ -331,6 +333,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER)); else if (ConfigHelper.getPartitioner(conf) == null) throw new IOException("PIG_PARTITIONER environment variable not set"); + if (System.getenv(PIG_ALLOW_DELETES) != null) + allow_deletes = Boolean.valueOf(System.getenv(PIG_ALLOW_DELETES)); } @Override @@ -585,11 +589,15 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo Mutation mutation = new Mutation(); if (t.get(1) == null) { - // TODO: optional deletion - mutation.deletion = new Deletion(); - mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate(); - mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0))); - mutation.deletion.setTimestamp(FBUtilities.timestampMicros()); + if (allow_deletes) + { + mutation.deletion = new Deletion(); + mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate(); + mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0))); + mutation.deletion.setTimestamp(FBUtilities.timestampMicros()); + } + else + throw new IOException("null found but deletes are disabled, set " + PIG_ALLOW_DELETES + "=true to enable"); } else { @@ -622,11 +630,16 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo column.setTimestamp(FBUtilities.timestampMicros()); columns.add(column); } - if (columns.isEmpty()) // TODO: optional deletion + if (columns.isEmpty()) { - mutation.deletion = new Deletion(); - mutation.deletion.super_column = objToBB(pair.get(0)); - mutation.deletion.setTimestamp(FBUtilities.timestampMicros()); + if (allow_deletes) 
+ { + mutation.deletion = new Deletion(); + mutation.deletion.super_column = objToBB(pair.get(0)); + mutation.deletion.setTimestamp(FBUtilities.timestampMicros()); + } + else + throw new IOException("SuperColumn deletion attempted with empty bag, but deletes are disabled, set " + PIG_ALLOW_DELETES + "=true to enable"); } else {