http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java index d16fe3a..bb9889e 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MapSerDeser.java @@ -24,145 +24,148 @@ import java.util.TreeMap; /** * Serialization/deserialization for map type - * */ @SuppressWarnings("rawtypes") public class MapSerDeser implements EntitySerDeser<Map> { - @SuppressWarnings({ "unchecked" }) - @Override - public Map deserialize(byte[] bytes) { - if (bytes == null || bytes.length == 0) { - return null; - } - final Map map = new TreeMap(); - int offset = 0; - // get size of int array - final int size = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - - for (int i = 0; i < size; ++i) { - final int keyID = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - final Class<?> keyClass = EntityDefinitionManager.getClassByID(keyID); - if (keyClass == null) { - throw new IllegalArgumentException("Unsupported key type ID: " + keyID); - } - final EntitySerDeser keySerDer = EntityDefinitionManager.getSerDeser(keyClass); - final int keyLength = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - final byte[] keyContent = new byte[keyLength]; - System.arraycopy(bytes, offset, keyContent, 0, keyLength); - offset += keyLength; - final Object key = keySerDer.deserialize(keyContent); - - final int valueID = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - final Class<?> valueClass = EntityDefinitionManager.getClassByID(valueID); - if (valueClass == null) { - throw new IllegalArgumentException("Unsupported value type ID: " + valueID); - } - final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass); - final int valueLength = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - final byte[] valueContent = new byte[valueLength]; - System.arraycopy(bytes, offset, valueContent, 0, valueLength); - offset += valueLength; - final Object value = valueSerDer.deserialize(valueContent); - - map.put(key, value); - } - return map; - } + @SuppressWarnings({ + "unchecked" + }) + @Override + public Map deserialize(byte[] bytes) { + if (bytes == null || bytes.length == 0) { + return null; + } + final Map map = new TreeMap(); + int offset = 0; + // get size of int array + final int size = ByteUtil.bytesToInt(bytes, offset); + offset += 4; - /** - * size + key1 type ID + key1 length + key1 binary content + value1 type id + value length + value1 binary content + ... - * 4B 4B 4B key1 bytes 4B 4B value1 bytes - */ - @SuppressWarnings({ "unchecked" }) - @Override - public byte[] serialize(Map map) { - if(map == null) - return null; - final int size = map.size(); - final int[] keyIDs = new int[size]; - final int[] valueIDs = new int[size]; - final byte[][] keyBytes = new byte[size][]; - final byte[][] valueBytes = new byte[size][]; - - int totalSize = 4 + size * 16; - int i = 0; - Iterator iter = map.entrySet().iterator(); - while (iter.hasNext()) { - final Map.Entry entry = (Map.Entry)iter.next(); - final Object key = entry.getKey(); - final Object value = entry.getValue(); - Class<?> keyClass = key.getClass(); - Class<?> valueClass = NullObject.class; - if (value != null) { - valueClass = value.getClass(); - } - int keyTypeID = EntityDefinitionManager.getIDBySerDerClass(keyClass); - int valueTypeID = 0; // default null object - if (valueClass != null) { - valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass); - } - if (keyTypeID == -1) { - if (key instanceof Map) { - keyClass = Map.class; - keyTypeID = EntityDefinitionManager.getIDBySerDerClass(keyClass); - } else { - throw new IllegalArgumentException("Unsupported class: " + keyClass.getName()); - } - } - if (valueTypeID == -1) { - if (value instanceof Map) { - valueClass = Map.class; - valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass); - } else { - throw new IllegalArgumentException("Unsupported class: " + valueClass.getName()); - } - } - keyIDs[i] = keyTypeID; - valueIDs[i] = valueTypeID; - final EntitySerDeser keySerDer = EntityDefinitionManager.getSerDeser(keyClass); - final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass); - if (keySerDer == null) { - throw new IllegalArgumentException("Unsupported class: " + keyClass.getName()); - } - if (valueSerDer == null) { - throw new IllegalArgumentException("Unsupported class: " + valueClass.getName()); - } - keyBytes[i] = keySerDer.serialize(key); - valueBytes[i] = valueSerDer.serialize(value); - totalSize += keyBytes[i].length + valueBytes[i].length; - ++i; - } - final byte[] result = new byte[totalSize]; - int offset = 0; - ByteUtil.intToBytes(size, result, offset); - offset += 4; - for (i = 0; i < size; ++i) { - ByteUtil.intToBytes(keyIDs[i], result, offset); - offset += 4; - ByteUtil.intToBytes(keyBytes[i].length, result, offset); - offset += 4; - System.arraycopy(keyBytes[i], 0, result, offset, keyBytes[i].length); - offset += keyBytes[i].length; - - ByteUtil.intToBytes(valueIDs[i], result, offset); - offset += 4; - ByteUtil.intToBytes(valueBytes[i].length, result, offset); - offset += 4; - System.arraycopy(valueBytes[i], 0, result, offset, valueBytes[i].length); - offset += valueBytes[i].length; - } - return result; - } + for (int i = 0; i < size; ++i) { + final int keyID = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + final Class<?> keyClass = EntityDefinitionManager.getClassByID(keyID); + if (keyClass == null) { + throw new IllegalArgumentException("Unsupported key type ID: " + keyID); + } + final EntitySerDeser keySerDer = EntityDefinitionManager.getSerDeser(keyClass); + final int keyLength = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + final byte[] keyContent = new byte[keyLength]; + System.arraycopy(bytes, offset, keyContent, 0, keyLength); + offset += keyLength; + final Object key = keySerDer.deserialize(keyContent); - @Override - public Class<Map> type() { - return Map.class; - } -} + final int valueID = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + final Class<?> valueClass = EntityDefinitionManager.getClassByID(valueID); + if (valueClass == null) { + throw new IllegalArgumentException("Unsupported value type ID: " + valueID); + } + final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass); + final int valueLength = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + final byte[] valueContent = new byte[valueLength]; + System.arraycopy(bytes, offset, valueContent, 0, valueLength); + offset += valueLength; + final Object value = valueSerDer.deserialize(valueContent); + + map.put(key, value); + } + return map; + } + + /** + * size + key1 type ID + key1 length + key1 binary content + value1 type id + value length + value1 binary + * content + ... 4B 4B 4B key1 bytes 4B 4B value1 bytes + */ + @SuppressWarnings({ + "unchecked" + }) + @Override + public byte[] serialize(Map map) { + if (map == null) { + return null; + } + final int size = map.size(); + final int[] keyIDs = new int[size]; + final int[] valueIDs = new int[size]; + final byte[][] keyBytes = new byte[size][]; + final byte[][] valueBytes = new byte[size][]; + int totalSize = 4 + size * 16; + int i = 0; + Iterator iter = map.entrySet().iterator(); + while (iter.hasNext()) { + final Map.Entry entry = (Map.Entry)iter.next(); + final Object key = entry.getKey(); + final Object value = entry.getValue(); + Class<?> keyClass = key.getClass(); + Class<?> valueClass = NullObject.class; + if (value != null) { + valueClass = value.getClass(); + } + int keyTypeID = EntityDefinitionManager.getIDBySerDerClass(keyClass); + int valueTypeID = 0; // default null object + if (valueClass != null) { + valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass); + } + if (keyTypeID == -1) { + if (key instanceof Map) { + keyClass = Map.class; + keyTypeID = EntityDefinitionManager.getIDBySerDerClass(keyClass); + } else { + throw new IllegalArgumentException("Unsupported class: " + keyClass.getName()); + } + } + if (valueTypeID == -1) { + if (value instanceof Map) { + valueClass = Map.class; + valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass); + } else { + throw new IllegalArgumentException("Unsupported class: " + valueClass.getName()); + } + } + keyIDs[i] = keyTypeID; + valueIDs[i] = valueTypeID; + final EntitySerDeser keySerDer = EntityDefinitionManager.getSerDeser(keyClass); + final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass); + if (keySerDer == null) { + throw new IllegalArgumentException("Unsupported class: " + keyClass.getName()); + } + if (valueSerDer == null) { + throw new IllegalArgumentException("Unsupported class: " + valueClass.getName()); + } + keyBytes[i] = keySerDer.serialize(key); + valueBytes[i] = valueSerDer.serialize(value); + totalSize += keyBytes[i].length + valueBytes[i].length; + ++i; + } + final byte[] result = new byte[totalSize]; + int offset = 0; + ByteUtil.intToBytes(size, result, offset); + offset += 4; + for (i = 0; i < size; ++i) { + ByteUtil.intToBytes(keyIDs[i], result, offset); + offset += 4; + ByteUtil.intToBytes(keyBytes[i].length, result, offset); + offset += 4; + System.arraycopy(keyBytes[i], 0, result, offset, keyBytes[i].length); + offset += keyBytes[i].length; + + ByteUtil.intToBytes(valueIDs[i], result, offset); + offset += 4; + ByteUtil.intToBytes(valueBytes[i].length, result, offset); + offset += 4; + System.arraycopy(valueBytes[i], 0, result, offset, valueBytes[i].length); + offset += valueBytes[i].length; + } + return result; + } + + @Override + public Class<Map> type() { + return Map.class; + } +}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java index 0e3e776..5f5bb6e 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Metric.java @@ -21,9 +21,11 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Target({ElementType.TYPE}) +@Target({ + ElementType.TYPE +}) @Retention(RetentionPolicy.RUNTIME) public @interface Metric { - // interval with million seconds - long interval() default 60000; + // interval with million seconds + long interval() default 60000; } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java index 06bbed3..4b23ea9 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/MetricDefinition.java @@ -25,44 +25,54 @@ import java.io.DataOutput; import java.io.IOException; public class MetricDefinition implements Writable { - private final static Logger LOG = LoggerFactory.getLogger(MetricDefinition.class); - private long interval; - private Class<?> singleTimestampEntityClass; - public long getInterval() { - return interval; - } - public void setInterval(long interval) { - this.interval = interval; - } - public Class<?> getSingleTimestampEntityClass() { - return singleTimestampEntityClass; - } - public void setSingleTimestampEntityClass(Class<?> singleTimestampEntityClass) { - this.singleTimestampEntityClass = singleTimestampEntityClass; - } + private static final Logger LOG = LoggerFactory.getLogger(MetricDefinition.class); + private long interval; + private Class<?> singleTimestampEntityClass; - private final static String EMPTY=""; - @Override - public void write(DataOutput out) throws IOException { - if(LOG.isDebugEnabled()) LOG.debug("Writing metric definition: interval = "+interval+" singleTimestampEntityClass = "+ this.singleTimestampEntityClass); - out.writeLong(interval); - if(this.singleTimestampEntityClass == null){ - out.writeUTF(EMPTY); - }else { - out.writeUTF(this.singleTimestampEntityClass.getName()); - } - } + public long getInterval() { + return interval; + } - @Override - public void readFields(DataInput in) throws IOException { - interval = in.readLong(); - String singleTimestampEntityClassName = in.readUTF(); - if(!EMPTY.equals(singleTimestampEntityClassName)) { - try { - this.singleTimestampEntityClass = Class.forName(singleTimestampEntityClassName); - } catch (ClassNotFoundException e) { - if(LOG.isDebugEnabled()) LOG.warn("Class " + singleTimestampEntityClassName + " not found "); - } - } - } + public void setInterval(long interval) { + this.interval = interval; + } + + public Class<?> getSingleTimestampEntityClass() { + return singleTimestampEntityClass; + } + + public void setSingleTimestampEntityClass(Class<?> singleTimestampEntityClass) { + this.singleTimestampEntityClass = singleTimestampEntityClass; + } + + private static final String EMPTY = ""; + + @Override + public void write(DataOutput out) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Writing metric definition: interval = " + interval + " singleTimestampEntityClass = " + + this.singleTimestampEntityClass); + } + out.writeLong(interval); + if (this.singleTimestampEntityClass == null) { + out.writeUTF(EMPTY); + } else { + out.writeUTF(this.singleTimestampEntityClass.getName()); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + interval = in.readLong(); + String singleTimestampEntityClassName = in.readUTF(); + if (!EMPTY.equals(singleTimestampEntityClassName)) { + try { + this.singleTimestampEntityClass = Class.forName(singleTimestampEntityClassName); + } catch (ClassNotFoundException e) { + if (LOG.isDebugEnabled()) { + LOG.warn("Class " + singleTimestampEntityClassName + " not found "); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java index 9fb05a3..01536f1 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndex.java @@ -21,7 +21,9 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Target({ElementType.TYPE}) +@Target({ + ElementType.TYPE +}) @Retention(RetentionPolicy.RUNTIME) public @interface NonUniqueIndex { http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java index ff11397..f838590 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NonUniqueIndexes.java @@ -21,11 +21,12 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; - -@Target({ElementType.TYPE}) +@Target({ + ElementType.TYPE +}) @Retention(RetentionPolicy.RUNTIME) public @interface NonUniqueIndexes { - - public NonUniqueIndex[] value(); + + public NonUniqueIndex[] value(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java index 1778788..fd76999 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/NullSerDeser.java @@ -16,22 +16,22 @@ */ package org.apache.eagle.log.entity.meta; -public class NullSerDeser implements EntitySerDeser<NullObject>{ +public class NullSerDeser implements EntitySerDeser<NullObject> { - private static final byte[] EMPTY_NULL_ARRAY = new byte[0]; - - @Override - public NullObject deserialize(byte[] bytes) { - return null; - } + private static final byte[] EMPTY_NULL_ARRAY = new byte[0]; - @Override - public byte[] serialize(NullObject t) { - return EMPTY_NULL_ARRAY; - } + @Override + public NullObject deserialize(byte[] bytes) { + return null; + } - @Override - public Class<NullObject> type() { - return NullObject.class; - } + @Override + public byte[] serialize(NullObject t) { + return EMPTY_NULL_ARRAY; + } + + @Override + public Class<NullObject> type() { + return NullObject.class; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java index cb60016..479cb33 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Partition.java @@ -22,19 +22,18 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * Partition annotation will impact the rowkey generation for Eagle entities. Once an entity class - * has defined the partition fields for an Eagle entity, the hash codes of the defined partition - * fields will be placed just after prefix field, and before timestamp field. - * - * + * Partition annotation will impact the rowkey generation for Eagle entities. Once an entity class has defined + * the partition fields for an Eagle entity, the hash codes of the defined partition fields will be placed + * just after prefix field, and before timestamp field. */ -@Target({ElementType.TYPE}) +@Target({ + ElementType.TYPE +}) @Retention(RetentionPolicy.RUNTIME) -public @interface Partition -{ +public @interface Partition { /** * Order in which annotated tags are to be regarded as data partitions. */ - public String[] value() default { }; + public String[] value() default {}; } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java index 36f404c..587243d 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Prefix.java @@ -21,8 +21,10 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Target({ElementType.TYPE}) +@Target({ + ElementType.TYPE +}) @Retention(RetentionPolicy.RUNTIME) public @interface Prefix { - String value() default ""; -} \ No newline at end of file + String value() default ""; +} http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Qualifier.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Qualifier.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Qualifier.java index 64d73dd..7f849ff 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Qualifier.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Qualifier.java @@ -27,74 +27,83 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -public class Qualifier implements Writable{ - private final static Logger LOG = LoggerFactory.getLogger(Qualifier.class); +public class Qualifier implements Writable { + private static final Logger LOG = LoggerFactory.getLogger(Qualifier.class); - private String displayName; - private String qualifierName; - private EntitySerDeser<Object> serDeser; - @JsonIgnore - public EntitySerDeser<Object> getSerDeser() { - return serDeser; - } - public void setSerDeser(EntitySerDeser<Object> serDeser) { - this.serDeser = serDeser; - } - public String getDisplayName() { - return displayName; - } - public void setDisplayName(String displayName) { - this.displayName = displayName; - } - public String getQualifierName() { - return qualifierName; - } - public void setQualifierName(String qualifierName) { - this.qualifierName = qualifierName; - } - - public String toString(){ - StringBuffer sb = new StringBuffer(); - sb.append("displayName:"); - sb.append(displayName); - sb.append(","); - sb.append("qualifierName:"); - sb.append(qualifierName); - sb.append(","); - sb.append("serDeser class:"); - sb.append(serDeser.getClass().getName()); - return sb.toString(); - } + private String displayName; + private String qualifierName; + private EntitySerDeser<Object> serDeser; - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(displayName); - out.writeUTF(qualifierName); - out.writeUTF(serDeser.getClass().getName()); - } + @JsonIgnore + public EntitySerDeser<Object> getSerDeser() { + return serDeser; + } - private final static Map<String, EntitySerDeser> _entitySerDeserCache = new HashMap<String,EntitySerDeser>(); + public void setSerDeser(EntitySerDeser<Object> serDeser) { + this.serDeser = serDeser; + } - @Override - public void readFields(DataInput in) throws IOException { - displayName = in.readUTF(); - qualifierName = in.readUTF(); - String serDeserClassName = in.readUTF(); + public String getDisplayName() { + return displayName; + } - EntitySerDeser _cached = _entitySerDeserCache.get(serDeserClassName); - if(_cached != null){ - this.serDeser = _cached; - }else { - try { - if (LOG.isDebugEnabled()) LOG.debug("Creating new instance for " + serDeserClassName); - Class serDeserClass = Class.forName(serDeserClassName); - this.serDeser = (EntitySerDeser) serDeserClass.newInstance(); - _entitySerDeserCache.put(serDeserClassName, this.serDeser); - } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.warn("Class not found for " + serDeserClassName + ": " + e.getMessage(), e); - } - } - } - } + public void setDisplayName(String displayName) { + this.displayName = displayName; + } + + public String getQualifierName() { + return qualifierName; + } + + public void setQualifierName(String qualifierName) { + this.qualifierName = qualifierName; + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("displayName:"); + sb.append(displayName); + sb.append(","); + sb.append("qualifierName:"); + sb.append(qualifierName); + sb.append(","); + sb.append("serDeser class:"); + sb.append(serDeser.getClass().getName()); + return sb.toString(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(displayName); + out.writeUTF(qualifierName); + out.writeUTF(serDeser.getClass().getName()); + } + + private static final Map<String, EntitySerDeser> _entitySerDeserCache = new HashMap<String, EntitySerDeser>(); + + @Override + public void readFields(DataInput in) throws IOException { + displayName = in.readUTF(); + qualifierName = in.readUTF(); + String serDeserClassName = in.readUTF(); + + EntitySerDeser _cached = _entitySerDeserCache.get(serDeserClassName); + if (_cached != null) { + this.serDeser = _cached; + } else { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating new instance for " + serDeserClassName); + } + Class serDeserClass = Class.forName(serDeserClassName); + this.serDeser = (EntitySerDeser)serDeserClass.newInstance(); + _entitySerDeserCache.put(serDeserClassName, this.serDeser); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.warn("Class not found for " + serDeserClassName + ": " + e.getMessage(), e); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Service.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Service.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Service.java index 22d70ed..f6e9700 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Service.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Service.java @@ -21,8 +21,10 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Target({ElementType.TYPE}) +@Target({ + ElementType.TYPE +}) @Retention(RetentionPolicy.RUNTIME) public @interface Service { - String value() default ""; + String value() default ""; } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ServicePath.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ServicePath.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ServicePath.java index 8c712d0..6dc15c5 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ServicePath.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ServicePath.java @@ -23,9 +23,10 @@ import java.lang.annotation.Target; /** * This class is for service client for generic entity creation API (entities and metrics) - * */ -@Target({ElementType.TYPE}) +@Target({ + ElementType.TYPE +}) @Retention(RetentionPolicy.RUNTIME) public @interface ServicePath { http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringArraySerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringArraySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringArraySerDeser.java index 635065b..2e5fa8d 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringArraySerDeser.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringArraySerDeser.java @@ -22,73 +22,73 @@ import org.apache.eagle.common.ByteUtil; /** * String array entity serializer and deserializer - * */ public class StringArraySerDeser implements EntitySerDeser<String[]> { - public static final int MAX_STRING_LENGTH = 65535; - public static final String UTF_8 = "UTF-8"; - - @Override - public String[] deserialize(byte[] bytes) { - if(bytes == null || bytes.length < 4) - return null; - int offset = 0; - // get size of int array - final int size = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - final String[] strings = new String[size]; - try { - for(int i = 0; i < size; i++) { - final int len = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - strings[i] = new String(bytes, offset, len, UTF_8); - offset += len; - } - } catch (UnsupportedEncodingException e) { - throw new IllegalArgumentException("Invalid byte array"); - } - return strings; - } - - /** - * size + str1 length + str1 + str2 length + str2 + ... - * 4B 4B n1B 4B n2B - * - * @param obj - * @return - */ - @Override - public byte[] serialize(String[] array) { - if(array == null) - return null; - final int size = array.length; - final byte[][] tmp = new byte[size][]; - int total = 4 + 4 * size; - for (int i = 0; i < size; ++i) { - try { - tmp[i] = array[i].getBytes(UTF_8); - } catch (UnsupportedEncodingException e) { - throw new IllegalArgumentException("String doesn't support UTF-8 encoding: " + array[i]); - } - total += tmp[i].length; - } - final byte[] result = new byte[total]; - int offset = 0; - ByteUtil.intToBytes(size, result, offset); - offset += 4; - for (int i = 0; i < size; ++i) { - ByteUtil.intToBytes(tmp[i].length, result, offset); - offset += 4; - System.arraycopy(tmp[i], 0, result, offset, tmp[i].length); - offset += tmp[i].length; - } - return result; - } + public static final int MAX_STRING_LENGTH = 65535; + public static final String UTF_8 = "UTF-8"; + + @Override + public String[] deserialize(byte[] bytes) { + if (bytes == null || bytes.length < 4) { + return null; + } + int offset = 0; + // get size of int array + final int size = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + final String[] strings = new String[size]; + try { + for (int i = 0; i < size; i++) { + final int len = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + strings[i] = new String(bytes, offset, len, UTF_8); + offset += len; + } + } catch (UnsupportedEncodingException e) { + throw new IllegalArgumentException("Invalid byte array"); + } + return strings; + } + + /** + * size + str1 length + str1 + str2 length + str2 + ... 4B 4B n1B 4B n2B + * + * @param obj + * @return + */ + @Override + public byte[] serialize(String[] array) { + if (array == null) { + return null; + } + final int size = array.length; + final byte[][] tmp = new byte[size][]; + int total = 4 + 4 * size; + for (int i = 0; i < size; ++i) { + try { + tmp[i] = array[i].getBytes(UTF_8); + } catch (UnsupportedEncodingException e) { + throw new IllegalArgumentException("String doesn't support UTF-8 encoding: " + array[i]); + } + total += tmp[i].length; + } + final byte[] result = new byte[total]; + int offset = 0; + ByteUtil.intToBytes(size, result, offset); + offset += 4; + for (int i = 0; i < size; ++i) { + ByteUtil.intToBytes(tmp[i].length, result, offset); + offset += 4; + System.arraycopy(tmp[i], 0, result, offset, tmp[i].length); + offset += tmp[i].length; + } + return result; + } - @Override - public Class<String[]> type() { - return String[].class; - } + @Override + public Class<String[]> type() { + return String[].class; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringSerDeser.java index eef6e4f..532e27a 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringSerDeser.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/StringSerDeser.java @@ -18,22 +18,24 @@ package org.apache.eagle.log.entity.meta; public class StringSerDeser implements EntitySerDeser<String> { - public StringSerDeser(){} + public StringSerDeser() { + } - @Override - public String deserialize(byte[] bytes){ - return new String(bytes); - } - - @Override - public byte[] serialize(String obj){ - if(obj == null) - return null; - return obj.getBytes(); - } + @Override + public String deserialize(byte[] bytes) { + return new String(bytes); + } - @Override - public Class<String> type() { - return String.class; - } + @Override + public byte[] serialize(String obj) { + if (obj == null) { + return null; + } + return obj.getBytes(); + } + + @Override + public Class<String> type() { + return String.class; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Table.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Table.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Table.java index ac722cd..d36487a 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Table.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Table.java @@ -21,8 +21,10 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Target({ElementType.TYPE}) +@Target({ + ElementType.TYPE +}) @Retention(RetentionPolicy.RUNTIME) public @interface Table { - String value() default ""; + String value() default ""; } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Tags.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Tags.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Tags.java index ac9b328..0f5c38d 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Tags.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Tags.java @@ -27,5 +27,7 @@ import java.lang.annotation.Target; @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface Tags { - String[] value() default {""}; + String[] value() default { + "" + }; } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/TimeSeries.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/TimeSeries.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/TimeSeries.java index 01023bc..e708f14 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/TimeSeries.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/TimeSeries.java @@ -21,8 +21,10 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Target({ElementType.TYPE}) +@Target({ + ElementType.TYPE +}) @Retention(RetentionPolicy.RUNTIME) public @interface TimeSeries { - boolean value() default true; + boolean value() default true; } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericByRowkeyReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericByRowkeyReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericByRowkeyReader.java index 43a7073..5744ee8 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericByRowkeyReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericByRowkeyReader.java @@ -30,49 +30,51 @@ import org.apache.eagle.log.entity.InternalLog; import org.apache.eagle.common.EagleBase64Wrapper; public class GenericByRowkeyReader { - private static final Logger LOG = LoggerFactory.getLogger(GenericByRowkeyReader.class); + private static final Logger LOG = LoggerFactory.getLogger(GenericByRowkeyReader.class); - private TaggedLogObjectMapper mapper; - private String table; - private String columnFamily; - private boolean outputAll; - private List<String> outputColumns; - private GenericReader.EntityFactory entityFactory; - - public GenericByRowkeyReader(TaggedLogObjectMapper mapper, GenericReader.EntityFactory entityFactory, String table, String columnFamily, boolean outputAll, List<String> outputColumns){ - this.mapper = mapper; - this.entityFactory = entityFactory; - this.table = table; - this.columnFamily = columnFamily; - this.outputAll = outputAll; - this.outputColumns = outputColumns; - } - - public List<TaggedLogAPIEntity> read(List<String> rowkeys) throws IOException{ - HBaseLogByRowkeyReader reader = new HBaseLogByRowkeyReader(this.table, this.columnFamily, - outputAll, outputColumns); - List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>(); - try{ - reader.open(); - for(String rowkeyString : rowkeys){ - byte[] rowkey = EagleBase64Wrapper.decode(rowkeyString); - InternalLog log = reader.get(rowkey); - TaggedLogAPIEntity entity = entityFactory.create(); - entities.add(entity); - entity.setTags(log.getTags()); - entity.setTimestamp(log.getTimestamp()); - entity.setEncodedRowkey(log.getEncodedRowkey()); - entity.setPrefix(log.getPrefix()); - Map<String, byte[]> qualifierValues = log.getQualifierValues(); - mapper.populateQualifierValues(entity, qualifierValues); - } - }catch(IOException ex){ - LOG.error("Fail read by rowkey", ex); - throw ex; - }finally{ - reader.close(); - } - - return entities; - } + private TaggedLogObjectMapper mapper; + private String table; + private String columnFamily; + private boolean outputAll; + private List<String> outputColumns; + private GenericReader.EntityFactory entityFactory; + + public GenericByRowkeyReader(TaggedLogObjectMapper mapper, GenericReader.EntityFactory entityFactory, + String table, String columnFamily, boolean outputAll, + List<String> outputColumns) { + this.mapper = mapper; + this.entityFactory = entityFactory; + this.table = table; + this.columnFamily = columnFamily; + this.outputAll = outputAll; + this.outputColumns = outputColumns; + } + + public List<TaggedLogAPIEntity> read(List<String> rowkeys) throws IOException { + HBaseLogByRowkeyReader reader = new HBaseLogByRowkeyReader(this.table, this.columnFamily, outputAll, + outputColumns); + List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>(); + try { + reader.open(); + for (String rowkeyString : rowkeys) { + byte[] rowkey = EagleBase64Wrapper.decode(rowkeyString); + InternalLog log = reader.get(rowkey); + TaggedLogAPIEntity entity = entityFactory.create(); + entities.add(entity); + entity.setTags(log.getTags()); + entity.setTimestamp(log.getTimestamp()); + entity.setEncodedRowkey(log.getEncodedRowkey()); + entity.setPrefix(log.getPrefix()); + Map<String, byte[]> qualifierValues = log.getQualifierValues(); + mapper.populateQualifierValues(entity, qualifierValues); + } + } catch (IOException ex) { + LOG.error("Fail read by rowkey", ex); + throw ex; + } finally { + reader.close(); + } + + return entities; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericDeleter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericDeleter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericDeleter.java index e97b522..f2acd85 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericDeleter.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericDeleter.java @@ -32,20 +32,19 @@ import org.apache.eagle.log.entity.meta.EntityDefinitionManager; import org.apache.eagle.log.entity.meta.IndexDefinition; public class GenericDeleter { - private static final Logger LOG = LoggerFactory.getLogger(GenericDeleter.class); + private static final Logger LOG = LoggerFactory.getLogger(GenericDeleter.class); - private final HBaseLogDeleter deleter; - private final HBaseLogByRowkeyReader reader; - - - public GenericDeleter(EntityDefinition ed) { - this(ed.getTable(), ed.getColumnFamily()); - } - - public GenericDeleter(String table, String columnFamily) { - this.deleter = new HBaseLogDeleter(table, columnFamily); - this.reader = new HBaseLogByRowkeyReader(table, columnFamily, true, null); - } + private final HBaseLogDeleter deleter; + private final HBaseLogByRowkeyReader reader; + + public GenericDeleter(EntityDefinition ed) { + this(ed.getTable(), ed.getColumnFamily()); + } + + public GenericDeleter(String table, String columnFamily) { + this.deleter = new HBaseLogDeleter(table, columnFamily); + this.reader = new HBaseLogByRowkeyReader(table, columnFamily, true, null); + } public void deleteByRowkeys(List<byte[]> rowkeys) throws Exception { try { @@ -66,70 +65,73 @@ public class GenericDeleter { throw e; } } - - public List<String> delete(List<? extends TaggedLogAPIEntity> entities) throws Exception{ + + public List<String> delete(List<? extends TaggedLogAPIEntity> entities) throws Exception { List<String> encodedRowkey = new LinkedList<String>(); - try{ - deleter.open(); - final Map<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> entityClassMap = classifyEntities(entities); - for (Map.Entry<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> entry : entityClassMap.entrySet()) { - final Class<? extends TaggedLogAPIEntity> clazz = entry.getKey(); - final List<? extends TaggedLogAPIEntity> entityList = entry.getValue(); + try { + deleter.open(); + final Map<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> entityClassMap = classifyEntities(entities); + for (Map.Entry<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> entry : entityClassMap + .entrySet()) { + final Class<? extends TaggedLogAPIEntity> clazz = entry.getKey(); + final List<? extends TaggedLogAPIEntity> entityList = entry.getValue(); - final EntityDefinition entityDef = EntityDefinitionManager.getEntityDefinitionByEntityClass(clazz); - // TODO: we should fix this hardcoded prefix hack - fixPrefixAndTimestampIssue(entityList, entityDef); + final EntityDefinition entityDef = EntityDefinitionManager + .getEntityDefinitionByEntityClass(clazz); + // TODO: we should fix this hardcoded prefix hack + fixPrefixAndTimestampIssue(entityList, entityDef); - final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByEntities(entityList, entityDef); - // Check index - final IndexDefinition[] indexes = entityDef.getIndexes(); - if (indexes != null && indexes.length > 0) { - reader.open(); - final List<InternalLog> logs = reader.get(rowkeys); - final List<TaggedLogAPIEntity> newEntities = HBaseInternalLogHelper.buildEntities(logs, entityDef); - for (TaggedLogAPIEntity entity : newEntities) { - // Add index rowkeys - for (IndexDefinition index : indexes) { - final byte[] indexRowkey = index.generateIndexRowkey(entity); - rowkeys.add(indexRowkey); - } - } - } - for(byte[] rowkey:rowkeys) { + final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByEntities(entityList, entityDef); + // Check index + final IndexDefinition[] indexes = entityDef.getIndexes(); + if (indexes != null && indexes.length > 0) { + reader.open(); + final List<InternalLog> logs = reader.get(rowkeys); + final List<TaggedLogAPIEntity> newEntities = HBaseInternalLogHelper + .buildEntities(logs, entityDef); + for (TaggedLogAPIEntity entity : newEntities) { + // Add index rowkeys + for (IndexDefinition index : indexes) { + final byte[] indexRowkey = index.generateIndexRowkey(entity); + rowkeys.add(indexRowkey); + } + } + } + for (byte[] rowkey : rowkeys) { encodedRowkey.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey)); } - deleter.deleteRowkeys(rowkeys); - } - }catch(IOException ioe){ - LOG.error("Fail writing tagged log", ioe); - throw ioe; - }finally{ - deleter.close(); - } + deleter.deleteRowkeys(rowkeys); + } + } catch (IOException ioe) { + LOG.error("Fail writing tagged log", ioe); + throw ioe; + } finally { + deleter.close(); + } return encodedRowkey; - } + } - private void fixPrefixAndTimestampIssue(List<? extends TaggedLogAPIEntity> entities, EntityDefinition entityDef) { - for (TaggedLogAPIEntity e : entities) { - e.setPrefix(entityDef.getPrefix()); - if (!entityDef.isTimeSeries()) { - e.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to MAX, then actually stored 0 - } - } - } + private void fixPrefixAndTimestampIssue(List<? extends TaggedLogAPIEntity> entities, + EntityDefinition entityDef) { + for (TaggedLogAPIEntity e : entities) { + e.setPrefix(entityDef.getPrefix()); + if (!entityDef.isTimeSeries()) { + e.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to MAX, then actually stored 0 + } + } + } - private Map<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> classifyEntities(List<? extends TaggedLogAPIEntity> entities) { - final Map<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> result = new - HashMap<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>>(); - for (TaggedLogAPIEntity entity : entities) { - final Class<? extends TaggedLogAPIEntity> clazz = entity.getClass(); - List<TaggedLogAPIEntity> list = result.get(clazz); - if (list == null) { - list = new ArrayList<TaggedLogAPIEntity>(); - result.put(clazz, list); - } - list.add(entity); - } - return result; - } + private Map<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> classifyEntities(List<? extends TaggedLogAPIEntity> entities) { + final Map<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>> result = new HashMap<Class<? extends TaggedLogAPIEntity>, List<TaggedLogAPIEntity>>(); + for (TaggedLogAPIEntity entity : entities) { + final Class<? extends TaggedLogAPIEntity> clazz = entity.getClass(); + List<TaggedLogAPIEntity> list = result.get(clazz); + if (list == null) { + list = new ArrayList<TaggedLogAPIEntity>(); + result.put(clazz, list); + } + list.add(entity); + } + return result; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericReader.java index 76e314b..09ff861 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericReader.java @@ -32,84 +32,87 @@ import org.apache.eagle.log.entity.InternalLog; import org.apache.eagle.common.DateTimeUtil; public class GenericReader { - private static final Logger LOG = LoggerFactory.getLogger(GenericReader.class); + private static final Logger LOG = LoggerFactory.getLogger(GenericReader.class); - public interface EntityFactory{ - public TaggedLogAPIEntity create(); - } - - private Schema schema; - private EntityFactory entityFactory; - private TaggedLogObjectMapper mapper; - - public GenericReader(TaggedLogObjectMapper mapper, Schema schema, EntityFactory factory){ - this.mapper = mapper; - this.schema = schema; - this.entityFactory = factory; - } - - public List<TaggedLogAPIEntity> read(String startTime, - String endTime, List<String> tagNameValues, List<String> outputTags, - List<String> outputFields, String startRowkey, int pageSize) throws Exception{ - Date start = DateTimeUtil.humanDateToDate(startTime); - Date end = DateTimeUtil.humanDateToDate(endTime); - - // decode the query parameters - // TODO should support one tag has multiple tag values - Map<String, List<String>> searchTags = new HashMap<String, List<String>>(); - for(String tagNameValue : tagNameValues){ - String[] tmp = tagNameValue.split("="); - if(tmp == null || tmp.length <=1){ - continue; // silently ignore this parameter - } - List<String> tagValues = searchTags.get(tmp[0]); - if(tagValues == null){ - tagValues = new ArrayList<String>(); - searchTags.put(tmp[0], tagValues); - } - tagValues.add(tmp[1]); - } - - int numTags = outputTags.size(); - int numFields = outputFields.size(); - byte[][] outputQualifiers = new byte[numTags+numFields][]; - int i = 0; - for(String tag : outputTags){ - outputQualifiers[i++] = tag.getBytes(); - } - for(String field : outputFields){ - outputQualifiers[i++] = field.getBytes(); - } - // shortcut to avoid read when pageSize=0 - List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>(); - if(pageSize <= 0){ - return entities; // return empty entities - } + public interface EntityFactory { + public TaggedLogAPIEntity create(); + } - HBaseLogReader reader = new HBaseLogReader(schema, start, end, searchTags, startRowkey, outputQualifiers); - try{ - reader.open(); - InternalLog log; - int count = 0; - while ((log = reader.read()) != null) { - TaggedLogAPIEntity entity = entityFactory.create(); - entity.setTags(log.getTags()); - entity.setTimestamp(log.getTimestamp()); - entity.setEncodedRowkey(log.getEncodedRowkey()); - entity.setPrefix(log.getPrefix()); - entities.add(entity); - - Map<String, byte[]> qualifierValues = log.getQualifierValues(); - mapper.populateQualifierValues(entity, qualifierValues); - if(++count == pageSize) - break; - } - }catch(IOException ioe){ - LOG.error("Fail reading log", ioe); - throw ioe; - }finally{ - reader.close(); - } - return entities; - } + private Schema schema; + private EntityFactory entityFactory; + private TaggedLogObjectMapper mapper; + + public GenericReader(TaggedLogObjectMapper mapper, Schema schema, EntityFactory factory) { + this.mapper = mapper; + this.schema = schema; + this.entityFactory = factory; + } + + public List<TaggedLogAPIEntity> read(String startTime, String endTime, List<String> tagNameValues, + List<String> outputTags, List<String> outputFields, + String startRowkey, int pageSize) + throws Exception { + + // decode the query parameters + // TODO should support one tag has multiple tag values + Map<String, List<String>> searchTags = new HashMap<String, List<String>>(); + for (String tagNameValue : tagNameValues) { + String[] tmp = tagNameValue.split("="); + if (tmp == null || tmp.length <= 1) { + continue; // silently ignore this parameter + } + List<String> tagValues = searchTags.get(tmp[0]); + if (tagValues == null) { + tagValues = new ArrayList<String>(); + searchTags.put(tmp[0], tagValues); + } + tagValues.add(tmp[1]); + } + + int numTags = outputTags.size(); + int numFields = outputFields.size(); + byte[][] outputQualifiers = new byte[numTags + numFields][]; + int i = 0; + for (String tag : outputTags) { + outputQualifiers[i++] = tag.getBytes(); + } + for (String field : outputFields) { + outputQualifiers[i++] = field.getBytes(); + } + // shortcut to avoid read when pageSize=0 + List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>(); + if (pageSize <= 0) { + return entities; // return empty entities + } + + Date start = DateTimeUtil.humanDateToDate(startTime); + Date end = DateTimeUtil.humanDateToDate(endTime); + HBaseLogReader reader = new HBaseLogReader(schema, start, end, searchTags, startRowkey, + outputQualifiers); + try { + reader.open(); + InternalLog log; + int count = 0; + while ((log = reader.read()) != null) { + TaggedLogAPIEntity entity = entityFactory.create(); + entity.setTags(log.getTags()); + entity.setTimestamp(log.getTimestamp()); + entity.setEncodedRowkey(log.getEncodedRowkey()); + entity.setPrefix(log.getPrefix()); + entities.add(entity); + + Map<String, byte[]> qualifierValues = log.getQualifierValues(); + mapper.populateQualifierValues(entity, qualifierValues); + if (++count == pageSize) { + break; + } + } + } catch (IOException ioe) { + LOG.error("Fail reading log", ioe); + throw ioe; + } finally { + reader.close(); + } + return entities; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericWriter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericWriter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericWriter.java index 3d29237..9f3765a 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericWriter.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/GenericWriter.java @@ -32,62 +32,62 @@ import org.slf4j.LoggerFactory; import org.apache.eagle.common.EagleBase64Wrapper; public class GenericWriter { - private static final Logger LOG = LoggerFactory.getLogger(GenericWriter.class); + private static final Logger LOG = LoggerFactory.getLogger(GenericWriter.class); - private String table; - private String columnFamily; - private TaggedLogObjectMapper mapper; - - public GenericWriter(TaggedLogObjectMapper mapper, String table, String columnFamily){ - this.mapper = mapper; - this.table = table; - this.columnFamily = columnFamily; - } - - public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws IOException{ - HBaseLogWriter writer = new HBaseLogWriter(table, columnFamily); - List<String> rowkeys = new ArrayList<String>(); - - try{ - writer.open(); - for(TaggedLogAPIEntity entity : entities){ - InternalLog log = new InternalLog(); - Map<String, String> inputTags = entity.getTags(); - Map<String, String> tags = new TreeMap<String, String>(); - for(Map.Entry<String, String> entry : inputTags.entrySet()){ - tags.put(entry.getKey(), entry.getValue()); - } - log.setTags(tags); - log.setTimestamp(entity.getTimestamp()); - log.setPrefix(entity.getPrefix()); - log.setQualifierValues(mapper.createQualifierValues(entity)); - byte[] rowkey = writer.write(log); - rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey)); - } - }catch(IOException ioe){ - LOG.error("Fail writing tagged log", ioe); - throw ioe; - }finally{ - writer.close(); - } - return rowkeys; - } - - public void updateByRowkey(List<? extends TaggedLogAPIEntity> entities) throws IOException{ - HBaseLogWriter writer = new HBaseLogWriter(table, columnFamily); - try{ - writer.open(); - for(TaggedLogAPIEntity entity : entities){ - byte[] rowkey = EagleBase64Wrapper.decode(entity.getEncodedRowkey()); - InternalLog log = new InternalLog(); - log.setQualifierValues(mapper.createQualifierValues(entity)); - writer.updateByRowkey(rowkey, log); - } - }catch(IOException ioe){ - LOG.error("Fail writing tagged log", ioe); - throw ioe; - }finally{ - writer.close(); - } - } + private String table; + private String columnFamily; + private TaggedLogObjectMapper mapper; + + public GenericWriter(TaggedLogObjectMapper mapper, String table, String columnFamily) { + this.mapper = mapper; + this.table = table; + this.columnFamily = columnFamily; + } + + public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws IOException { + HBaseLogWriter writer = new HBaseLogWriter(table, columnFamily); + List<String> rowkeys = new ArrayList<String>(); + + try { + writer.open(); + for (TaggedLogAPIEntity entity : entities) { + InternalLog log = new InternalLog(); + Map<String, String> inputTags = entity.getTags(); + Map<String, String> tags = new TreeMap<String, String>(); + for (Map.Entry<String, String> entry : inputTags.entrySet()) { + tags.put(entry.getKey(), entry.getValue()); + } + log.setTags(tags); + log.setTimestamp(entity.getTimestamp()); + log.setPrefix(entity.getPrefix()); + log.setQualifierValues(mapper.createQualifierValues(entity)); + byte[] rowkey = writer.write(log); + rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey)); + } + } catch (IOException ioe) { + LOG.error("Fail writing tagged log", ioe); + throw ioe; + } finally { + writer.close(); + } + return rowkeys; + } + + public void updateByRowkey(List<? extends TaggedLogAPIEntity> entities) throws IOException { + HBaseLogWriter writer = new HBaseLogWriter(table, columnFamily); + try { + writer.open(); + for (TaggedLogAPIEntity entity : entities) { + byte[] rowkey = EagleBase64Wrapper.decode(entity.getEncodedRowkey()); + InternalLog log = new InternalLog(); + log.setQualifierValues(mapper.createQualifierValues(entity)); + writer.updateByRowkey(rowkey, log); + } + } catch (IOException ioe) { + LOG.error("Fail writing tagged log", ioe); + throw ioe; + } finally { + writer.close(); + } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogByRowkeyReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogByRowkeyReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogByRowkeyReader.java index 37e55ac..6b49cf7 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogByRowkeyReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogByRowkeyReader.java @@ -36,126 +36,129 @@ import org.apache.eagle.common.ByteUtil; import org.apache.eagle.common.EagleBase64Wrapper; /** - * Get details of rowkey and qualifiers given a raw rowkey. This function mostly is used for inspecting one row's content - * This only supports single column family, which is mostly used in log application + * Get details of rowkey and qualifiers given a raw rowkey. This function mostly is used for inspecting one + * row's content This only supports single column family, which is mostly used in log application */ -public class HBaseLogByRowkeyReader implements Closeable{ - private String table; - private String columnFamily; - private byte[][] outputQualifiers; - private boolean includingAllQualifiers; - private HTableInterface tbl; - private boolean isOpen; - - /** - * if includingAllQualifiers is true, then the fourth argument outputQualifiers is ignored - * if includingAllQualifiers is false, then need calculate based on the fourth argument outputQualifiers - */ - public HBaseLogByRowkeyReader(String table, String columnFamily, boolean includingAllQualifiers, List<String> qualifiers){ - this.table = table; - this.columnFamily = columnFamily; - if(qualifiers != null){ - this.outputQualifiers = new byte[qualifiers.size()][]; - int i = 0; - for(String qualifier : qualifiers){ - this.outputQualifiers[i++] = qualifier.getBytes(); - } - } - this.includingAllQualifiers = includingAllQualifiers; - } - - - public void open() throws IOException { - if (isOpen) - return; // silently return - try { - tbl = EagleConfigFactory.load().getHTable(this.table); - } catch (RuntimeException ex) { - throw new IOException(ex); - } - - isOpen = true; - } +public class HBaseLogByRowkeyReader implements Closeable { + private String table; + private String columnFamily; + private byte[][] outputQualifiers; + private boolean includingAllQualifiers; + private HTableInterface tbl; + private boolean isOpen; - /** - * Here all qualifiers' values goes into qualifierValues of InternalLog as given a row, we can't differentiate it's a tag or a field - * @param rowkeys - * @return - * @throws IOException - */ - public List<InternalLog> get(List<byte[]> rowkeys) throws IOException, NoSuchRowException { - final List<Get> gets = createGets(rowkeys); - final Result[] results = tbl.get(gets); - final List<InternalLog> logs = new ArrayList<InternalLog>(); - for (Result result : results) { - final InternalLog log = buildLog(result); - logs.add(log); - } - return logs; - } - - private List<Get> createGets(List<byte[]> rowkeys) { - final List<Get> gets = new ArrayList<Get>(); - for (byte[] rowkey : rowkeys) { - final Get get = createGet(rowkey); - gets.add(get); - } - return gets; - } + /** + * if includingAllQualifiers is true, then the fourth argument outputQualifiers is ignored if + * includingAllQualifiers is false, then need calculate based on the fourth argument outputQualifiers + */ + public HBaseLogByRowkeyReader(String table, String columnFamily, boolean includingAllQualifiers, + List<String> qualifiers) { + this.table = table; + this.columnFamily = columnFamily; + if (qualifiers != null) { + this.outputQualifiers = new byte[qualifiers.size()][]; + int i = 0; + for (String qualifier : qualifiers) { + this.outputQualifiers[i++] = qualifier.getBytes(); + } + } + this.includingAllQualifiers = includingAllQualifiers; + } + public void open() throws IOException { + if (isOpen) { + return; // silently return + } + try { + tbl = EagleConfigFactory.load().getHTable(this.table); + } catch (RuntimeException ex) { + throw new IOException(ex); + } - private Get createGet(byte[] rowkey) { - final Get get = new Get(rowkey); - byte[] cf = this.columnFamily.getBytes(); - if(includingAllQualifiers){ - get.addFamily(cf); - }else{ - for(byte[] outputQualifier : outputQualifiers){ - get.addColumn(cf, outputQualifier); - } - } - return get; - } + isOpen = true; + } + /** + * Here all qualifiers' values goes into qualifierValues of InternalLog as given a row, we can't + * differentiate it's a tag or a field + * + * @param rowkeys + * @return + * @throws IOException + */ + public List<InternalLog> get(List<byte[]> rowkeys) throws IOException, NoSuchRowException { + final List<Get> gets = createGets(rowkeys); + final Result[] results = tbl.get(gets); + final List<InternalLog> logs = new ArrayList<InternalLog>(); + for (Result result : results) { + final InternalLog log = buildLog(result); + logs.add(log); + } + return logs; + } - /** - * Here all qualifiers' values goes into qualifierValues of InternalLog as given a row, we can't differentiate it's a tag or a field - * @param rowkey - * @return - * @throws IOException - */ - public InternalLog get(byte[] rowkey) throws IOException, NoSuchRowException{ - final Get get = createGet(rowkey); - final Result result = tbl.get(get); - final InternalLog log = buildLog(result); - return log; - } - - private InternalLog buildLog(Result result) { - final InternalLog log = new InternalLog(); - final byte[] rowkey = result.getRow(); - log.setEncodedRowkey(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey)); - long timestamp = ByteUtil.bytesToLong(rowkey, 4); - timestamp = Long.MAX_VALUE - timestamp; - log.setTimestamp(timestamp); - Map<String, byte[]> qualifierValues = new HashMap<String, byte[]>(); - log.setQualifierValues(qualifierValues); - NavigableMap<byte[], byte[]> map = result.getFamilyMap(this.columnFamily.getBytes()); - if(map == null){ - throw new NoSuchRowException(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey)); - } - for(Map.Entry<byte[], byte[]> entry : map.entrySet()){ - byte[] qualifier = entry.getKey(); - byte[] value = entry.getValue(); - qualifierValues.put(new String(qualifier), value); - } - return log; - } + /** + * Here all qualifiers' values goes into qualifierValues of InternalLog as given a row, we can't + * differentiate it's a tag or a field + * + * @param rowkey + * @return + * @throws IOException + */ + public InternalLog get(byte[] rowkey) throws IOException, NoSuchRowException { + final Get get = createGet(rowkey); + final Result result = tbl.get(get); + final InternalLog log = buildLog(result); + return log; + } + private List<Get> createGets(List<byte[]> rowkeys) { + final List<Get> gets = new ArrayList<Get>(); + for (byte[] rowkey : rowkeys) { + final Get get = createGet(rowkey); + gets.add(get); + } + return gets; + } - public void close() throws IOException { - if(tbl != null){ - new HTableFactory().releaseHTableInterface(tbl); - } - } + private Get createGet(byte[] rowkey) { + final Get get = new Get(rowkey); + byte[] cf = this.columnFamily.getBytes(); + if (includingAllQualifiers) { + get.addFamily(cf); + } else { + for (byte[] outputQualifier : outputQualifiers) { + get.addColumn(cf, outputQualifier); + } + } + return get; + } + + private InternalLog buildLog(Result result) { + final InternalLog log = new InternalLog(); + final byte[] rowkey = result.getRow(); + log.setEncodedRowkey(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey)); + long timestamp = ByteUtil.bytesToLong(rowkey, 4); + timestamp = Long.MAX_VALUE - timestamp; + log.setTimestamp(timestamp); + Map<String, byte[]> qualifierValues = new HashMap<String, byte[]>(); + log.setQualifierValues(qualifierValues); + NavigableMap<byte[], byte[]> map = result.getFamilyMap(this.columnFamily.getBytes()); + if (map == null) { + throw new NoSuchRowException(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey)); + } + for (Map.Entry<byte[], byte[]> entry : map.entrySet()) { + byte[] qualifier = entry.getKey(); + byte[] value = entry.getValue(); + qualifierValues.put(new String(qualifier), value); + } + return log; + } + + @Override + public void close() throws IOException { + if (tbl != null) { + new HTableFactory().releaseHTableInterface(tbl); + } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogDeleter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogDeleter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogDeleter.java index 3460949..ae69321 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogDeleter.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/old/HBaseLogDeleter.java @@ -30,102 +30,105 @@ import org.apache.eagle.log.entity.InternalLog; import org.apache.eagle.log.entity.meta.EntityDefinition; import org.apache.eagle.common.EagleBase64Wrapper; -public class HBaseLogDeleter implements LogDeleter{ - private HTableInterface tbl; - private String table; - private String columnFamily; - - public HBaseLogDeleter(String table, String columnFamily) { - this.table = table; - this.columnFamily = columnFamily; - } - - @Override - public void open() throws IOException { - try{ - tbl = EagleConfigFactory.load().getHTable(this.table); - }catch(RuntimeException ex){ - throw new IOException(ex); - } - } - - @Override - public void close() throws IOException { - if(tbl != null){ - new HTableFactory().releaseHTableInterface(tbl); - } - } - - @Override - public void flush() throws IOException{ - throw new IllegalArgumentException("Not supported flush for hbase delete"); - } - - /** - * support delete by constructing a rowkey or by encoded rowkey passed from client - */ - @Override - public void delete(InternalLog log) throws IOException{ - final byte[] rowkey = RowkeyHelper.getRowkey(log); - final Delete delete = createDelete(rowkey); - tbl.delete(delete); - } - - public void delete(TaggedLogAPIEntity entity, EntityDefinition entityDef) throws Exception { - final byte[] rowkey = RowkeyHelper.getRowkey(entity, entityDef); - final Delete delete = createDelete(rowkey); - tbl.delete(delete); - } - - /** - * Batch delete - * @param logs - * @throws IOException - */ - public void delete(List<InternalLog> logs) throws IOException{ - final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByLogs(logs); - deleteRowkeys(rowkeys); - } - - - /** - * Batch delete - * @throws Exception - */ - public void deleteEntities(List<? extends TaggedLogAPIEntity> entities, EntityDefinition entityDef) throws Exception{ - final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByEntities(entities, entityDef); - deleteRowkeys(rowkeys); - } - - /** - * Batch delete - * @throws IOException - */ - public void deleteRowkeys(List<byte[]> rowkeys) throws IOException { - final List<Delete> deletes = new ArrayList<Delete>(rowkeys.size()); - for (byte[] rowkey : rowkeys) { - final Delete delete = createDelete(rowkey); - deletes.add(delete); - } - tbl.delete(deletes); - } - - @Override - public void deleteRowByRowkey(String encodedRowkey) throws IOException{ - byte[] row = EagleBase64Wrapper.decode(encodedRowkey); - final Delete delete = createDelete(row); - tbl.delete(delete); - } - - public void deleteRowByRowkey(List<String> encodedRowkeys) throws IOException { - final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByEncodedRowkeys(encodedRowkeys); - deleteRowkeys(rowkeys); - } - - private Delete createDelete(byte[] row) throws IOException{ - Delete delete = new Delete(row); - delete.deleteFamily(columnFamily.getBytes()); - return delete; - } +public class HBaseLogDeleter implements LogDeleter { + private HTableInterface tbl; + private String table; + private String columnFamily; + + public HBaseLogDeleter(String table, String columnFamily) { + this.table = table; + this.columnFamily = columnFamily; + } + + @Override + public void open() throws IOException { + try { + tbl = EagleConfigFactory.load().getHTable(this.table); + } catch (RuntimeException ex) { + throw new IOException(ex); + } + } + + @Override + public void close() throws IOException { + if (tbl != null) { + new HTableFactory().releaseHTableInterface(tbl); + } + } + + @Override + public void flush() throws IOException { + throw new IllegalArgumentException("Not supported flush for hbase delete"); + } + + /** + * support delete by constructing a rowkey or by encoded rowkey passed from client. + */ + @Override + public void delete(InternalLog log) throws IOException { + final byte[] rowkey = RowkeyHelper.getRowkey(log); + final Delete delete = createDelete(rowkey); + tbl.delete(delete); + } + + public void delete(TaggedLogAPIEntity entity, EntityDefinition entityDef) throws Exception { + final byte[] rowkey = RowkeyHelper.getRowkey(entity, entityDef); + final Delete delete = createDelete(rowkey); + tbl.delete(delete); + } + + /** + * Batch delete. + * + * @param logs + * @throws IOException + */ + public void delete(List<InternalLog> logs) throws IOException { + final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByLogs(logs); + deleteRowkeys(rowkeys); + } + + /** + * Batch delete. + * + * @throws Exception + */ + public void deleteEntities(List<? extends TaggedLogAPIEntity> entities, EntityDefinition entityDef) + throws Exception { + final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByEntities(entities, entityDef); + deleteRowkeys(rowkeys); + } + + /** + * Batch delete. + * + * @throws IOException + */ + public void deleteRowkeys(List<byte[]> rowkeys) throws IOException { + final List<Delete> deletes = new ArrayList<Delete>(rowkeys.size()); + for (byte[] rowkey : rowkeys) { + final Delete delete = createDelete(rowkey); + deletes.add(delete); + } + tbl.delete(deletes); + } + + @Override + public void deleteRowByRowkey(String encodedRowkey) throws IOException { + byte[] row = EagleBase64Wrapper.decode(encodedRowkey); + final Delete delete = createDelete(row); + tbl.delete(delete); + } + + public void deleteRowByRowkey(List<String> encodedRowkeys) throws IOException { + final List<byte[]> rowkeys = RowkeyHelper.getRowkeysByEncodedRowkeys(encodedRowkeys); + deleteRowkeys(rowkeys); + } + + private Delete createDelete(byte[] row) throws IOException { + Delete delete = new Delete(row); + delete.deleteFamily(columnFamily.getBytes()); + return delete; + } }
