[ https://issues.apache.org/jira/browse/GORA-497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104496#comment-16104496 ]
ASF GitHub Bot commented on GORA-497: ------------------------------------- Github user djkevincr commented on a diff in the pull request: https://github.com/apache/gora/pull/110#discussion_r130017958 --- Diff: gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java --- @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.serializers; + +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.gora.cassandra.bean.CassandraKey; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.DirtyListWrapper; +import org.apache.gora.persistency.impl.DirtyMapWrapper; +import org.apache.gora.persistency.impl.PersistentBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This class is Utils class for Avro serialization. + */ +class AvroCassandraUtils { + + /** + * Default schema index with value "0" used when AVRO Union data types are stored. + */ + private static final int DEFAULT_UNION_SCHEMA = 0; + + private static final Logger LOG = LoggerFactory.getLogger(AvroCassandraUtils.class); + + static void processKeys(CassandraMapping cassandraMapping, Object key, List<String> keys, List<Object> values) { + CassandraKey cassandraKey = cassandraMapping.getCassandraKey(); + if (cassandraMapping.isPartitionKeyDefined()) { + if (cassandraKey != null) { + if (key instanceof PersistentBase) { + PersistentBase keyBase = (PersistentBase) key; + for (Schema.Field field : keyBase.getSchema().getFields()) { + if (cassandraMapping.getFieldFromFieldName(field.name()) != null) { + keys.add(field.name()); + Object value = keyBase.get(field.pos()); + value = getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value); + values.add(value); + } else { + LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping", new Object[]{field.name(), cassandraMapping.getPersistentClass()}); + } + } + } else { + LOG.error("Key bean isn't extended by {} .", new Object[]{cassandraMapping.getKeyClass(), PersistentBase.class}); + } + } else { + for (Field field : cassandraMapping.getInlinedDefinedPartitionKeys()) { + keys.add(field.getFieldName()); + values.add(key); + } + } + } else { + keys.add(cassandraMapping.getDefaultCassandraKey().getFieldName()); + values.add(key.toString()); + } + } + + /** + * For every field within an object, we pass in a field schema, Type and value. + * This enables us to process fields (based on their characteristics) + * preparing them for persistence. + * + * @param fieldSchema the associated field schema + * @param type the field type + * @param fieldValue the field value. + * @return field value + */ + static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue) { + switch (type) { + case RECORD: + PersistentBase persistent = (PersistentBase) fieldValue; + PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema()); + for (Schema.Field member : fieldSchema.getFields()) { + if (member.pos() == 0 || !persistent.isDirty()) { + continue; + } + Schema memberSchema = member.schema(); + Schema.Type memberType = memberSchema.getType(); + Object memberValue = persistent.get(member.pos()); + newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberType, memberValue)); + } + fieldValue = newRecord; + break; + case MAP: + Schema valueSchema = fieldSchema.getValueType(); + Schema.Type valuetype = valueSchema.getType(); + HashMap<String, Object> map = new HashMap<>(); + for (Map.Entry<CharSequence, ?> e : ((Map<CharSequence, ?>) fieldValue).entrySet()) { + String mapKey = e.getKey().toString(); + Object mapValue = e.getValue(); + mapValue = getFieldValueFromAvroBean(valueSchema, valuetype, mapValue); + map.put(mapKey, mapValue); + } + fieldValue = map; + break; + case ARRAY: + valueSchema = fieldSchema.getElementType(); + valuetype = valueSchema.getType(); + ArrayList<Object> list = new ArrayList<>(); + for (Object item : (Collection<?>) fieldValue) { + Object value = getFieldValueFromAvroBean(valueSchema, valuetype, item); + list.add(value); + } + fieldValue = list; + break; + case UNION: + // storing the union selected schema, the actual value will + // be stored as soon as we get break out. + if (fieldValue != null) { + int schemaPos = getUnionSchema(fieldValue, fieldSchema); + Schema unionSchema = fieldSchema.getTypes().get(schemaPos); + Schema.Type unionType = unionSchema.getType(); + fieldValue = getFieldValueFromAvroBean(unionSchema, unionType, fieldValue); + } + break; + case STRING: + fieldValue = fieldValue.toString(); + break; + default: + break; + } + return fieldValue; + } + + /** + * Given an object and the object schema this function obtains, + * from within the UNION schema, the position of the type used. + * If no data type can be inferred then we return a default value + * of position 0. + * + * @param pValue Object + * @param pUnionSchema avro Schema + * @return the unionSchemaPosition. + */ + private static int getUnionSchema(Object pValue, Schema pUnionSchema) { + int unionSchemaPos = 0; +// String valueType = pValue.getClass().getSimpleName(); + for (Schema currentSchema : pUnionSchema.getTypes()) { + Schema.Type schemaType = currentSchema.getType(); + if (pValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) + return unionSchemaPos; + else if (pValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) + return unionSchemaPos; + else if (pValue instanceof Integer && schemaType.equals(Schema.Type.INT)) + return unionSchemaPos; + else if (pValue instanceof Long && schemaType.equals(Schema.Type.LONG)) + return unionSchemaPos; + else if (pValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) + return unionSchemaPos; + else if (pValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) + return unionSchemaPos; + else if (pValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) + return unionSchemaPos; + else if (pValue instanceof Map && schemaType.equals(Schema.Type.MAP)) + return unionSchemaPos; + else if (pValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) + return unionSchemaPos; + else if (pValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) + return unionSchemaPos; + unionSchemaPos++; + } + // if we weren't able to determine which data type it is, then we return the default + return DEFAULT_UNION_SCHEMA; + } + + static String encodeFieldKey(final String key) { + if (key == null) { + return null; + } + return key.replace(".", "\u00B7") + .replace(":", "\u00FF") + .replace(";", "\u00FE") + .replace(" ", "\u00FD") + .replace("%", "\u00FC") + .replace("=", "\u00FB"); + } + + static String decodeFieldKey(final String key) { --- End diff -- This method doesn't have any usages. > Migrate CassandraThrift to CQL > ------------------------------- > > Key: GORA-497 > URL: https://issues.apache.org/jira/browse/GORA-497 > Project: Apache Gora > Issue Type: Improvement > Components: gora-cassandra > Reporter: Madhawa Gunasekara > Assignee: Madhawa Gunasekara > Labels: gsoc2017 > Fix For: 0.8 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)