[
https://issues.apache.org/jira/browse/GORA-497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104496#comment-16104496
]
ASF GitHub Bot commented on GORA-497:
-------------------------------------
Github user djkevincr commented on a diff in the pull request:
https://github.com/apache/gora/pull/110#discussion_r130017958
--- Diff:
gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
---
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.cassandra.bean.CassandraKey;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is Utils class for Avro serialization.
+ */
+class AvroCassandraUtils {
+
+ /**
+ * Default schema index with value "0" used when AVRO Union data types
are stored.
+ */
+ private static final int DEFAULT_UNION_SCHEMA = 0;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AvroCassandraUtils.class);
+
+ static void processKeys(CassandraMapping cassandraMapping, Object key,
List<String> keys, List<Object> values) {
+ CassandraKey cassandraKey = cassandraMapping.getCassandraKey();
+ if (cassandraMapping.isPartitionKeyDefined()) {
+ if (cassandraKey != null) {
+ if (key instanceof PersistentBase) {
+ PersistentBase keyBase = (PersistentBase) key;
+ for (Schema.Field field : keyBase.getSchema().getFields()) {
+ if (cassandraMapping.getFieldFromFieldName(field.name()) !=
null) {
+ keys.add(field.name());
+ Object value = keyBase.get(field.pos());
+ value = getFieldValueFromAvroBean(field.schema(),
field.schema().getType(), value);
+ values.add(value);
+ } else {
+ LOG.debug("Ignoring field {}, Since field couldn't find in
the {} mapping", new Object[]{field.name(),
cassandraMapping.getPersistentClass()});
+ }
+ }
+ } else {
+ LOG.error("Key bean isn't extended by {} .", new
Object[]{cassandraMapping.getKeyClass(), PersistentBase.class});
+ }
+ } else {
+ for (Field field :
cassandraMapping.getInlinedDefinedPartitionKeys()) {
+ keys.add(field.getFieldName());
+ values.add(key);
+ }
+ }
+ } else {
+ keys.add(cassandraMapping.getDefaultCassandraKey().getFieldName());
+ values.add(key.toString());
+ }
+ }
+
+ /**
+ * For every field within an object, we pass in a field schema, Type and
value.
+ * This enables us to process fields (based on their characteristics)
+ * preparing them for persistence.
+ *
+ * @param fieldSchema the associated field schema
+ * @param type the field type
+ * @param fieldValue the field value.
+ * @return field value
+ */
+ static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type
type, Object fieldValue) {
+ switch (type) {
+ case RECORD:
+ PersistentBase persistent = (PersistentBase) fieldValue;
+ PersistentBase newRecord = (PersistentBase)
SpecificData.get().newRecord(persistent, persistent.getSchema());
+ for (Schema.Field member : fieldSchema.getFields()) {
+ if (member.pos() == 0 || !persistent.isDirty()) {
+ continue;
+ }
+ Schema memberSchema = member.schema();
+ Schema.Type memberType = memberSchema.getType();
+ Object memberValue = persistent.get(member.pos());
+ newRecord.put(member.pos(),
getFieldValueFromAvroBean(memberSchema, memberType, memberValue));
+ }
+ fieldValue = newRecord;
+ break;
+ case MAP:
+ Schema valueSchema = fieldSchema.getValueType();
+ Schema.Type valuetype = valueSchema.getType();
+ HashMap<String, Object> map = new HashMap<>();
+ for (Map.Entry<CharSequence, ?> e : ((Map<CharSequence, ?>)
fieldValue).entrySet()) {
+ String mapKey = e.getKey().toString();
+ Object mapValue = e.getValue();
+ mapValue = getFieldValueFromAvroBean(valueSchema, valuetype,
mapValue);
+ map.put(mapKey, mapValue);
+ }
+ fieldValue = map;
+ break;
+ case ARRAY:
+ valueSchema = fieldSchema.getElementType();
+ valuetype = valueSchema.getType();
+ ArrayList<Object> list = new ArrayList<>();
+ for (Object item : (Collection<?>) fieldValue) {
+ Object value = getFieldValueFromAvroBean(valueSchema, valuetype,
item);
+ list.add(value);
+ }
+ fieldValue = list;
+ break;
+ case UNION:
+ // storing the union selected schema, the actual value will
+ // be stored as soon as we get break out.
+ if (fieldValue != null) {
+ int schemaPos = getUnionSchema(fieldValue, fieldSchema);
+ Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+ Schema.Type unionType = unionSchema.getType();
+ fieldValue = getFieldValueFromAvroBean(unionSchema, unionType,
fieldValue);
+ }
+ break;
+ case STRING:
+ fieldValue = fieldValue.toString();
+ break;
+ default:
+ break;
+ }
+ return fieldValue;
+ }
+
+ /**
+ * Given an object and the object schema this function obtains,
+ * from within the UNION schema, the position of the type used.
+ * If no data type can be inferred then we return a default value
+ * of position 0.
+ *
+ * @param pValue Object
+ * @param pUnionSchema avro Schema
+ * @return the unionSchemaPosition.
+ */
+ private static int getUnionSchema(Object pValue, Schema pUnionSchema) {
+ int unionSchemaPos = 0;
+// String valueType = pValue.getClass().getSimpleName();
+ for (Schema currentSchema : pUnionSchema.getTypes()) {
+ Schema.Type schemaType = currentSchema.getType();
+ if (pValue instanceof CharSequence &&
schemaType.equals(Schema.Type.STRING))
+ return unionSchemaPos;
+ else if (pValue instanceof ByteBuffer &&
schemaType.equals(Schema.Type.BYTES))
+ return unionSchemaPos;
+ else if (pValue instanceof Integer &&
schemaType.equals(Schema.Type.INT))
+ return unionSchemaPos;
+ else if (pValue instanceof Long &&
schemaType.equals(Schema.Type.LONG))
+ return unionSchemaPos;
+ else if (pValue instanceof Double &&
schemaType.equals(Schema.Type.DOUBLE))
+ return unionSchemaPos;
+ else if (pValue instanceof Float &&
schemaType.equals(Schema.Type.FLOAT))
+ return unionSchemaPos;
+ else if (pValue instanceof Boolean &&
schemaType.equals(Schema.Type.BOOLEAN))
+ return unionSchemaPos;
+ else if (pValue instanceof Map && schemaType.equals(Schema.Type.MAP))
+ return unionSchemaPos;
+ else if (pValue instanceof List &&
schemaType.equals(Schema.Type.ARRAY))
+ return unionSchemaPos;
+ else if (pValue instanceof Persistent &&
schemaType.equals(Schema.Type.RECORD))
+ return unionSchemaPos;
+ unionSchemaPos++;
+ }
+ // if we weren't able to determine which data type it is, then we
return the default
+ return DEFAULT_UNION_SCHEMA;
+ }
+
+ static String encodeFieldKey(final String key) {
+ if (key == null) {
+ return null;
+ }
+ return key.replace(".", "\u00B7")
+ .replace(":", "\u00FF")
+ .replace(";", "\u00FE")
+ .replace(" ", "\u00FD")
+ .replace("%", "\u00FC")
+ .replace("=", "\u00FB");
+ }
+
+ static String decodeFieldKey(final String key) {
--- End diff --
This method doesn't have any usages.
> Migrate CassandraThrift to CQL
> -------------------------------
>
> Key: GORA-497
> URL: https://issues.apache.org/jira/browse/GORA-497
> Project: Apache Gora
> Issue Type: Improvement
> Components: gora-cassandra
> Reporter: Madhawa Gunasekara
> Assignee: Madhawa Gunasekara
> Labels: gsoc2017
> Fix For: 0.8
>
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)