Repository: gora Updated Branches: refs/heads/master 560704c3a -> 4bbf52ee7
Switched Accumulo Dependency to 1.7.1 and ported AccumuloStore Class to work with accumulo 1.7.1 Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ce945da3 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ce945da3 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ce945da3 Branch: refs/heads/master Commit: ce945da33c42463567c40a2d35fc34a0e23618d6 Parents: 560704c Author: Vaibhav Thapliyal <vaibhav.thapliyal...@gmail.com> Authored: Fri Feb 17 11:39:15 2017 +0530 Committer: vaibhavthapliyal <vaibhav.thapliyal...@gmail.com> Committed: Fri Feb 17 11:39:15 2017 +0530 ---------------------------------------------------------------------- gora-accumulo/pom.xml | 2 +- .../gora/accumulo/store/AccumuloStore.java | 1886 +++++++++--------- 2 files changed, 956 insertions(+), 932 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/ce945da3/gora-accumulo/pom.xml ---------------------------------------------------------------------- diff --git a/gora-accumulo/pom.xml b/gora-accumulo/pom.xml index bc131fa..e13f7a7 100644 --- a/gora-accumulo/pom.xml +++ b/gora-accumulo/pom.xml @@ -50,7 +50,7 @@ </ciManagement> <properties> - <accumulo.version>1.6.4</accumulo.version> + <accumulo.version>1.7.1</accumulo.version> <osgi.import>*</osgi.import> <osgi.export>org.apache.gora.accumulo*;version="${project.version}";-noimport:=true</osgi.export> </properties> http://git-wip-us.apache.org/repos/asf/gora/blob/ce945da3/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java ---------------------------------------------------------------------- diff --git a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java index a68cdaa..a4cddce 100644 --- a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java +++ b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java @@ -50,25 +50,27 @@ import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.impl.ClientContext; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.TabletLocator; import org.apache.accumulo.core.client.mock.MockConnector; import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.mock.MockTabletLocator; +import org.apache.accumulo.core.client.mock.impl.MockTabletLocator; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.iterators.SortedKeyIterator; import org.apache.accumulo.core.iterators.user.TimestampFilter; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.client.impl.Credentials; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.avro.Schema; @@ -108,933 +110,955 @@ import org.w3c.dom.NodeList; /** * Implementation of a Accumulo data store to be used by gora. * - * @param <K> class to be used for the key - * @param <T> class to be persisted within the store + * @param <K> + * class to be used for the key + * @param <T> + * class to be persisted within the store */ -public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T> { - - protected static final String MOCK_PROPERTY = "accumulo.mock"; - protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance"; - protected static final String ZOOKEEPERS_NAME_PROPERTY = "accumulo.zookeepers"; - protected static final String USERNAME_PROPERTY = "accumulo.user"; - protected static final String PASSWORD_PROPERTY = "accumulo.password"; - protected static final String DEFAULT_MAPPING_FILE = "gora-accumulo-mapping.xml"; - - private final static String UNKOWN = "Unknown type "; - - private Connector conn; - private BatchWriter batchWriter; - private AccumuloMapping mapping; - private Credentials credentials; - private Encoder encoder; - - public static final Logger LOG = LoggerFactory.getLogger(AccumuloStore.class); - - public Object fromBytes(Schema schema, byte[] data) throws IOException { - Schema fromSchema = null; - if (schema.getType() == Type.UNION) { - try { - Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); - int unionIndex = decoder.readIndex(); - List<Schema> possibleTypes = schema.getTypes(); - fromSchema = possibleTypes.get(unionIndex); - Schema effectiveSchema = possibleTypes.get(unionIndex); - if (effectiveSchema.getType() == Type.NULL) { - decoder.readNull(); - return null; - } else { - data = decoder.readBytes(null).array(); - } - } catch (IOException e) { - LOG.error(e.getMessage()); - throw new GoraException("Error decoding union type: ", e); - } - } else { - fromSchema = schema; - } - return fromBytes(encoder, fromSchema, data); - } - - public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) throws IOException { - switch (schema.getType()) { - case BOOLEAN: - return encoder.decodeBoolean(data); - case DOUBLE: - return encoder.decodeDouble(data); - case FLOAT: - return encoder.decodeFloat(data); - case INT: - return encoder.decodeInt(data); - case LONG: - return encoder.decodeLong(data); - case STRING: - return new Utf8(data); - case BYTES: - return ByteBuffer.wrap(data); - case ENUM: - return AvroUtils.getEnumValue(schema, encoder.decodeInt(data)); - case ARRAY: - break; - case FIXED: - break; - case MAP: - break; - case NULL: - break; - case RECORD: - break; - case UNION: - break; - default: - break; - } - throw new IllegalArgumentException(UNKOWN + schema.getType()); - - } - - private static byte[] getBytes(Text text) { - byte[] bytes = text.getBytes(); - if (bytes.length != text.getLength()) { - bytes = new byte[text.getLength()]; - System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length); - } - return bytes; - } - - public K fromBytes(Class<K> clazz, byte[] val) { - return fromBytes(encoder, clazz, val); - } - - @SuppressWarnings("unchecked") - public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) { - try { - if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { - return (K) Byte.valueOf(encoder.decodeByte(val)); - } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { - return (K) Boolean.valueOf(encoder.decodeBoolean(val)); - } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { - return (K) Short.valueOf(encoder.decodeShort(val)); - } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { - return (K) Integer.valueOf(encoder.decodeInt(val)); - } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { - return (K) Long.valueOf(encoder.decodeLong(val)); - } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { - return (K) Float.valueOf(encoder.decodeFloat(val)); - } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { - return (K) Double.valueOf(encoder.decodeDouble(val)); - } else if (clazz.equals(String.class)) { - return (K) new String(val, "UTF-8"); - } else if (clazz.equals(Utf8.class)) { - return (K) new Utf8(val); - } - - throw new IllegalArgumentException(UNKOWN + clazz.getName()); - } catch (IOException ioe) { - LOG.error(ioe.getMessage()); - throw new RuntimeException(ioe); - } - } - - private static byte[] copyIfNeeded(byte b[], int offset, int len) { - if (len != b.length || offset != 0) { - byte[] copy = new byte[len]; - System.arraycopy(b, offset, copy, 0, copy.length); - b = copy; - } - return b; - } - - public byte[] toBytes(Schema toSchema, Object o) { - if (toSchema != null && toSchema.getType() == Type.UNION) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - org.apache.avro.io.BinaryEncoder avroEncoder = EncoderFactory.get().binaryEncoder(baos, null); - int unionIndex = 0; - try { - if (o == null) { - unionIndex = firstNullSchemaTypeIndex(toSchema); - avroEncoder.writeIndex(unionIndex); - avroEncoder.writeNull(); - } else { - unionIndex = firstNotNullSchemaTypeIndex(toSchema); - avroEncoder.writeIndex(unionIndex); - avroEncoder.writeBytes(toBytes(o)); - } - avroEncoder.flush(); - return baos.toByteArray(); - } catch (IOException e) { - LOG.error(e.getMessage()); - return toBytes(o); - } - } else { - return toBytes(o); - } - } - - private int firstNullSchemaTypeIndex(Schema toSchema) { - List<Schema> possibleTypes = toSchema.getTypes(); - int unionIndex = 0; - for (int i = 0; i < possibleTypes.size(); i++ ) { - Type pType = possibleTypes.get(i).getType(); - if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests - unionIndex = i; break; - } - } - return unionIndex; - } - - private int firstNotNullSchemaTypeIndex(Schema toSchema) { - List<Schema> possibleTypes = toSchema.getTypes(); - int unionIndex = 0; - for (int i = 0; i < possibleTypes.size(); i++ ) { - Type pType = possibleTypes.get(i).getType(); - if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests - unionIndex = i; break; - } - } - return unionIndex; - } - - public byte[] toBytes(Object o) { - return toBytes(encoder, o); - } - - public static byte[] toBytes(Encoder encoder, Object o) { - - try { - if (o instanceof String) { - return ((String) o).getBytes("UTF-8"); - } else if (o instanceof Utf8) { - return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getByteLength()); - } else if (o instanceof ByteBuffer) { - return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer) o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining()); - } else if (o instanceof Long) { - return encoder.encodeLong((Long) o); - } else if (o instanceof Integer) { - return encoder.encodeInt((Integer) o); - } else if (o instanceof Short) { - return encoder.encodeShort((Short) o); - } else if (o instanceof Byte) { - return encoder.encodeByte((Byte) o); - } else if (o instanceof Boolean) { - return encoder.encodeBoolean((Boolean) o); - } else if (o instanceof Float) { - return encoder.encodeFloat((Float) o); - } else if (o instanceof Double) { - return encoder.encodeDouble((Double) o); - } else if (o instanceof Enum) { - return encoder.encodeInt(((Enum<?>) o).ordinal()); - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - - throw new IllegalArgumentException(UNKOWN + o.getClass().getName()); - } - - private BatchWriter getBatchWriter() throws IOException { - if (batchWriter == null) - try { - BatchWriterConfig batchWriterConfig = new BatchWriterConfig(); - batchWriterConfig.setMaxMemory(10000000); - batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS); - batchWriterConfig.setMaxWriteThreads(4); - batchWriter = conn.createBatchWriter(mapping.tableName, batchWriterConfig); - } catch (TableNotFoundException e) { - throw new IOException(e); - } - return batchWriter; - } - - /** - * Initialize the data store by reading the credentials, setting the client's properties up and - * reading the mapping file. Initialize is called when then the call to - * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made. - * - * @param keyClass - * @param persistentClass - * @param properties - */ - @Override - public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { - try{ - super.initialize(keyClass, persistentClass, properties); - - String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null); - String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); - String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null); - String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null); - - mapping = readMapping(mappingFile); - - if (mapping.encoder == null || "".equals(mapping.encoder)) { - encoder = new BinaryEncoder(); - } else { - try { - encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance(); - } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - throw new IOException(e); - } - } - - try { - AuthenticationToken token = new PasswordToken(password); - if (mock == null || !mock.equals("true")) { - String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null); - String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null); - conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, token); - } else { - conn = new MockInstance().getConnector(user, token); - } - credentials = new Credentials(user, token); - - if (autoCreateSchema && !schemaExists()) - createSchema(); - } catch (AccumuloException | AccumuloSecurityException e) { - throw new IOException(e); - } - } catch(IOException e){ - LOG.error(e.getMessage(), e); - } - } - - protected AccumuloMapping readMapping(String filename) throws IOException { - try { - - AccumuloMapping mapping = new AccumuloMapping(); - - DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder(); - Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename)); - - Element root = dom.getDocumentElement(); - - NodeList nl = root.getElementsByTagName("class"); - for (int i = 0; i < nl.getLength(); i++) { - - Element classElement = (Element) nl.item(i); - if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName()) - && classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) { - - mapping.tableName = getSchemaName(classElement.getAttribute("table"), persistentClass); - mapping.encoder = classElement.getAttribute("encoder"); - - NodeList fields = classElement.getElementsByTagName("field"); - for (int j = 0; j < fields.getLength(); j++) { - Element fieldElement = (Element) fields.item(j); - - String name = fieldElement.getAttribute("name"); - String family = fieldElement.getAttribute("family"); - String qualifier = fieldElement.getAttribute("qualifier"); - if ("".equals(qualifier)) - qualifier = null; - - Pair<Text,Text> col = new Pair<>(new Text(family), qualifier == null ? null : new Text(qualifier)); - mapping.fieldMap.put(name, col); - mapping.columnMap.put(col, name); - } - } - - } - - if (mapping.tableName == null) { - throw new GoraException("Please define the accumulo 'table' name mapping in " + filename + " for " + persistentClass.getCanonicalName()); - } - - nl = root.getElementsByTagName("table"); - for (int i = 0; i < nl.getLength(); i++) { - Element tableElement = (Element) nl.item(i); - if (tableElement.getAttribute("name").equals(mapping.tableName)) { - NodeList configs = tableElement.getElementsByTagName("config"); - for (int j = 0; j < configs.getLength(); j++) { - Element configElement = (Element) configs.item(j); - String key = configElement.getAttribute("key"); - String val = configElement.getAttribute("value"); - mapping.tableConfig.put(key, val); - } - } - } - - return mapping; - } catch (Exception ex) { - throw new IOException("Unable to read " + filename, ex); - } - - } - - @Override - public String getSchemaName() { - return mapping.tableName; - } - - @Override - public void createSchema() { - try { - conn.tableOperations().create(mapping.tableName); - Set<Entry<String,String>> es = mapping.tableConfig.entrySet(); - for (Entry<String,String> entry : es) { - conn.tableOperations().setProperty(mapping.tableName, entry.getKey(), entry.getValue()); - } - - } catch (AccumuloException | AccumuloSecurityException e) { - LOG.error(e.getMessage(), e); - } catch (TableExistsException e) { - LOG.debug(e.getMessage(), e); - } - } - - @Override - public void deleteSchema() { - try { - if (batchWriter != null) - batchWriter.close(); - batchWriter = null; - conn.tableOperations().delete(mapping.tableName); - } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { - LOG.error(e.getMessage(), e); - } - } - - @Override - public boolean schemaExists() { - return conn.tableOperations().exists(mapping.tableName); - } - - public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent) throws IOException { - ByteSequence row = null; - - Map<Utf8, Object> currentMap = null; - List currentArray = null; - Text currentFam = null; - int currentPos = 0; - Schema currentSchema = null; - Field currentField = null; - - BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], null); - - while (iter.hasNext()) { - Entry<Key,Value> entry = iter.next(); - - if (row == null) { - row = entry.getKey().getRowData(); - } - byte[] val = entry.getValue().get(); - - Field field = fieldMap.get(getFieldName(entry)); - - if (currentMap != null) { - if (currentFam.equals(entry.getKey().getColumnFamily())) { - currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), - fromBytes(currentSchema, entry.getValue().get())); - continue; - } else { - persistent.put(currentPos, currentMap); - currentMap = null; - } - } else if (currentArray != null) { - if (currentFam.equals(entry.getKey().getColumnFamily())) { - currentArray.add(fromBytes(currentSchema, entry.getValue().get())); - continue; - } else { - persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray)); - currentArray = null; - } - } - - switch (field.schema().getType()) { - case MAP: // first entry only. Next are handled above on the next loop - currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = field.schema().getValueType(); - - currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), - fromBytes(currentSchema, entry.getValue().get())); - break; - case ARRAY: - currentArray = new DirtyListWrapper<>(new ArrayList<>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = field.schema().getElementType(); - currentField = field; - - currentArray.add(fromBytes(currentSchema, entry.getValue().get())); - - break; - case UNION:// default value of null acts like union with null - Schema effectiveSchema = field.schema().getTypes() - .get(firstNotNullSchemaTypeIndex(field.schema())); - // map and array were coded without union index so need to be read the same way - if (effectiveSchema.getType() == Type.ARRAY) { - currentArray = new DirtyListWrapper<>(new ArrayList<>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = field.schema().getElementType(); - currentField = field; - - currentArray.add(fromBytes(currentSchema, entry.getValue().get())); - break; - } - else if (effectiveSchema.getType() == Type.MAP) { - currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = effectiveSchema.getValueType(); - - currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), - fromBytes(currentSchema, entry.getValue().get())); - break; - } - // continue like a regular top-level union - case RECORD: - SpecificDatumReader<?> reader = new SpecificDatumReader<Schema>(field.schema()); - persistent.put(field.pos(), reader.read(null, DecoderFactory.get().binaryDecoder(val, decoder))); - break; - default: - persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get())); - } - } - - if (currentMap != null) { - persistent.put(currentPos, currentMap); - } else if (currentArray != null) { - persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray)); - } - - persistent.clearDirty(); - - return row; - } - - /** - * Retrieve field name from entry. - * @param entry The Key-Value entry - * @return String The field name - */ - private String getFieldName(Entry<Key, Value> entry) { - String fieldName = mapping.columnMap.get(new Pair<>(entry.getKey().getColumnFamily(), - entry.getKey().getColumnQualifier())); - if (fieldName == null) { - fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), null)); - } - return fieldName; - } - - private void setFetchColumns(Scanner scanner, String[] fields) { - fields = getFieldsToQuery(fields); - for (String field : fields) { - Pair<Text,Text> col = mapping.fieldMap.get(field); - if (col != null) { - if (col.getSecond() == null) { - scanner.fetchColumnFamily(col.getFirst()); - } else { - scanner.fetchColumn(col.getFirst(), col.getSecond()); - } - } else { - LOG.error("Mapping not found for field: {}", field); - } - } - } - - @Override - public T get(K key, String[] fields) { - try { - // TODO make isolated scanner optional? - Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY)); - Range rowRange = new Range(new Text(toBytes(key))); - - scanner.setRange(rowRange); - setFetchColumns(scanner, fields); - - T persistent = newPersistent(); - ByteSequence row = populate(scanner.iterator(), persistent); - if (row == null) - return null; - return persistent; - } catch (TableNotFoundException e) { - LOG.error(e.getMessage(), e); - return null; - } catch (IOException e) { - LOG.error(e.getMessage(), e); - return null; - } - } - - @Override - public void put(K key, T val) { - - try{ - Mutation m = new Mutation(new Text(toBytes(key))); - - Schema schema = val.getSchema(); - List<Field> fields = schema.getFields(); - int count = 0; - - for (int i = 0; i < fields.size(); i++) { - if (!val.isDirty(i)) { - continue; - } - Field field = fields.get(i); - - Object o = val.get(field.pos()); - - Pair<Text,Text> col = mapping.fieldMap.get(field.name()); - - if (col == null) { - throw new GoraException("Please define the gora to accumulo mapping for field " + field.name()); - } - - switch (field.schema().getType()) { - case MAP: - count = putMap(m, count, field.schema().getValueType(), o, col, field.name()); - break; - case ARRAY: - count = putArray(m, count, o, col, field.name()); - break; - case UNION: // default value of null acts like union with null - Schema effectiveSchema = field.schema().getTypes() - .get(firstNotNullSchemaTypeIndex(field.schema())); - // map and array need to compute qualifier - if (effectiveSchema.getType() == Type.ARRAY) { - count = putArray(m, count, o, col, field.name()); - break; - } - else if (effectiveSchema.getType() == Type.MAP) { - count = putMap(m, count, effectiveSchema.getValueType(), o, col, field.name()); - break; - } - // continue like a regular top-level union - case RECORD: - final SpecificDatumWriter<Object> writer = new SpecificDatumWriter<>(field.schema()); - final byte[] byteData = IOUtils.serialize(writer,o); - m.put(col.getFirst(), col.getSecond(), new Value(byteData)); - count++; - break; - default: - m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o))); - count++; - } - - } - - if (count > 0) - try { - getBatchWriter().addMutation(m); - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } - - private int putMap(Mutation m, int count, Schema valueType, Object o, Pair<Text, Text> col, String fieldName) throws GoraException { - - // First of all we delete map field on accumulo store - Text rowKey = new Text(m.getRow()); - Query<K, T> query = newQuery(); - query.setFields(fieldName); - query.setStartKey((K)rowKey.toString()); - query.setEndKey((K)rowKey.toString()); - deleteByQuery(query); - flush(); - if (o == null){ - return 0; - } - - Set<?> es = ((Map<?, ?>)o).entrySet(); - for (Object entry : es) { - Object mapKey = ((Entry<?, ?>) entry).getKey(); - Object mapVal = ((Entry<?, ?>) entry).getValue(); - if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?, ?>)o).isDirty()) - || !(o instanceof DirtyMapWrapper)) { - m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(valueType, mapVal))); - count++; - } - // TODO map value deletion - } - return count; - } - - private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, String fieldName) { - - // First of all we delete array field on accumulo store - Text rowKey = new Text(m.getRow()); - Query<K, T> query = newQuery(); - query.setFields(fieldName); - query.setStartKey((K)rowKey.toString()); - query.setEndKey((K)rowKey.toString()); - deleteByQuery(query); - flush(); - if (o == null){ - return 0; - } - - List<?> array = (List<?>) o; // both GenericArray and DirtyListWrapper - int j = 0; - for (Object item : array) { - m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item))); - count++; - } - return count; - } - - @Override - public boolean delete(K key) { - Query<K,T> q = newQuery(); - q.setKey(key); - return deleteByQuery(q) > 0; - } - - @Override - public long deleteByQuery(Query<K,T> query) { - try { - Scanner scanner = createScanner(query); - // add iterator that drops values on the server side - scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class)); - RowIterator iterator = new RowIterator(scanner.iterator()); - - long count = 0; - - while (iterator.hasNext()) { - Iterator<Entry<Key,Value>> row = iterator.next(); - Mutation m = null; - while (row.hasNext()) { - Entry<Key,Value> entry = row.next(); - Key key = entry.getKey(); - if (m == null) - m = new Mutation(key.getRow()); - // TODO optimize to avoid continually creating column vis? prob does not matter for empty - m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new ColumnVisibility(key.getColumnVisibility()), key.getTimestamp()); - } - getBatchWriter().addMutation(m); - count++; - } - - return count; - } catch (TableNotFoundException e) { - // TODO return 0? - LOG.error(e.getMessage(), e); - return 0; - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - return 0; - } catch (IOException e){ - LOG.error(e.getMessage(), e); - return 0; - } - } - - private Range createRange(Query<K,T> query) { - Text startRow = null; - Text endRow = null; - - if (query.getStartKey() != null) - startRow = new Text(toBytes(query.getStartKey())); - - if (query.getEndKey() != null) - endRow = new Text(toBytes(query.getEndKey())); - - return new Range(startRow, true, endRow, true); - - } - - private Scanner createScanner(Query<K,T> query) throws TableNotFoundException { - // TODO make isolated scanner optional? - Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY)); - setFetchColumns(scanner, query.getFields()); - - scanner.setRange(createRange(query)); - - if (query.getStartTime() != -1 || query.getEndTime() != -1) { - IteratorSetting is = new IteratorSetting(30, TimestampFilter.class); - if (query.getStartTime() != -1) - TimestampFilter.setStart(is, query.getStartTime(), true); - if (query.getEndTime() != -1) - TimestampFilter.setEnd(is, query.getEndTime(), true); - - scanner.addScanIterator(is); - } - - return scanner; - } - - /** - * Execute the query and return the result. - */ - @Override - public Result<K,T> execute(Query<K,T> query) { - try { - Scanner scanner = createScanner(query); - return new AccumuloResult<>(this, query, scanner); - } catch (TableNotFoundException e) { - // TODO return empty result? - LOG.error(e.getMessage(), e); - return null; - } - } - - @Override - public Query<K,T> newQuery() { - return new AccumuloQuery<>(this); - } - - Text pad(Text key, int bytes) { - if (key.getLength() < bytes) - key = new Text(key); - - while (key.getLength() < bytes) { - key.append(new byte[] {0}, 0, 1); - } - - return key; - } - - @Override - public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws IOException { - try { - TabletLocator tl; - if (conn instanceof MockConnector) - tl = new MockTabletLocator(); - else - tl = TabletLocator.getLocator(conn.getInstance(), new Text(Tables.getTableId(conn.getInstance(), mapping.tableName))); - - Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); - - tl.invalidateCache(); - while (tl.binRanges(credentials, Collections.singletonList(createRange(query)), binnedRanges).size() > 0) { - // TODO log? - if (!Tables.exists(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName))) - throw new TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName)); - else if (Tables.getTableState(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE) - throw new TableOfflineException(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)); - UtilWaitThread.sleep(100); - tl.invalidateCache(); - } - - List<PartitionQuery<K,T>> ret = new ArrayList<>(); - - Text startRow = null; - Text endRow = null; - if (query.getStartKey() != null) - startRow = new Text(toBytes(query.getStartKey())); - if (query.getEndKey() != null) - endRow = new Text(toBytes(query.getEndKey())); - - //hadoop expects hostnames, accumulo keeps track of IPs... so need to convert - HashMap<String,String> hostNameCache = new HashMap<>(); - - for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) { - String ip = entry.getKey().split(":", 2)[0]; - String location = hostNameCache.get(ip); - if (location == null) { - InetAddress inetAddress = InetAddress.getByName(ip); - location = inetAddress.getHostName(); - hostNameCache.put(ip, location); - } - - Map<KeyExtent,List<Range>> tablets = entry.getValue(); - for (KeyExtent ke : tablets.keySet()) { - - K startKey = null; - if (startRow == null || !ke.contains(startRow)) { - if (ke.getPrevEndRow() != null) { - startKey = followingKey(encoder, getKeyClass(), getBytes(ke.getPrevEndRow())); - } - } else { - startKey = fromBytes(getKeyClass(), getBytes(startRow)); - } - - K endKey = null; - if (endRow == null || !ke.contains(endRow)) { - if (ke.getEndRow() != null) - endKey = lastPossibleKey(encoder, getKeyClass(), getBytes(ke.getEndRow())); - } else { - endKey = fromBytes(getKeyClass(), getBytes(endRow)); - } - - PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query, startKey, endKey, location); - pqi.setConf(getConf()); - ret.add(pqi); - } - } - - return ret; - } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { - throw new IOException(e); - } - - } - - static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) { - - if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(2, er)); - } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er)); - } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er)); - } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er)); - } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er)); - } else if (clazz.equals(String.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Utf8.class)) { - return fromBytes(encoder, clazz, er); - } - - throw new IllegalArgumentException(UNKOWN + clazz.getName()); - } - - @SuppressWarnings("unchecked") - static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) { - - if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { - return (K) Byte.valueOf(encoder.followingKey(1, per)[0]); - } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(2, per)); - } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(4, per)); - } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(8, per)); - } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(4, per)); - } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(8, per)); - } else if (clazz.equals(String.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Utf8.class)) { - return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1)); - } - - throw new IllegalArgumentException(UNKOWN + clazz.getName()); - } - - @Override - public void flush() { - try { - if (batchWriter != null) { - batchWriter.flush(); - } - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - } - } - - @Override - public void close() { - try { - if (batchWriter != null) { - batchWriter.close(); - batchWriter = null; - } - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - } - } +public class AccumuloStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { + + protected static final String MOCK_PROPERTY = "accumulo.mock"; + protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance"; + protected static final String ZOOKEEPERS_NAME_PROPERTY = "accumulo.zookeepers"; + protected static final String USERNAME_PROPERTY = "accumulo.user"; + protected static final String PASSWORD_PROPERTY = "accumulo.password"; + protected static final String DEFAULT_MAPPING_FILE = "gora-accumulo-mapping.xml"; + + private final static String UNKOWN = "Unknown type "; + + private Connector conn; + private BatchWriter batchWriter; + private AccumuloMapping mapping; + private Credentials credentials; + private Encoder encoder; + + public static final Logger LOG = LoggerFactory.getLogger(AccumuloStore.class); + + public Object fromBytes(Schema schema, byte[] data) throws IOException { + Schema fromSchema = null; + if (schema.getType() == Type.UNION) { + try { + Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); + int unionIndex = decoder.readIndex(); + List<Schema> possibleTypes = schema.getTypes(); + fromSchema = possibleTypes.get(unionIndex); + Schema effectiveSchema = possibleTypes.get(unionIndex); + if (effectiveSchema.getType() == Type.NULL) { + decoder.readNull(); + return null; + } else { + data = decoder.readBytes(null).array(); + } + } catch (IOException e) { + LOG.error(e.getMessage()); + throw new GoraException("Error decoding union type: ", e); + } + } else { + fromSchema = schema; + } + return fromBytes(encoder, fromSchema, data); + } + + public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) throws IOException { + switch (schema.getType()) { + case BOOLEAN: + return encoder.decodeBoolean(data); + case DOUBLE: + return encoder.decodeDouble(data); + case FLOAT: + return encoder.decodeFloat(data); + case INT: + return encoder.decodeInt(data); + case LONG: + return encoder.decodeLong(data); + case STRING: + return new Utf8(data); + case BYTES: + return ByteBuffer.wrap(data); + case ENUM: + return AvroUtils.getEnumValue(schema, encoder.decodeInt(data)); + case ARRAY: + break; + case FIXED: + break; + case MAP: + break; + case NULL: + break; + case RECORD: + break; + case UNION: + break; + default: + break; + } + throw new IllegalArgumentException(UNKOWN + schema.getType()); + + } + + private static byte[] getBytes(Text text) { + byte[] bytes = text.getBytes(); + if (bytes.length != text.getLength()) { + bytes = new byte[text.getLength()]; + System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length); + } + return bytes; + } + + public K fromBytes(Class<K> clazz, byte[] val) { + return fromBytes(encoder, clazz, val); + } + + @SuppressWarnings("unchecked") + public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) { + try { + if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { + return (K) Byte.valueOf(encoder.decodeByte(val)); + } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { + return (K) Boolean.valueOf(encoder.decodeBoolean(val)); + } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { + return (K) Short.valueOf(encoder.decodeShort(val)); + } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { + return (K) Integer.valueOf(encoder.decodeInt(val)); + } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { + return (K) Long.valueOf(encoder.decodeLong(val)); + } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { + return (K) Float.valueOf(encoder.decodeFloat(val)); + } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { + return (K) Double.valueOf(encoder.decodeDouble(val)); + } else if (clazz.equals(String.class)) { + return (K) new String(val, "UTF-8"); + } else if (clazz.equals(Utf8.class)) { + return (K) new Utf8(val); + } + + throw new IllegalArgumentException(UNKOWN + clazz.getName()); + } catch (IOException ioe) { + LOG.error(ioe.getMessage()); + throw new RuntimeException(ioe); + } + } + + private static byte[] copyIfNeeded(byte b[], int offset, int len) { + if (len != b.length || offset != 0) { + byte[] copy = new byte[len]; + System.arraycopy(b, offset, copy, 0, copy.length); + b = copy; + } + return b; + } + + public byte[] toBytes(Schema toSchema, Object o) { + if (toSchema != null && toSchema.getType() == Type.UNION) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + org.apache.avro.io.BinaryEncoder avroEncoder = EncoderFactory.get().binaryEncoder(baos, null); + int unionIndex = 0; + try { + if (o == null) { + unionIndex = firstNullSchemaTypeIndex(toSchema); + avroEncoder.writeIndex(unionIndex); + avroEncoder.writeNull(); + } else { + unionIndex = firstNotNullSchemaTypeIndex(toSchema); + avroEncoder.writeIndex(unionIndex); + avroEncoder.writeBytes(toBytes(o)); + } + avroEncoder.flush(); + return baos.toByteArray(); + } catch (IOException e) { + LOG.error(e.getMessage()); + return toBytes(o); + } + } else { + return toBytes(o); + } + } + + private int firstNullSchemaTypeIndex(Schema toSchema) { + List<Schema> possibleTypes = toSchema.getTypes(); + int unionIndex = 0; + for (int i = 0; i < possibleTypes.size(); i++) { + Type pType = possibleTypes.get(i).getType(); + if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests + unionIndex = i; + break; + } + } + return unionIndex; + } + + private int firstNotNullSchemaTypeIndex(Schema toSchema) { + List<Schema> possibleTypes = toSchema.getTypes(); + int unionIndex = 0; + for (int i = 0; i < possibleTypes.size(); i++) { + Type pType = possibleTypes.get(i).getType(); + if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests + unionIndex = i; + break; + } + } + return unionIndex; + } + + public byte[] toBytes(Object o) { + return toBytes(encoder, o); + } + + public static byte[] toBytes(Encoder encoder, Object o) { + + try { + if (o instanceof String) { + return ((String) o).getBytes("UTF-8"); + } else if (o instanceof Utf8) { + return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getByteLength()); + } else if (o instanceof ByteBuffer) { + return copyIfNeeded(((ByteBuffer) o).array(), + ((ByteBuffer) o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining()); + } else if (o instanceof Long) { + return encoder.encodeLong((Long) o); + } else if (o instanceof Integer) { + return encoder.encodeInt((Integer) o); + } else if (o instanceof Short) { + return encoder.encodeShort((Short) o); + } else if (o instanceof Byte) { + return encoder.encodeByte((Byte) o); + } else if (o instanceof Boolean) { + return encoder.encodeBoolean((Boolean) o); + } else if (o instanceof Float) { + return encoder.encodeFloat((Float) o); + } else if (o instanceof Double) { + return encoder.encodeDouble((Double) o); + } else if (o instanceof Enum) { + return encoder.encodeInt(((Enum<?>) o).ordinal()); + } + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + + throw new IllegalArgumentException(UNKOWN + o.getClass().getName()); + } + + private BatchWriter getBatchWriter() throws IOException { + if (batchWriter == null) + try { + BatchWriterConfig batchWriterConfig = new BatchWriterConfig(); + batchWriterConfig.setMaxMemory(10000000); + batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS); + batchWriterConfig.setMaxWriteThreads(4); + batchWriter = conn.createBatchWriter(mapping.tableName, batchWriterConfig); + } catch (TableNotFoundException e) { + throw new IOException(e); + } + return batchWriter; + } + + /** + * Initialize the data store by reading the credentials, setting the + * client's properties up and reading the mapping file. Initialize is called + * when then the call to + * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made. + * + * @param keyClass + * @param persistentClass + * @param properties + */ + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + try { + super.initialize(keyClass, persistentClass, properties); + + String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null); + String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); + String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null); + String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null); + + mapping = readMapping(mappingFile); + + if (mapping.encoder == null || "".equals(mapping.encoder)) { + encoder = new BinaryEncoder(); + } else { + try { + encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new IOException(e); + } + } + + try { + AuthenticationToken token = new PasswordToken(password); + if (mock == null || !mock.equals("true")) { + String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null); + String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null); + conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, token); + } else { + conn = new MockInstance().getConnector(user, token); + } + credentials = new Credentials(user, token); + + if (autoCreateSchema && !schemaExists()) + createSchema(); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IOException(e); + } + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + } + + protected AccumuloMapping readMapping(String filename) throws IOException { + try { + + AccumuloMapping mapping = new AccumuloMapping(); + + DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder(); + Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename)); + + Element root = dom.getDocumentElement(); + + NodeList nl = root.getElementsByTagName("class"); + for (int i = 0; i < nl.getLength(); i++) { + + Element classElement = (Element) nl.item(i); + if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName()) + && classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) { + + mapping.tableName = getSchemaName(classElement.getAttribute("table"), persistentClass); + mapping.encoder = classElement.getAttribute("encoder"); + + NodeList fields = classElement.getElementsByTagName("field"); + for (int j = 0; j < fields.getLength(); j++) { + Element fieldElement = (Element) fields.item(j); + + String name = fieldElement.getAttribute("name"); + String family = fieldElement.getAttribute("family"); + String qualifier = fieldElement.getAttribute("qualifier"); + if ("".equals(qualifier)) + qualifier = null; + + Pair<Text, Text> col = new Pair<>(new Text(family), + qualifier == null ? null : new Text(qualifier)); + mapping.fieldMap.put(name, col); + mapping.columnMap.put(col, name); + } + } + + } + + if (mapping.tableName == null) { + throw new GoraException("Please define the accumulo 'table' name mapping in " + filename + " for " + + persistentClass.getCanonicalName()); + } + + nl = root.getElementsByTagName("table"); + for (int i = 0; i < nl.getLength(); i++) { + Element tableElement = (Element) nl.item(i); + if (tableElement.getAttribute("name").equals(mapping.tableName)) { + NodeList configs = tableElement.getElementsByTagName("config"); + for (int j = 0; j < configs.getLength(); j++) { + Element configElement = (Element) configs.item(j); + String key = configElement.getAttribute("key"); + String val = configElement.getAttribute("value"); + mapping.tableConfig.put(key, val); + } + } + } + + return mapping; + } catch (Exception ex) { + throw new IOException("Unable to read " + filename, ex); + } + + } + + @Override + public String getSchemaName() { + return mapping.tableName; + } + + @Override + public void createSchema() { + try { + conn.tableOperations().create(mapping.tableName); + Set<Entry<String, String>> es = mapping.tableConfig.entrySet(); + for (Entry<String, String> entry : es) { + conn.tableOperations().setProperty(mapping.tableName, entry.getKey(), entry.getValue()); + } + + } catch (AccumuloException | AccumuloSecurityException e) { + LOG.error(e.getMessage(), e); + } catch (TableExistsException e) { + LOG.debug(e.getMessage(), e); + } + } + + @Override + public void deleteSchema() { + try { + if (batchWriter != null) + batchWriter.close(); + batchWriter = null; + conn.tableOperations().delete(mapping.tableName); + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + LOG.error(e.getMessage(), e); + } + } + + @Override + public boolean schemaExists() { + return conn.tableOperations().exists(mapping.tableName); + } + + public ByteSequence populate(Iterator<Entry<Key, Value>> iter, T persistent) throws IOException { + ByteSequence row = null; + + Map<Utf8, Object> currentMap = null; + List currentArray = null; + Text currentFam = null; + int currentPos = 0; + Schema currentSchema = null; + Field currentField = null; + + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], null); + + while (iter.hasNext()) { + Entry<Key, Value> entry = iter.next(); + + if (row == null) { + row = entry.getKey().getRowData(); + } + byte[] val = entry.getValue().get(); + + Field field = fieldMap.get(getFieldName(entry)); + + if (currentMap != null) { + if (currentFam.equals(entry.getKey().getColumnFamily())) { + currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), + fromBytes(currentSchema, entry.getValue().get())); + continue; + } else { + persistent.put(currentPos, currentMap); + currentMap = null; + } + } else if (currentArray != null) { + if (currentFam.equals(entry.getKey().getColumnFamily())) { + currentArray.add(fromBytes(currentSchema, entry.getValue().get())); + continue; + } else { + persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray)); + currentArray = null; + } + } + + switch (field.schema().getType()) { + case MAP: // first entry only. Next are handled above on the next + // loop + currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>()); + currentPos = field.pos(); + currentFam = entry.getKey().getColumnFamily(); + currentSchema = field.schema().getValueType(); + + currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), + fromBytes(currentSchema, entry.getValue().get())); + break; + case ARRAY: + currentArray = new DirtyListWrapper<>(new ArrayList<>()); + currentPos = field.pos(); + currentFam = entry.getKey().getColumnFamily(); + currentSchema = field.schema().getElementType(); + currentField = field; + + currentArray.add(fromBytes(currentSchema, entry.getValue().get())); + + break; + case UNION:// default value of null acts like union with null + Schema effectiveSchema = field.schema().getTypes().get(firstNotNullSchemaTypeIndex(field.schema())); + // map and array were coded without union index so need to be + // read the same way + if (effectiveSchema.getType() == Type.ARRAY) { + currentArray = new DirtyListWrapper<>(new ArrayList<>()); + currentPos = field.pos(); + currentFam = entry.getKey().getColumnFamily(); + currentSchema = field.schema().getElementType(); + currentField = field; + + currentArray.add(fromBytes(currentSchema, entry.getValue().get())); + break; + } else if (effectiveSchema.getType() == Type.MAP) { + currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>()); + currentPos = field.pos(); + currentFam = entry.getKey().getColumnFamily(); + currentSchema = effectiveSchema.getValueType(); + + currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), + fromBytes(currentSchema, entry.getValue().get())); + break; + } + // continue like a regular top-level union + case RECORD: + SpecificDatumReader<?> reader = new SpecificDatumReader<Schema>(field.schema()); + persistent.put(field.pos(), reader.read(null, DecoderFactory.get().binaryDecoder(val, decoder))); + break; + default: + persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get())); + } + } + + if (currentMap != null) { + persistent.put(currentPos, currentMap); + } else if (currentArray != null) { + persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray)); + } + + persistent.clearDirty(); + + return row; + } + + /** + * Retrieve field name from entry. + * + * @param entry + * The Key-Value entry + * @return String The field name + */ + private String getFieldName(Entry<Key, Value> entry) { + String fieldName = mapping.columnMap + .get(new Pair<>(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier())); + if (fieldName == null) { + fieldName = mapping.columnMap.get(new Pair<Text, Text>(entry.getKey().getColumnFamily(), null)); + } + return fieldName; + } + + private void setFetchColumns(Scanner scanner, String[] fields) { + fields = getFieldsToQuery(fields); + for (String field : fields) { + Pair<Text, Text> col = mapping.fieldMap.get(field); + if (col != null) { + if (col.getSecond() == null) { + scanner.fetchColumnFamily(col.getFirst()); + } else { + scanner.fetchColumn(col.getFirst(), col.getSecond()); + } + } else { + LOG.error("Mapping not found for field: {}", field); + } + } + } + + @Override + public T get(K key, String[] fields) { + try { + // TODO make isolated scanner optional? + Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY)); + Range rowRange = new Range(new Text(toBytes(key))); + + scanner.setRange(rowRange); + setFetchColumns(scanner, fields); + + T persistent = newPersistent(); + ByteSequence row = populate(scanner.iterator(), persistent); + if (row == null) + return null; + return persistent; + } catch (TableNotFoundException e) { + LOG.error(e.getMessage(), e); + return null; + } catch (IOException e) { + LOG.error(e.getMessage(), e); + return null; + } + } + + @Override + public void put(K key, T val) { + + try { + Mutation m = new Mutation(new Text(toBytes(key))); + + Schema schema = val.getSchema(); + List<Field> fields = schema.getFields(); + int count = 0; + + for (int i = 0; i < fields.size(); i++) { + if (!val.isDirty(i)) { + continue; + } + Field field = fields.get(i); + + Object o = val.get(field.pos()); + + Pair<Text, Text> col = mapping.fieldMap.get(field.name()); + + if (col == null) { + throw new GoraException("Please define the gora to accumulo mapping for field " + field.name()); + } + + switch (field.schema().getType()) { + case MAP: + count = putMap(m, count, field.schema().getValueType(), o, col, field.name()); + break; + case ARRAY: + count = putArray(m, count, o, col, field.name()); + break; + case UNION: // default value of null acts like union with null + Schema effectiveSchema = field.schema().getTypes().get(firstNotNullSchemaTypeIndex(field.schema())); + // map and array need to compute qualifier + if (effectiveSchema.getType() == Type.ARRAY) { + count = putArray(m, count, o, col, field.name()); + break; + } else if (effectiveSchema.getType() == Type.MAP) { + count = putMap(m, count, effectiveSchema.getValueType(), o, col, field.name()); + break; + } + // continue like a regular top-level union + case RECORD: + final SpecificDatumWriter<Object> writer = new SpecificDatumWriter<>(field.schema()); + final byte[] byteData = IOUtils.serialize(writer, o); + m.put(col.getFirst(), col.getSecond(), new Value(byteData)); + count++; + break; + default: + m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o))); + count++; + } + + } + + if (count > 0) + try { + getBatchWriter().addMutation(m); + } catch (MutationsRejectedException e) { + LOG.error(e.getMessage(), e); + } + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + } + + private int putMap(Mutation m, int count, Schema valueType, Object o, Pair<Text, Text> col, String fieldName) + throws GoraException { + + // First of all we delete map field on accumulo store + Text rowKey = new Text(m.getRow()); + Query<K, T> query = newQuery(); + query.setFields(fieldName); + query.setStartKey((K) rowKey.toString()); + query.setEndKey((K) rowKey.toString()); + deleteByQuery(query); + flush(); + if (o == null) { + return 0; + } + + Set<?> es = ((Map<?, ?>) o).entrySet(); + for (Object entry : es) { + Object mapKey = ((Entry<?, ?>) entry).getKey(); + Object mapVal = ((Entry<?, ?>) entry).getValue(); + if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?, ?>) o).isDirty()) + || !(o instanceof DirtyMapWrapper)) { + m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(valueType, mapVal))); + count++; + } + // TODO map value deletion + } + return count; + } + + private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, String fieldName) { + + // First of all we delete array field on accumulo store + Text rowKey = new Text(m.getRow()); + Query<K, T> query = newQuery(); + query.setFields(fieldName); + query.setStartKey((K) rowKey.toString()); + query.setEndKey((K) rowKey.toString()); + deleteByQuery(query); + flush(); + if (o == null) { + return 0; + } + + List<?> array = (List<?>) o; // both GenericArray and DirtyListWrapper + int j = 0; + for (Object item : array) { + m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item))); + count++; + } + return count; + } + + @Override + public boolean delete(K key) { + Query<K, T> q = newQuery(); + q.setKey(key); + return deleteByQuery(q) > 0; + } + + @Override + public long deleteByQuery(Query<K, T> query) { + try { + Scanner scanner = createScanner(query); + // add iterator that drops values on the server side + scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class)); + RowIterator iterator = new RowIterator(scanner.iterator()); + + long count = 0; + + while (iterator.hasNext()) { + Iterator<Entry<Key, Value>> row = iterator.next(); + Mutation m = null; + while (row.hasNext()) { + Entry<Key, Value> entry = row.next(); + Key key = entry.getKey(); + if (m == null) + m = new Mutation(key.getRow()); + // TODO optimize to avoid continually creating column vis? + // prob does not matter for empty + m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), + new ColumnVisibility(key.getColumnVisibility()), key.getTimestamp()); + } + getBatchWriter().addMutation(m); + count++; + } + + return count; + } catch (TableNotFoundException e) { + // TODO return 0? + LOG.error(e.getMessage(), e); + return 0; + } catch (MutationsRejectedException e) { + LOG.error(e.getMessage(), e); + return 0; + } catch (IOException e) { + LOG.error(e.getMessage(), e); + return 0; + } + } + + private Range createRange(Query<K, T> query) { + Text startRow = null; + Text endRow = null; + + if (query.getStartKey() != null) + startRow = new Text(toBytes(query.getStartKey())); + + if (query.getEndKey() != null) + endRow = new Text(toBytes(query.getEndKey())); + + return new Range(startRow, true, endRow, true); + + } + + private Scanner createScanner(Query<K, T> query) throws TableNotFoundException { + // TODO make isolated scanner optional? + Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY)); + setFetchColumns(scanner, query.getFields()); + + scanner.setRange(createRange(query)); + + if (query.getStartTime() != -1 || query.getEndTime() != -1) { + IteratorSetting is = new IteratorSetting(30, TimestampFilter.class); + if (query.getStartTime() != -1) + TimestampFilter.setStart(is, query.getStartTime(), true); + if (query.getEndTime() != -1) + TimestampFilter.setEnd(is, query.getEndTime(), true); + + scanner.addScanIterator(is); + } + + return scanner; + } + + /** + * Execute the query and return the result. + */ + @Override + public Result<K, T> execute(Query<K, T> query) { + try { + Scanner scanner = createScanner(query); + return new AccumuloResult<>(this, query, scanner); + } catch (TableNotFoundException e) { + // TODO return empty result? + LOG.error(e.getMessage(), e); + return null; + } + } + + @Override + public Query<K, T> newQuery() { + return new AccumuloQuery<>(this); + } + + Text pad(Text key, int bytes) { + if (key.getLength() < bytes) + key = new Text(key); + + while (key.getLength() < bytes) { + key.append(new byte[] { 0 }, 0, 1); + } + + return key; + } + + @Override + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { + try { + TabletLocator tl; + if (conn instanceof MockConnector) + tl = new MockTabletLocator(); + else + tl = TabletLocator.getLocator( + new ClientContext(conn.getInstance(), credentials, + AccumuloConfiguration.getTableConfiguration(conn, + Tables.getTableId(conn.getInstance(), mapping.tableName))), + new Text(Tables.getTableId(conn.getInstance(), mapping.tableName))); + + Map<String, Map<KeyExtent, List<Range>>> binnedRanges = new HashMap<>(); + + tl.invalidateCache(); + while (tl.binRanges( + new ClientContext(conn.getInstance(), credentials, + AccumuloConfiguration.getTableConfiguration(conn, + Tables.getTableId(conn.getInstance(), mapping.tableName))), + Collections.singletonList(createRange(query)), binnedRanges).size() > 0) { + // TODO log? + if (!Tables.exists(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName))) + throw new TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName)); + else if (Tables.getTableState(conn.getInstance(), + Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE) + throw new TableOfflineException(conn.getInstance(), + Tables.getTableId(conn.getInstance(), mapping.tableName)); + UtilWaitThread.sleep(100); + tl.invalidateCache(); + } + + List<PartitionQuery<K, T>> ret = new ArrayList<>(); + + Text startRow = null; + Text endRow = null; + if (query.getStartKey() != null) + startRow = new Text(toBytes(query.getStartKey())); + if (query.getEndKey() != null) + endRow = new Text(toBytes(query.getEndKey())); + + // hadoop expects hostnames, accumulo keeps track of IPs... so need + // to convert + HashMap<String, String> hostNameCache = new HashMap<>(); + + for (Entry<String, Map<KeyExtent, List<Range>>> entry : binnedRanges.entrySet()) { + String ip = entry.getKey().split(":", 2)[0]; + String location = hostNameCache.get(ip); + if (location == null) { + InetAddress inetAddress = InetAddress.getByName(ip); + location = inetAddress.getHostName(); + hostNameCache.put(ip, location); + } + + Map<KeyExtent, List<Range>> tablets = entry.getValue(); + for (KeyExtent ke : tablets.keySet()) { + + K startKey = null; + if (startRow == null || !ke.contains(startRow)) { + if (ke.getPrevEndRow() != null) { + startKey = followingKey(encoder, getKeyClass(), getBytes(ke.getPrevEndRow())); + } + } else { + startKey = fromBytes(getKeyClass(), getBytes(startRow)); + } + + K endKey = null; + if (endRow == null || !ke.contains(endRow)) { + if (ke.getEndRow() != null) + endKey = lastPossibleKey(encoder, getKeyClass(), getBytes(ke.getEndRow())); + } else { + endKey = fromBytes(getKeyClass(), getBytes(endRow)); + } + + PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query, startKey, endKey, location); + pqi.setConf(getConf()); + ret.add(pqi); + } + } + + return ret; + } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { + throw new IOException(e); + } + + } + + static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) { + + if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { + throw new UnsupportedOperationException(); + } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { + throw new UnsupportedOperationException(); + } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { + return fromBytes(encoder, clazz, encoder.lastPossibleKey(2, er)); + } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { + return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er)); + } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { + return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er)); + } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { + return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er)); + } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { + return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er)); + } else if (clazz.equals(String.class)) { + throw new UnsupportedOperationException(); + } else if (clazz.equals(Utf8.class)) { + return fromBytes(encoder, clazz, er); + } + + throw new IllegalArgumentException(UNKOWN + clazz.getName()); + } + + @SuppressWarnings("unchecked") + static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) { + + if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { + return (K) Byte.valueOf(encoder.followingKey(1, per)[0]); + } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { + throw new UnsupportedOperationException(); + } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { + return fromBytes(encoder, clazz, encoder.followingKey(2, per)); + } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { + return fromBytes(encoder, clazz, encoder.followingKey(4, per)); + } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { + return fromBytes(encoder, clazz, encoder.followingKey(8, per)); + } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { + return fromBytes(encoder, clazz, encoder.followingKey(4, per)); + } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { + return fromBytes(encoder, clazz, encoder.followingKey(8, per)); + } else if (clazz.equals(String.class)) { + throw new UnsupportedOperationException(); + } else if (clazz.equals(Utf8.class)) { + return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1)); + } + + throw new IllegalArgumentException(UNKOWN + clazz.getName()); + } + + @Override + public void flush() { + try { + if (batchWriter != null) { + batchWriter.flush(); + } + } catch (MutationsRejectedException e) { + LOG.error(e.getMessage(), e); + } + } + + @Override + public void close() { + try { + if (batchWriter != null) { + batchWriter.close(); + batchWriter = null; + } + } catch (MutationsRejectedException e) { + LOG.error(e.getMessage(), e); + } + } }