Author: [email protected]
Date: Mon Apr 16 11:11:29 2012
New Revision: 2223
Log:
[AMDATUCASSANDRA-184] Fixed updating CFs without changing attributes that have
not been set explicitly
Modified:
branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
branches/amdatu-cassandra-0.2.3/cassandra-client/src/test/java/org/amdatu/cassandra/test/unit/client/mock/CassandraClientTest.java
branches/amdatu-cassandra-0.2.3/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
Modified:
branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
==============================================================================
---
branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
(original)
+++
branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
Mon Apr 16 11:11:29 2012
@@ -22,9 +22,12 @@
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCfDef;
import me.prettyprint.cassandra.service.ThriftCluster;
+import me.prettyprint.cassandra.service.ThriftColumnDef;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
+import me.prettyprint.hector.api.ddl.ColumnType;
+import me.prettyprint.hector.api.ddl.ComparatorType;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.exceptions.HectorException;
@@ -481,8 +484,11 @@
}
try {
if (!columnFamilyExists(keyspace, cfName)) {
- ThriftCfDef tCfDef = new ThriftCfDef(cfDef);
- tCfDef.setKeyspaceName(keyspace);
+ ThriftCfDef tCfDef = new ThriftCfDef(keyspace, cfName);
+ tCfDef.setColumnType(
ColumnType.getFromValue(cfDef.getColumn_type()));
+
tCfDef.setComparatorType(ComparatorType.getByClassName(cfDef.getComparator_type()));
+
tCfDef.setSubComparatorType(ComparatorType.getByClassName(cfDef.getSubcomparator_type()));
+ copyIsSetFields(cfDef, tCfDef);
getHectorCluster().addColumnFamily(tCfDef, true);
m_logService.log(LogService.LOG_INFO, "ColumnFamily '" +
cfName + "' has been added to keyspace '"
+ keyspace + "'");
@@ -514,9 +520,7 @@
if (oldCfDef != null) {
// First compare if there is really a difference.
if (!CfDefComparator.equal(oldCfDef, cfDef)) {
- ThriftCfDef tCfDef = new ThriftCfDef(cfDef);
- tCfDef.setKeyspaceName(keyspace);
- tCfDef.setId(oldCfDef.getId());
+ ThriftCfDef tCfDef = mergeCfDefs(oldCfDef, cfDef);
getHectorCluster().updateColumnFamily(tCfDef, true);
m_logService.log(LogService.LOG_INFO, "ColumnFamily '" +
cfName
+ "' has been updated in the keyspace '"
@@ -545,6 +549,81 @@
}
}
+ // Merges the old column family definition with the new one. The returned
CfDef
+ // contains the properties of the old CfDef, overwritten with the values
explicitly
+ // set in the new cfDef, but leaves the old values intact which have not
been set
+ // in the new CfDef
+ private ThriftCfDef mergeCfDefs(CfDef oldCfDef, CfDef newCfDef) {
+ CfDef mergedCfDef = new CfDef(oldCfDef);
+ CfDef._Fields[] fields = CfDef._Fields.values();
+ for (CfDef._Fields field : fields) {
+ if (newCfDef.isSet(field) &&
!CfDef._Fields.COLUMN_TYPE.equals(field)) {
+ mergedCfDef.setFieldValue(field,
newCfDef.getFieldValue(field));
+ }
+ }
+ return new ThriftCfDef(mergedCfDef);
+ }
+
+ // Copies fields from the from CfDef to the to CfDef, but only for the
fields
+ // that have explicitly been set (so the isSet flag is true for the field)
+ private void copyIsSetFields(CfDef from, ThriftCfDef to) {
+ CfDef._Fields[] fields = CfDef._Fields.values();
+ for (CfDef._Fields field : fields) {
+ if (from.isSet(field)) {
+ switch (field) {
+ case COMMENT:
+ to.setComment(from.getComment());
+ break;
+ case ROW_CACHE_SIZE:
+ to.setRowCacheSize(from.getRow_cache_size());
+ break;
+ case KEY_CACHE_SIZE:
+ to.setKeyCacheSize(from.getKey_cache_size());
+ break;
+ case READ_REPAIR_CHANCE:
+ to.setReadRepairChance(from.getRead_repair_chance());
+ break;
+ case COLUMN_METADATA:
+
to.setColumnMetadata(ThriftColumnDef.fromThriftList(from.getColumn_metadata()));
+ break;
+ case GC_GRACE_SECONDS:
+ to.setGcGraceSeconds(from.getGc_grace_seconds());
+ break;
+ case DEFAULT_VALIDATION_CLASS:
+
to.setDefaultValidationClass(from.getDefault_validation_class());
+ break;
+ case MIN_COMPACTION_THRESHOLD:
+
to.setMinCompactionThreshold(from.getMin_compaction_threshold());
+ break;
+ case MAX_COMPACTION_THRESHOLD:
+
to.setMaxCompactionThreshold(from.getMax_compaction_threshold());
+ break;
+ case ROW_CACHE_SAVE_PERIOD_IN_SECONDS:
+
to.setRowCacheSavePeriodInSeconds(from.getRow_cache_save_period_in_seconds());
+ break;
+ case KEY_CACHE_SAVE_PERIOD_IN_SECONDS:
+
to.setKeyCacheSavePeriodInSeconds(from.getKey_cache_save_period_in_seconds());
+ break;
+ case MEMTABLE_FLUSH_AFTER_MINS:
+
to.setMemtableFlushAfterMins(from.getMemtable_flush_after_mins());
+ break;
+ case MEMTABLE_THROUGHPUT_IN_MB:
+
to.setMemtableThroughputInMb(from.getMemtable_throughput_in_mb());
+ break;
+ case MEMTABLE_OPERATIONS_IN_MILLIONS:
+
to.setMemtableOperationsInMillions(from.getMemtable_operations_in_millions());
+ break;
+ case REPLICATE_ON_WRITE:
+ to.setReplicateOnWrite(from.isReplicate_on_write());
+ break;
+ case KEY_VALIDATION_CLASS:
+
to.setKeyValidationClass(from.getKey_validation_class());
+ break;
+ }
+ }
+ }
+ }
+
public synchronized boolean isColumnFamilyChanged(final String keyspace,
final String cfName,
final String columnType, final String comparatorType, final String
subComparatorType) {
try {
Modified:
branches/amdatu-cassandra-0.2.3/cassandra-client/src/test/java/org/amdatu/cassandra/test/unit/client/mock/CassandraClientTest.java
==============================================================================
---
branches/amdatu-cassandra-0.2.3/cassandra-client/src/test/java/org/amdatu/cassandra/test/unit/client/mock/CassandraClientTest.java
(original)
+++
branches/amdatu-cassandra-0.2.3/cassandra-client/src/test/java/org/amdatu/cassandra/test/unit/client/mock/CassandraClientTest.java
Mon Apr 16 11:11:29 2012
@@ -15,12 +15,16 @@
*/
package org.amdatu.cassandra.test.unit.client.mock;
+import me.prettyprint.cassandra.constants.CFMetaDataDefaults;
+
+import org.amdatu.cassandra.client.ThriftException;
import org.amdatu.cassandra.client.service.CassandraClientConfigurationService;
import
org.amdatu.cassandra.client.service.CassandraClientConfigurationServiceImpl;
import org.amdatu.cassandra.client.service.CassandraClientServiceImpl;
import org.amdatu.cassandra.test.unit.framework.UnitTestFramework;
import org.amdatu.cassandra.test.unit.framework.mock.EventAdminMock;
+import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -47,6 +51,7 @@
private static final String STANDARD = "Standard";
private static final String SUPER = "Super";
private static final String BYTESTYPE = "BytesType";
+ private static final String UTF8TYPE = "UTF8Type";
// The Cassandra client service to test
private static CassandraClientServiceImpl CLIENT;
@@ -111,11 +116,10 @@
Assert.assertTrue(CLIENT.columnFamilyExists(TEST_KS, TEST_CF));
Assert.assertEquals(CLIENT.getColumnType(TEST_KS, TEST_CF), STANDARD);
CfDef cfDef = CLIENT.getColumnFamily(TEST_KS, TEST_CF);
- Assert.assertEquals(STANDARD, cfDef.getColumn_type());
Assert.assertTrue(cfDef.getComparator_type().endsWith(BYTESTYPE));
- Assert.assertEquals(TEST_KS, cfDef.getKeyspace());
- Assert.assertEquals(TEST_CF, cfDef.getName());
- Assert.assertNull(cfDef.getSubcomparator_type());
+ Assert.assertEquals(null, cfDef.getSubcomparator_type());
+ assertSchema(cfDef);
+ assertDefaults(cfDef);
Assert.assertFalse(CLIENT.isColumnFamilyChanged(TEST_KS, TEST_CF,
STANDARD, BYTESTYPE, null));
Assert.assertTrue(CLIENT.isColumnFamilyChanged(TEST_KS, TEST_CF,
SUPER, BYTESTYPE, BYTESTYPE));
@@ -130,19 +134,44 @@
cfDef.setColumn_metadata(cDefs);
CLIENT.updateColumnFamily(TEST_KS, cfDef);
CfDef newCfDef = CLIENT.getColumnFamily(TEST_KS, TEST_CF);
- Assert.assertTrue(newCfDef.getRead_repair_chance() == 0.37);
- List<ColumnDef> newCDefs = newCfDef.getColumn_metadata();
- Assert.assertTrue(newCDefs.size() == 1);
- Assert.assertEquals(new String(newCDefs.get(0).getName(), "UTF-8"),
"indexcol");
- Assert.assertEquals(newCDefs.get(0).getIndex_name(), "indexname");
- Assert.assertEquals(newCDefs.get(0).getIndex_type(), IndexType.KEYS);
-
- // Try to remove the index
- newCfDef.setColumn_metadata(null);
- CLIENT.updateColumnFamily(TEST_KS, newCfDef);
- newCDefs = CLIENT.getColumnFamily(TEST_KS,
TEST_CF).getColumn_metadata();
- Assert.assertEquals(newCDefs.size(), 0);
-
+ Assert.assertEquals(newCfDef.getRead_repair_chance(), 0.37);
+ assertIndex(newCfDef);
+
+ // Now try to update the CF with a single property, it should leave
the old
+ // values intact
+ CfDef blankCfDef = new CfDef(TEST_KS, TEST_CF);
+ blankCfDef.setGc_grace_seconds(37);
+ blankCfDef.setKey_cache_save_period_in_seconds(37);
+ CLIENT.updateColumnFamily(TEST_KS, blankCfDef);
+ CfDef updatedCfDef = CLIENT.getColumnFamily(TEST_KS, TEST_CF);
+ assertSchema(updatedCfDef);
+ Assert.assertEquals(updatedCfDef.getRead_repair_chance(), 0.37);
+ Assert.assertEquals(updatedCfDef.getGc_grace_seconds(), 37);
+
Assert.assertEquals(updatedCfDef.getKey_cache_save_period_in_seconds(), 37);
+ assertIndex(updatedCfDef);
+
+ // Now try to change only the grace seconds, cache period should
remain intact
+ CfDef blankCfDef2 = new CfDef(TEST_KS, TEST_CF);
+ blankCfDef2.setGc_grace_seconds(3737);
+ CLIENT.updateColumnFamily(TEST_KS, blankCfDef2);
+ CfDef updatedCfDef2 = CLIENT.getColumnFamily(TEST_KS, TEST_CF);
+ assertSchema(updatedCfDef2);
+ Assert.assertEquals(updatedCfDef2.getRead_repair_chance(), 0.37);
+ Assert.assertEquals(updatedCfDef2.getGc_grace_seconds(), 3737);
+
Assert.assertEquals(updatedCfDef2.getKey_cache_save_period_in_seconds(), 37);
+ assertIndex(updatedCfDef2);
+
+ // Try to remove the index, using an empty list
+ CfDef blankCfDef3 = new CfDef(TEST_KS, TEST_CF);
+ blankCfDef3.setColumn_metadata(new ArrayList<ColumnDef>());
+ CLIENT.updateColumnFamily(TEST_KS, blankCfDef3);
+ CfDef updatedCfDef3 = CLIENT.getColumnFamily(TEST_KS, TEST_CF);
+ assertSchema(updatedCfDef3);
+ Assert.assertEquals(updatedCfDef3.getRead_repair_chance(), 0.37);
+ Assert.assertEquals(updatedCfDef3.getGc_grace_seconds(), 3737);
+
Assert.assertEquals(updatedCfDef3.getKey_cache_save_period_in_seconds(), 37);
+ Assert.assertEquals(updatedCfDef3.getColumn_metadata().size(), 0);
+
// Complex case
CfDef scfDef = new CfDef(TEST_KS, TEST_CF_2);
scfDef.setColumn_type(SUPER);
@@ -156,10 +185,84 @@
Assert.assertTrue(CLIENT.columnFamilyExists(TEST_KS, TEST_CF_2));
Assert.assertEquals(CLIENT.getColumnType(TEST_KS, TEST_CF_2), SUPER);
newCfDef = CLIENT.getColumnFamily(TEST_KS, TEST_CF_2);
- newCDefs = newCfDef.getColumn_metadata();
+ Assert.assertTrue(newCfDef.getComparator_type().endsWith(BYTESTYPE));
+
Assert.assertTrue(newCfDef.getSubcomparator_type().endsWith(BYTESTYPE));
+ assertDefaults(newCfDef);
+ List<ColumnDef> newCDefs = newCfDef.getColumn_metadata();
Assert.assertTrue(newCDefs.size() == 1);
Assert.assertEquals(new String(newCDefs.get(0).getName(), "UTF-8"),
"indexcol");
Assert.assertFalse(CLIENT.isColumnFamilyChanged(TEST_KS, TEST_CF_2,
SUPER, BYTESTYPE, BYTESTYPE));
Assert.assertTrue(CLIENT.isColumnFamilyChanged(TEST_KS, TEST_CF_2,
STANDARD, BYTESTYPE, null));
+
+ // Now try to update two CF properties, it should update them
+ CfDef blankCfDef4 = new CfDef(TEST_KS, TEST_CF_2);
+ blankCfDef4.setGc_grace_seconds(37);
+ blankCfDef4.setKey_cache_save_period_in_seconds(37);
+ CLIENT.updateColumnFamily(TEST_KS, blankCfDef4);
+ CfDef updatedCfDef4 = CLIENT.getColumnFamily(TEST_KS, TEST_CF_2);
+ assertSuperSchema(updatedCfDef4);
+ Assert.assertEquals(updatedCfDef4.getGc_grace_seconds(), 37);
+
Assert.assertEquals(updatedCfDef4.getKey_cache_save_period_in_seconds(), 37);
+
+ // Now try to change only the grace seconds, cache period should
remain intact
+ CfDef blankCfDef5 = new CfDef(TEST_KS, TEST_CF_2);
+ blankCfDef5.setGc_grace_seconds(3737);
+ CLIENT.updateColumnFamily(TEST_KS, blankCfDef5);
+ CfDef updatedCfDef5 = CLIENT.getColumnFamily(TEST_KS, TEST_CF_2);
+ assertSuperSchema(updatedCfDef5);
+ Assert.assertEquals(updatedCfDef5.getGc_grace_seconds(), 3737);
+
Assert.assertEquals(updatedCfDef5.getKey_cache_save_period_in_seconds(), 37);
+
+ // Update comparator type, this should fail with a ThriftException
+ CfDef blankCfDef6 = new CfDef(TEST_KS, TEST_CF_2);
+ blankCfDef6.setComparator_type(UTF8TYPE);
+ boolean fail = false;
+ try {
+ CLIENT.updateColumnFamily(TEST_KS, blankCfDef6);
+ } catch (ThriftException te) {
+ fail = true;
+ }
+ Assert.assertTrue("Trying to change the comparator type of a CF should
have resulted in a ThriftException", fail);
+ }
+
+ private void assertSchema(CfDef cfDef) {
+ Assert.assertEquals(STANDARD, cfDef.getColumn_type());
+ Assert.assertTrue(cfDef.getComparator_type().endsWith(BYTESTYPE));
+ Assert.assertEquals(TEST_KS, cfDef.getKeyspace());
+ Assert.assertEquals(TEST_CF, cfDef.getName());
+ Assert.assertNull(cfDef.getSubcomparator_type());
+ }
+
+ private void assertSuperSchema(CfDef cfDef) {
+ Assert.assertEquals(SUPER, cfDef.getColumn_type());
+ Assert.assertTrue(cfDef.getComparator_type().endsWith(BYTESTYPE));
+ Assert.assertTrue(cfDef.getSubcomparator_type().endsWith(BYTESTYPE));
+ Assert.assertEquals(TEST_KS, cfDef.getKeyspace());
+ Assert.assertEquals(TEST_CF_2, cfDef.getName());
+ }
+
+ // Asserts that all values in the CfDef are set to the default values in
Hector (except for KS/CF name
+ // and supercol/col types
+ private void assertDefaults(CfDef cfDef) throws
UnsupportedEncodingException {
+ Assert.assertEquals(CFMetaDataDefaults.DEFAULT_GC_GRACE_SECONDS,
cfDef.getGc_grace_seconds());
+
Assert.assertEquals(CFMetaDataDefaults.DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS,
cfDef.getKey_cache_save_period_in_seconds());
+ Assert.assertEquals(CFMetaDataDefaults.DEFAULT_KEY_CACHE_SIZE,
cfDef.getKey_cache_size());
+
Assert.assertEquals(CFMetaDataDefaults.DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS,
cfDef.getRow_cache_save_period_in_seconds());
+ Assert.assertEquals(CFMetaDataDefaults.DEFAULT_ROW_CACHE_SIZE,
cfDef.getRow_cache_size());
+
Assert.assertEquals(CFMetaDataDefaults.DEFAULT_MIN_COMPACTION_THRESHOLD,
cfDef.getMin_compaction_threshold());
+
Assert.assertEquals(CFMetaDataDefaults.DEFAULT_MAX_COMPACTION_THRESHOLD,
cfDef.getMax_compaction_threshold());
+
Assert.assertEquals(CFMetaDataDefaults.DEFAULT_MEMTABLE_LIFETIME_IN_MINS,
cfDef.getMemtable_flush_after_mins());
+
Assert.assertEquals(CFMetaDataDefaults.DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS,
cfDef.getMemtable_operations_in_millions());
+
Assert.assertEquals(CFMetaDataDefaults.DEFAULT_MEMTABLE_THROUGHPUT_IN_MB,
cfDef.getMemtable_throughput_in_mb());
+ Assert.assertEquals(CFMetaDataDefaults.DEFAULT_READ_REPAIR_CHANCE,
cfDef.getRead_repair_chance());
+ Assert.assertEquals(CFMetaDataDefaults.DEFAULT_REPLICATE_ON_WRITE,
cfDef.isReplicate_on_write());
+ }
+
+ private void assertIndex(CfDef cfDef) throws UnsupportedEncodingException {
+ List<ColumnDef> cDefs = cfDef.getColumn_metadata();
+ Assert.assertTrue(cDefs.size() == 1);
+ Assert.assertEquals(new String(cDefs.get(0).getName(), "UTF-8"),
"indexcol");
+ Assert.assertEquals(cDefs.get(0).getIndex_name(), "indexname");
+ Assert.assertEquals(cDefs.get(0).getIndex_type(), IndexType.KEYS);
}
}
Modified:
branches/amdatu-cassandra-0.2.3/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
==============================================================================
---
branches/amdatu-cassandra-0.2.3/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
(original)
+++
branches/amdatu-cassandra-0.2.3/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
Mon Apr 16 11:11:29 2012
@@ -31,6 +31,7 @@
import org.amdatu.cassandra.test.unit.listener.mock.CPMFactoryMock;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.ColumnDef;
@@ -195,14 +196,10 @@
cfDef.setComparator_type(CompareType.BYTESTYPE.getValue());
Assert.assertFalse(DAEMON.updateColumnFamily(TEST_KS, cfDef));
- // Update with change standard -> super, should throw an exception
+ // Update with change standard -> super, should not throw an exception
anymore, the column
+ // type is ignored
cfDef.setColumn_type(ColumnType.SUPER.getValue());
- try {
- DAEMON.updateColumnFamily(TEST_KS, cfDef);
- Assert.fail("Incompatible update of ColumnFamily '" + TEST_CF + "'
does not throw an exception");
- }
- catch (Exception e) {
- }
+ DAEMON.updateColumnFamily(TEST_KS, cfDef);
// Add a ColumnDefinition, should succeed
cfDef.setColumn_type(ColumnType.STANDARD.getValue());
@@ -216,10 +213,11 @@
// Invoke update without an effective change, should return false
Assert.assertFalse(DAEMON.updateColumnFamily(TEST_KS, cfDef));
- // Update the ColumnDefinition without the ColDef, the ColDef should
be removed
+ // Update the ColumnDefinition with an empty array of ColDefs, the
ColDef should be removed
cfDef = new CfDef(TEST_KS, TEST_CF);
cfDef.setColumn_type(ColumnType.STANDARD.getValue());
cfDef.setComparator_type(CompareType.BYTESTYPE.getValue());
+ cfDef.setColumn_metadata(new ArrayList<ColumnDef>());
DAEMON.updateColumnFamily(TEST_KS, cfDef);
CfDef def = DAEMON.getColumnFamily(TEST_KS, TEST_CF);
Assert.assertTrue(def.getColumn_metadata().size() == 0);
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits