Fix updateByQuery functionality for Avro serialization
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/99894b82 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/99894b82 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/99894b82 Branch: refs/heads/master Commit: 99894b82ae77290ff43d84eeffe638c3ed55d2d9 Parents: c5352b0 Author: madhawa <madhaw...@gmail.com> Authored: Fri Jul 28 09:09:17 2017 +0530 Committer: madhawa <madhaw...@gmail.com> Committed: Fri Jul 28 09:09:17 2017 +0530 ---------------------------------------------------------------------- .../serializers/CassandraQueryFactory.java | 25 +++++++- .../gora/cassandra/store/CassandraMapping.java | 9 +++ .../compositeKey/gora-cassandra-mapping.xml | 33 ----------- .../TestCassandraStoreWithCassandraKey.java | 61 +++++++++++++++++--- ...stCassandraStoreWithNativeSerialization.java | 38 ++++++------ 5 files changed, 102 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/99894b82/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java index bf33750..4362a04 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java @@ -20,11 +20,13 @@ import com.datastax.driver.core.querybuilder.Delete; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; import com.datastax.driver.core.querybuilder.Update; +import org.apache.avro.Schema; import 
org.apache.gora.cassandra.bean.CassandraKey; import org.apache.gora.cassandra.bean.ClusterKeyField; import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.bean.KeySpace; import org.apache.gora.cassandra.bean.PartitionKeyField; +import org.apache.gora.cassandra.persistent.CassandraNativePersistent; import org.apache.gora.cassandra.query.CassandraQuery; import org.apache.gora.cassandra.store.CassandraMapping; import org.apache.gora.query.Query; @@ -609,9 +611,26 @@ class CassandraQueryFactory { Update.Assignments updateAssignments = null; if (cassandraQuery instanceof CassandraQuery) { String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields())); - for (String column : columnNames) { - updateAssignments = update.with(QueryBuilder.set(column, "?")); - objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(column)); + if(CassandraNativePersistent.class.isAssignableFrom(mapping.getPersistentClass())) { + for (String column : columnNames) { + updateAssignments = update.with(QueryBuilder.set(column, "?")); + objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(mapping.getFieldFromColumnName(column).getFieldName())); + } + } else { + for (String column : columnNames) { + updateAssignments = update.with(QueryBuilder.set(column, "?")); + String field = mapping.getFieldFromColumnName(column).getFieldName(); + Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field); + try { + Schema schema = (Schema) mapping.getPersistentClass().getField("SCHEMA$").get(null); + Schema schemaField = schema.getField(field).schema(); + objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value)); + } catch (IllegalAccessException | NoSuchFieldException e) { + throw new RuntimeException("SCHEMA$ field can't accessible, Please recompile the Avro schema with goracompiler."); + } catch (NullPointerException e) { + throw new RuntimeException(field + " field couldn't 
find in the class " + mapping.getPersistentClass() + "."); + } + } } } String primaryKey = null; http://git-wip-us.apache.org/repos/asf/gora/blob/99894b82/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java index 61b8d1e..5699355 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java @@ -70,6 +70,15 @@ public class CassandraMapping { return null; } + public Field getFieldFromColumnName(String columnName) { + for (Field field1 : fieldList) { + if (field1.getColumnName().equals(columnName)) { + return field1; + } + } + return null; + } + public String[] getFieldNames() { List<String> fieldNames = new ArrayList<>(fieldList.size()); for (Field field : fieldList) { http://git-wip-us.apache.org/repos/asf/gora/blob/99894b82/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml index 4ebdcd6..42c6343 100644 --- a/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml +++ b/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml @@ -17,39 +17,6 @@ ~ limitations under the License. --> -<!-- - The value of 'host' attribute of keyspace tag should match exactly what is in - gora.properties file. Essentially this means that if you are using port number, you should - use it every where regardless of whether it is the default port or not. 
- At runtime Gora will otherwise try to connect to localhost - https://issues.apache.org/jira/browse/GORA-269 - - The values of 'replication_factor' and 'placement_strategy' attribute of keyspace tag - only apply if gora create the kyespace. they have no effect if this is being used against - an existing keyspace. the default value for 'replication_factor' is '1' - - The value of 'placement_strategy' should be a fully qualifed class name that is known to - the cassansra cluster, not the application or gora. As of this writing, the classes that ship - with cassandra are: - 'org.apache.cassandra.locator.SimpleStrategy' - 'org.apache.cassandra.locator.NetworkTopologyStrategy' - gora cassandra would use SimpleStrategy by default if no value for this attribute is specified - - The default value of 'gc_grace_seconds' is '0' which is ONLY VIABLE FOR SINGLE NODE - CLUSTER. you should update this value according to your cluster configuration. - https://wiki.apache.org/cassandra/StorageConfiguration - - The value of 'ttl' (time to live) attribute of field tag should most likely always - be zero unless you want Cassandra to create Tombstones and delete portions of your - data once this period expires. Any positive value is read and bound to the number - of seconds after which the value for that field will disappear. 
The default value of ttl - is '0' - - More information on gora-cassandra configuration and mapping's can be found - at http://gora.apache.org/current/gora-cassandra.html ---> - - <gora-otd> <keyspace name="EmployeeSpace" durableWrite="false"> <placementStrategy name="SimpleStrategy" /> http://git-wip-us.apache.org/repos/asf/gora/blob/99894b82/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java index f2556e7..5273481 100644 --- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java @@ -21,9 +21,9 @@ import org.apache.avro.util.Utf8; import org.apache.gora.cassandra.GoraCassandraTestDriver; import org.apache.gora.cassandra.example.generated.AvroSerialization.CassandraKey; import org.apache.gora.cassandra.example.generated.AvroSerialization.CassandraRecord; +import org.apache.gora.cassandra.query.CassandraQuery; import org.apache.gora.query.Query; import org.apache.gora.query.Result; -import org.apache.gora.store.DataStore; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -39,7 +39,7 @@ import java.util.Properties; */ public class TestCassandraStoreWithCassandraKey { private static GoraCassandraTestDriver testDriver = new GoraCassandraTestDriver(); - private static DataStore<CassandraKey, CassandraRecord> cassandraRecordDataStore; + private static CassandraStore<CassandraKey, CassandraRecord> cassandraRecordDataStore; private static Properties parameter; @BeforeClass @@ -47,7 +47,7 @@ public class TestCassandraStoreWithCassandraKey { 
setProperties(); testDriver.setParameters(parameter); testDriver.setUpClass(); - cassandraRecordDataStore = testDriver.createDataStore(CassandraKey.class, CassandraRecord.class); + cassandraRecordDataStore =(CassandraStore<CassandraKey, CassandraRecord> )testDriver.createDataStore(CassandraKey.class, CassandraRecord.class); } private static void setProperties() { @@ -116,13 +116,13 @@ public class TestCassandraStoreWithCassandraKey { */ @Test public void testExecuteQuery() throws Exception { - Query query = cassandraRecordDataStore.newQuery(); + Query<CassandraKey, CassandraRecord> query = cassandraRecordDataStore.newQuery(); cassandraRecordDataStore.truncateSchema(); CassandraKey key = new CassandraKey(); key.setTimestamp(2027L); key.setUrl("www.apache.org"); query.setKey(key); - Result result = query.execute(); + Result<CassandraKey, CassandraRecord> result = query.execute(); Assert.assertFalse(result.next()); CassandraRecord record = new CassandraRecord(); record.setDataLong(719411002L); @@ -141,7 +141,7 @@ public class TestCassandraStoreWithCassandraKey { result = query.execute(); Assert.assertTrue(result.next()); // verify data - retrievedRecord = (CassandraRecord) result.get(); + retrievedRecord = result.get(); Assert.assertEquals(record.getDataInt(), retrievedRecord.getDataInt()); Assert.assertEquals(record.getDataString(), retrievedRecord.getDataString()); Assert.assertEquals(record.getDataLong(), retrievedRecord.getDataLong()); @@ -152,7 +152,7 @@ public class TestCassandraStoreWithCassandraKey { result = query.execute(); Assert.assertFalse(result.next()); // test empty query - Query emptyQuery = cassandraRecordDataStore.newQuery(); + Query<CassandraKey, CassandraRecord> emptyQuery = cassandraRecordDataStore.newQuery(); result = emptyQuery.execute(); Assert.assertFalse(result.next()); cassandraRecordDataStore.put(key, record); @@ -197,10 +197,10 @@ public class TestCassandraStoreWithCassandraKey { cassandraRecordDataStore.put(key2,record2); 
cassandraRecordDataStore.put(key3,record3); cassandraRecordDataStore.put(key4,record4); - Query rangeQuery = cassandraRecordDataStore.newQuery(); + Query<CassandraKey, CassandraRecord> rangeQuery = cassandraRecordDataStore.newQuery(); rangeQuery.setStartKey(key2); rangeQuery.setEndKey(key2); - Result result = rangeQuery.execute(); + Result<CassandraKey, CassandraRecord> result = rangeQuery.execute(); int i = 0; while (result.next()) { i++; @@ -217,4 +217,47 @@ public class TestCassandraStoreWithCassandraKey { Assert.assertEquals(2,i); } + @Test + public void testUpdateByQuery() { + cassandraRecordDataStore.truncateSchema(); + //insert data + CassandraRecord record1 = new CassandraRecord(); + CassandraRecord record2 = new CassandraRecord(); + CassandraRecord record3 = new CassandraRecord(); + CassandraRecord record4 = new CassandraRecord(); + record1.setDataLong(719411002L); + record1.setDataString(new Utf8("Madawa")); + record1.setDataInt(100); + record2.setDataLong(712778588L); + record2.setDataString(new Utf8("Kasun")); + record2.setDataInt(101); + record3.setDataLong(716069539L); + record3.setDataString(new Utf8("Charith")); + record3.setDataInt(102); + record4.setDataLong(112956051L); + record4.setDataString(new Utf8("Bhanuka")); + record4.setDataInt(103); + CassandraKey key1 = new CassandraKey(); + key1.setTimestamp(200L); + key1.setUrl("www.apache.org"); + CassandraKey key2 = new CassandraKey(); + key2.setTimestamp(205L); + key2.setUrl("www.apache.org"); + CassandraKey key3 = new CassandraKey(); + key3.setTimestamp(210L); + key3.setUrl("www.apache.org"); + CassandraKey key4 = new CassandraKey(); + key4.setTimestamp(215L); + key4.setUrl("www.apache.org"); + cassandraRecordDataStore.put(key1,record1); + cassandraRecordDataStore.put(key2,record2); + cassandraRecordDataStore.put(key3,record3); + cassandraRecordDataStore.put(key4,record4); + CassandraQuery<CassandraKey, CassandraRecord> query = new CassandraQuery<>(cassandraRecordDataStore); + query.setKey(key1); 
+ query.addUpdateField("dataString", new Utf8("test123")); + cassandraRecordDataStore.updateByQuery(query); + CassandraRecord result = cassandraRecordDataStore.get(key1); + Assert.assertEquals(new Utf8("test123"), result.getDataString()); + } } http://git-wip-us.apache.org/repos/asf/gora/blob/99894b82/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java index f3df5e4..f90862a 100644 --- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java @@ -46,7 +46,7 @@ import java.util.UUID; */ public class TestCassandraStoreWithNativeSerialization { private static GoraCassandraTestDriver testDriver = new GoraCassandraTestDriver(); - private static DataStore<UUID, User> userDataStore; + private static CassandraStore<UUID, User> userDataStore; private static Properties parameter; @BeforeClass @@ -54,7 +54,7 @@ public class TestCassandraStoreWithNativeSerialization { setProperties(); testDriver.setParameters(parameter); testDriver.setUpClass(); - userDataStore = testDriver.createDataStore(UUID.class, User.class); + userDataStore = (CassandraStore<UUID, User>) testDriver.createDataStore(UUID.class, User.class); } private static void setProperties() { @@ -180,7 +180,7 @@ public class TestCassandraStoreWithNativeSerialization { Query<UUID, User> query1 = userDataStore.newQuery(); Result<UUID, User> result1 = userDataStore.execute(query1); int i = 0; - Assert.assertEquals(result1.getProgress(),0.0,0.0); + 
Assert.assertEquals(result1.getProgress(), 0.0, 0.0); while (result1.next()) { // check objects values Assert.assertEquals(result1.get().getName(), users.get(result1.getKey()).getName()); @@ -188,7 +188,7 @@ public class TestCassandraStoreWithNativeSerialization { Assert.assertEquals(result1.get().getUserId(), users.get(result1.getKey()).getUserId()); i++; } - Assert.assertEquals(result1.getProgress(),1.0,0.0); + Assert.assertEquals(result1.getProgress(), 1.0, 0.0); Assert.assertEquals(3, i); // Check limit query @@ -244,7 +244,7 @@ public class TestCassandraStoreWithNativeSerialization { userDataStore.deleteByQuery(query2); User partialDeletedUser = userDataStore.get(id2); Assert.assertNull(partialDeletedUser.getName()); - Assert.assertEquals(partialDeletedUser.getDateOfBirth(),user2.getDateOfBirth()); + Assert.assertEquals(partialDeletedUser.getDateOfBirth(), user2.getDateOfBirth()); } /** @@ -260,46 +260,46 @@ public class TestCassandraStoreWithNativeSerialization { User user2 = new User(id2, "user2", Date.from(Instant.now())); userDataStore.put(id2, user2); Query<UUID, User> query1 = userDataStore.newQuery(); - if(query1 instanceof CassandraQuery) { + if (query1 instanceof CassandraQuery) { ((CassandraQuery) query1).addUpdateField("name", "madhawa"); } query1.setKey(id1); - if(userDataStore instanceof CassandraStore) { - ((CassandraStore) userDataStore).updateByQuery(query1); + if (userDataStore instanceof CassandraStore) { + userDataStore.updateByQuery(query1); } User user = userDataStore.get(id1); - Assert.assertEquals(user.getName(),"madhawa"); + Assert.assertEquals(user.getName(), "madhawa"); } @Test public void testComplexTypes() throws GoraException { DataStore<String, ComplexTypes> documentDataStore = testDriver.createDataStore(String.class, ComplexTypes.class); ComplexTypes document = new ComplexTypes("document1"); - document.setIntArrayDataType(new int[]{1,2,3}); - document.setStringArrayDataType(new String[] {"madhawa", "kasun", "gunasekara", 
"pannipitiya", "srilanka"}); - document.setListDataType(new ArrayList<>(Arrays.asList("gora","nutch","tika","opennlp", "olingo"))); + document.setIntArrayDataType(new int[]{1, 2, 3}); + document.setStringArrayDataType(new String[]{"madhawa", "kasun", "gunasekara", "pannipitiya", "srilanka"}); + document.setListDataType(new ArrayList<>(Arrays.asList("gora", "nutch", "tika", "opennlp", "olingo"))); document.setSetDataType(new HashSet<>(Arrays.asList("important", "keeper"))); - HashMap<String,String> map = new HashMap<>(); - map.put("LK","Colombo"); + HashMap<String, String> map = new HashMap<>(); + map.put("LK", "Colombo"); document.setMapDataType(map); documentDataStore.put("document1", document); ComplexTypes retrievedDocuemnt = documentDataStore.get("document1"); // verify list data - for(int i=0; i<document.getListDataType().size(); i++) { + for (int i = 0; i < document.getListDataType().size(); i++) { Assert.assertEquals(document.getListDataType().get(i), retrievedDocuemnt.getListDataType().get(i)); } // verify set data - for(int i=0; i<document.getSetDataType().size(); i++) { + for (int i = 0; i < document.getSetDataType().size(); i++) { Assert.assertTrue(Arrays.equals(document.getSetDataType().toArray(), retrievedDocuemnt.getSetDataType().toArray())); } // verify array data - for(int i=0; i<document.getIntArrayDataType().length; i++) { + for (int i = 0; i < document.getIntArrayDataType().length; i++) { Assert.assertTrue(Arrays.equals(document.getIntArrayDataType(), retrievedDocuemnt.getIntArrayDataType())); } - for(int i=0; i<document.getStringArrayDataType().length; i++) { + for (int i = 0; i < document.getStringArrayDataType().length; i++) { Assert.assertTrue(Arrays.equals(document.getStringArrayDataType(), retrievedDocuemnt.getStringArrayDataType())); } // verify map data - Assert.assertEquals(map.get("LK"),retrievedDocuemnt.getMapDataType().get("LK")); + Assert.assertEquals(map.get("LK"), retrievedDocuemnt.getMapDataType().get("LK")); } }