http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java index c3d916e..6dfe27d 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java @@ -29,97 +29,103 @@ import java.util.ArrayList; import java.util.List; public class GenericEntityStreamReader extends StreamReader { - private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReader.class); - - private EntityDefinition entityDef; - private SearchCondition condition; - private String prefix; - private StreamReader readerAfterPlan; - - public GenericEntityStreamReader(String serviceName, SearchCondition condition) throws InstantiationException, IllegalAccessException{ - this(serviceName, condition, null); - } - - public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition) throws InstantiationException, IllegalAccessException{ - this(entityDef, condition, entityDef.getPrefix()); - } - - public GenericEntityStreamReader(String serviceName, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{ - this.prefix = prefix; - checkNotNull(serviceName, "serviceName"); - this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName); - checkNotNull(entityDef, "EntityDefinition"); - this.condition = condition; - this.readerAfterPlan = selectQueryReader(); - } - - public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{ - this.prefix = prefix; - checkNotNull(entityDef, "entityDef"); - this.entityDef = entityDef; - checkNotNull(entityDef, "EntityDefinition"); - this.condition = condition; - this.readerAfterPlan = selectQueryReader(); - } - - private void checkNotNull(Object o, String message){ - if(o == null){ - throw new IllegalArgumentException(message + " should not be null"); - } - } - - public EntityDefinition getEntityDefinition() { - return entityDef; - } - - public SearchCondition getSearchCondition() { - return condition; - } - - @Override - public void readAsStream() throws Exception{ - readerAfterPlan._listeners.addAll(this._listeners); - readerAfterPlan.readAsStream(); - } - - private StreamReader selectQueryReader() throws InstantiationException, IllegalAccessException { - final ORExpression query = condition.getQueryExpression(); - IndexDefinition[] indexDefs = entityDef.getIndexes(); + private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReader.class); + + private EntityDefinition entityDef; + private SearchCondition condition; + private String prefix; + private StreamReader readerAfterPlan; + + public GenericEntityStreamReader(String serviceName, SearchCondition condition) + throws InstantiationException, IllegalAccessException { + this(serviceName, condition, null); + } + + public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition) + throws InstantiationException, IllegalAccessException { + this(entityDef, condition, entityDef.getPrefix()); + } + + public GenericEntityStreamReader(String serviceName, SearchCondition condition, String prefix) + throws InstantiationException, IllegalAccessException { + this.prefix = prefix; + checkNotNull(serviceName, "serviceName"); + this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName); + checkNotNull(entityDef, "EntityDefinition"); + this.condition = condition; + this.readerAfterPlan = selectQueryReader(); + } + + public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix) + throws InstantiationException, IllegalAccessException { + this.prefix = prefix; + checkNotNull(entityDef, "entityDef"); + this.entityDef = entityDef; + checkNotNull(entityDef, "EntityDefinition"); + this.condition = condition; + this.readerAfterPlan = selectQueryReader(); + } + + private void checkNotNull(Object o, String message) { + if (o == null) { + throw new IllegalArgumentException(message + " should not be null"); + } + } + + public EntityDefinition getEntityDefinition() { + return entityDef; + } + + public SearchCondition getSearchCondition() { + return condition; + } + + @Override + public void readAsStream() throws Exception { + readerAfterPlan.listeners.addAll(this.listeners); + readerAfterPlan.readAsStream(); + } + + private StreamReader selectQueryReader() throws InstantiationException, IllegalAccessException { + final ORExpression query = condition.getQueryExpression(); + IndexDefinition[] indexDefs = entityDef.getIndexes(); // Index just works with query condition - if (indexDefs != null && condition.getQueryExpression()!=null) { - List<byte[]> rowkeys = new ArrayList<>(); - for (IndexDefinition index : indexDefs) { - // Check unique index first - if (index.isUnique()) { - final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys); - if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) { - LOG.info("Selectd query unique index " + index.getIndexName() + " for query: " + condition.getQueryExpression()); - return new UniqueIndexStreamReader(index, condition, rowkeys); - } - } - } - for (IndexDefinition index : indexDefs) { - // Check non-clustered index - if (!index.isUnique()) { - final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys); - if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) { - LOG.info("Selectd query non clustered index " + index.getIndexName() + " for query: " + condition.getQueryExpression().toString()); - return new NonClusteredIndexStreamReader(index, condition, rowkeys); - } - } - } - } - return new GenericEntityScanStreamReader(entityDef, condition, this.prefix); - } - - @Override - public long getLastTimestamp() { - return readerAfterPlan.getLastTimestamp(); - } - - @Override - public long getFirstTimestamp() { - return readerAfterPlan.getFirstTimestamp(); - } + if (indexDefs != null && condition.getQueryExpression() != null) { + List<byte[]> rowkeys = new ArrayList<>(); + for (IndexDefinition index : indexDefs) { + // Check unique index first + if (index.isUnique()) { + final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys); + if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) { + LOG.info("Selectd query unique index " + index.getIndexName() + " for query: " + + condition.getQueryExpression()); + return new UniqueIndexStreamReader(index, condition, rowkeys); + } + } + } + for (IndexDefinition index : indexDefs) { + // Check non-clustered index + if (!index.isUnique()) { + final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys); + if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) { + LOG.info("Selectd query non clustered index " + index.getIndexName() + " for query: " + + condition.getQueryExpression().toString()); + return new NonClusteredIndexStreamReader(index, condition, rowkeys); + } + } + } + } + return new GenericEntityScanStreamReader(entityDef, condition, this.prefix); + } + + @Override + public long getLastTimestamp() { + return readerAfterPlan.getLastTimestamp(); + } + + @Override + public long getFirstTimestamp() { + return readerAfterPlan.getFirstTimestamp(); + } }
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java index bf72a36..15bdd20 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java @@ -31,121 +31,124 @@ import org.apache.eagle.common.DateTimeUtil; /** * multi-threading stream readers which only applies to time-series entity where we split the query into - * different time range - * - * When this class is used together with list query or aggregate query, be aware that the query's behavior could - * be changed for example pageSize does not work well, output sequence is not determined + * different time range When this class is used together with list query or aggregate query, be aware that the + * query's behavior could be changed for example pageSize does not work well, output sequence is not + * determined */ -public class GenericEntityStreamReaderMT extends StreamReader{ - private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReaderMT.class); - private List<GenericEntityStreamReader> readers = new ArrayList<GenericEntityStreamReader>(); - - public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition, int numThreads) throws Exception{ - checkIsTimeSeries(serviceName); - checkNumThreads(numThreads); - long queryStartTime = condition.getStartTime(); - long queryEndTime = condition.getEndTime(); - long subStartTime = queryStartTime; - long subEndTime = 0; - long interval = (queryEndTime-queryStartTime) / numThreads; - for(int i=0; i<numThreads; i++){ - // split search condition by time range - subStartTime = queryStartTime + i*interval; - if(i == numThreads-1){ - subEndTime = queryEndTime; - }else{ - subEndTime = subStartTime + interval; - } - //String strStartTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime); - //String strEndTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime); - SearchCondition sc = new SearchCondition(condition); - sc.setStartTime(subStartTime); - sc.setEndTime(subEndTime); - GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, sc); - readers.add(reader); - } - } - - private void checkIsTimeSeries(String serviceName) throws Exception{ - EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName); - if(!ed.isTimeSeries()){ - throw new IllegalArgumentException("Multi-threading stream reader must be applied to time series table"); - } - } - - private void checkNumThreads(int numThreads){ - if(numThreads <= 0){ - throw new IllegalArgumentException("Multi-threading stream reader must have numThreads >= 1"); - } - } - - /** - * default to 2 threads - * @param serviceName - * @param condition - */ - public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition) throws Exception{ - this(serviceName, condition, 2); - } - - @Override - public void readAsStream() throws Exception{ - // populate listeners to all readers - for(EntityCreationListener l : _listeners){ - for(GenericEntityStreamReader r : readers){ - r.register(l); - } - } - - List<Future<Void>> futures = new ArrayList<Future<Void>>(); - for(GenericEntityStreamReader r : readers){ - SingleReader reader = new SingleReader(r); - Future<Void> readFuture = EagleConfigFactory.load().getExecutor().submit(reader); - futures.add(readFuture); - } - - // join threads and check exceptions - for(Future<Void> future : futures){ - try{ - future.get(); - }catch(Exception ex){ - LOG.error("Error in read", ex); - throw ex; - } - } - } - - private static class SingleReader implements Callable<Void>{ - private GenericEntityStreamReader reader; - public SingleReader(GenericEntityStreamReader reader){ - this.reader = reader; - } - @Override - public Void call() throws Exception{ - reader.readAsStream(); - return null; - } - } - - @Override - public long getLastTimestamp() { - long lastTimestamp = 0; - for (GenericEntityStreamReader reader : readers) { - if (lastTimestamp < reader.getLastTimestamp()) { - lastTimestamp = reader.getLastTimestamp(); - } - } - return lastTimestamp; - } - - @Override - public long getFirstTimestamp() { - long firstTimestamp = 0; - for (GenericEntityStreamReader reader : readers) { - if (firstTimestamp > reader.getLastTimestamp() || firstTimestamp == 0) { - firstTimestamp = reader.getLastTimestamp(); - } - } - return firstTimestamp; - } +public class GenericEntityStreamReaderMT extends StreamReader { + private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReaderMT.class); + private List<GenericEntityStreamReader> readers = new ArrayList<GenericEntityStreamReader>(); + + public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition, int numThreads) + throws Exception { + checkIsTimeSeries(serviceName); + checkNumThreads(numThreads); + long queryStartTime = condition.getStartTime(); + long queryEndTime = condition.getEndTime(); + long subStartTime = queryStartTime; + long subEndTime = 0; + long interval = (queryEndTime - queryStartTime) / numThreads; + for (int i = 0; i < numThreads; i++) { + // split search condition by time range + subStartTime = queryStartTime + i * interval; + if (i == numThreads - 1) { + subEndTime = queryEndTime; + } else { + subEndTime = subStartTime + interval; + } + // String strStartTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime); + // String strEndTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime); + SearchCondition sc = new SearchCondition(condition); + sc.setStartTime(subStartTime); + sc.setEndTime(subEndTime); + GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, sc); + readers.add(reader); + } + } + + private void checkIsTimeSeries(String serviceName) throws Exception { + EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName); + if (!ed.isTimeSeries()) { + throw new IllegalArgumentException("Multi-threading stream reader must be applied to time series table"); + } + } + + private void checkNumThreads(int numThreads) { + if (numThreads <= 0) { + throw new IllegalArgumentException("Multi-threading stream reader must have numThreads >= 1"); + } + } + + /** + * default to 2 threads + * + * @param serviceName + * @param condition + */ + public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition) throws Exception { + this(serviceName, condition, 2); + } + + @Override + public void readAsStream() throws Exception { + // populate listeners to all readers + for (EntityCreationListener l : listeners) { + for (GenericEntityStreamReader r : readers) { + r.register(l); + } + } + + List<Future<Void>> futures = new ArrayList<Future<Void>>(); + for (GenericEntityStreamReader r : readers) { + SingleReader reader = new SingleReader(r); + Future<Void> readFuture = EagleConfigFactory.load().getExecutor().submit(reader); + futures.add(readFuture); + } + + // join threads and check exceptions + for (Future<Void> future : futures) { + try { + future.get(); + } catch (Exception ex) { + LOG.error("Error in read", ex); + throw ex; + } + } + } + + private static class SingleReader implements Callable<Void> { + private GenericEntityStreamReader reader; + + public SingleReader(GenericEntityStreamReader reader) { + this.reader = reader; + } + + @Override + public Void call() throws Exception { + reader.readAsStream(); + return null; + } + } + + @Override + public long getLastTimestamp() { + long lastTimestamp = 0; + for (GenericEntityStreamReader reader : readers) { + if (lastTimestamp < reader.getLastTimestamp()) { + lastTimestamp = reader.getLastTimestamp(); + } + } + return lastTimestamp; + } + + @Override + public long getFirstTimestamp() { + long firstTimestamp = 0; + for (GenericEntityStreamReader reader : readers) { + if (firstTimestamp > reader.getLastTimestamp() || firstTimestamp == 0) { + firstTimestamp = reader.getLastTimestamp(); + } + } + return firstTimestamp; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java index 5c8b12d..926fcba 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java @@ -27,52 +27,53 @@ import java.util.ArrayList; import java.util.List; public class GenericEntityWriter { - private static final Logger LOG = LoggerFactory.getLogger(GenericEntityWriter.class); - private EntityDefinition entityDef; + private static final Logger LOG = LoggerFactory.getLogger(GenericEntityWriter.class); + private EntityDefinition entityDef; - public GenericEntityWriter(String serviceName) throws InstantiationException, IllegalAccessException{ - this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName); - checkNotNull(entityDef, "serviceName"); - } + public GenericEntityWriter(String serviceName) throws InstantiationException, IllegalAccessException { + this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName); + checkNotNull(entityDef, "serviceName"); + } - public GenericEntityWriter(EntityDefinition entityDef) throws InstantiationException, IllegalAccessException{ - this.entityDef = entityDef; - checkNotNull(entityDef, "serviceName"); - } - - private void checkNotNull(Object o, String message) { - if(o == null){ - throw new IllegalArgumentException(message + " should not be null"); - } - } + public GenericEntityWriter(EntityDefinition entityDef) + throws InstantiationException, IllegalAccessException { + this.entityDef = entityDef; + checkNotNull(entityDef, "serviceName"); + } - /** - * @param entities - * @return row keys - * @throws Exception - */ - public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws Exception{ - HBaseLogWriter writer = new HBaseLogWriter(entityDef.getTable(), entityDef.getColumnFamily()); - List<String> rowkeys = new ArrayList<String>(entities.size()); - List<InternalLog> logs = new ArrayList<InternalLog>(entities.size()); - - try{ - writer.open(); - for(TaggedLogAPIEntity entity : entities){ - final InternalLog entityLog = HBaseInternalLogHelper.convertToInternalLog(entity, entityDef); - logs.add(entityLog); - } - List<byte[]> bRowkeys = writer.write(logs); - for (byte[] rowkey : bRowkeys) { - rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey)); - } + private void checkNotNull(Object o, String message) { + if (o == null) { + throw new IllegalArgumentException(message + " should not be null"); + } + } - }catch(Exception ex){ - LOG.error("fail writing tagged log", ex); - throw ex; - }finally{ - writer.close(); - } - return rowkeys; - } + /** + * @param entities + * @return row keys + * @throws Exception + */ + public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws Exception { + HBaseLogWriter writer = new HBaseLogWriter(entityDef.getTable(), entityDef.getColumnFamily()); + List<String> rowkeys = new ArrayList<String>(entities.size()); + List<InternalLog> logs = new ArrayList<InternalLog>(entities.size()); + + try { + writer.open(); + for (TaggedLogAPIEntity entity : entities) { + final InternalLog entityLog = HBaseInternalLogHelper.convertToInternalLog(entity, entityDef); + logs.add(entityLog); + } + List<byte[]> bRowkeys = writer.write(logs); + for (byte[] rowkey : bRowkeys) { + rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey)); + } + + } catch (Exception ex) { + LOG.error("fail writing tagged log", ex); + throw ex; + } finally { + writer.close(); + } + return rowkeys; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java index 9f6937b..56cd453 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java @@ -21,33 +21,35 @@ import org.apache.eagle.log.entity.meta.*; import com.fasterxml.jackson.databind.annotation.JsonSerialize; /** - * GenericMetricEntity should use prefix field which is extended from TaggedLogAPIEntity as metric name - * metric name is used to partition the metric tables + * GenericMetricEntity should use prefix field which is extended from TaggedLogAPIEntity as metric name metric + * name is used to partition the metric tables */ -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("eagle_metric") @ColumnFamily("f") @Prefix(GenericMetricEntity.GENERIC_METRIC_PREFIX_PLACE_HOLDER) @Service(GenericMetricEntity.GENERIC_METRIC_SERVICE) @TimeSeries(true) -@Metric(interval=60000) +@Metric(interval = 60000) @ServicePath(path = "/metric") // TODO: -@Tags({"site","application","policyId","alertExecutorId", "streamName","source","partitionSeq"}) +@Tags({ + "site", "application", "policyId", "alertExecutorId", "streamName", "source", "partitionSeq" + }) public class GenericMetricEntity extends TaggedLogAPIEntity { - public static final String GENERIC_METRIC_SERVICE = "GenericMetricService"; - public static final String GENERIC_METRIC_PREFIX_PLACE_HOLDER = "GENERIC_METRIC_PREFIX_PLACEHODLER"; - public static final String VALUE_FIELD ="value"; + public static final String GENERIC_METRIC_SERVICE = "GenericMetricService"; + public static final String GENERIC_METRIC_PREFIX_PLACE_HOLDER = "GENERIC_METRIC_PREFIX_PLACEHODLER"; + public static final String VALUE_FIELD = "value"; - @Column("a") - private double[] value; + @Column("a") + private double[] value; - public double[] getValue() { - return value; - } + public double[] getValue() { + return value; + } - public void setValue(double[] value) { - this.value = value; - pcs.firePropertyChange("value", null, null); - } -} \ No newline at end of file + public void setValue(double[] value) { + this.value = value; + pcs.firePropertyChange("value", null, null); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java index 84b02ae..bc99a81 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java @@ -23,32 +23,37 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -public class GenericMetricEntityBatchReader implements EntityCreationListener{ - private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class); - - private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>(); - private GenericEntityStreamReader reader; - - public GenericMetricEntityBatchReader(String metricName, SearchCondition condition) throws Exception{ - reader = new GenericEntityStreamReader(GenericMetricEntity.GENERIC_METRIC_SERVICE, condition, metricName); - } - - public long getLastTimestamp() { - return reader.getLastTimestamp(); - } - public long getFirstTimestamp() { - return reader.getFirstTimestamp(); - } - @Override - public void entityCreated(TaggedLogAPIEntity entity){ - entities.add(entity); - } - - @SuppressWarnings("unchecked") - public <T> List<T> read() throws Exception{ - if(LOG.isDebugEnabled()) LOG.debug("Start reading as batch mode"); - reader.register(this); - reader.readAsStream(); - return (List<T>)entities; - } +public class GenericMetricEntityBatchReader implements EntityCreationListener { + private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class); + + private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>(); + private GenericEntityStreamReader reader; + + public GenericMetricEntityBatchReader(String metricName, SearchCondition condition) throws Exception { + reader = new GenericEntityStreamReader(GenericMetricEntity.GENERIC_METRIC_SERVICE, condition, + metricName); + } + + public long getLastTimestamp() { + return reader.getLastTimestamp(); + } + + public long getFirstTimestamp() { + return reader.getFirstTimestamp(); + } + + @Override + public void entityCreated(TaggedLogAPIEntity entity) { + entities.add(entity); + } + + @SuppressWarnings("unchecked") + public <T> List<T> read() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Start reading as batch mode"); + } + reader.register(this); + reader.readAsStream(); + return (List<T>)entities; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java index 1cf3905..216022f 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java @@ -25,74 +25,79 @@ import org.slf4j.LoggerFactory; import java.text.ParseException; -public class GenericMetricEntityDecompactionStreamReader extends StreamReader implements EntityCreationListener{ - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(GenericMetricEntityDecompactionStreamReader.class); - private GenericEntityStreamReader reader; - private EntityDefinition ed; - private String serviceName = GenericMetricEntity.GENERIC_METRIC_SERVICE; - private long start; - private long end; - private GenericMetricShadowEntity single = new GenericMetricShadowEntity(); - - /** - * it makes sense that serviceName should not be provided while metric name should be provided as prefix - * @param metricName - * @param condition - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ParseException - */ - public GenericMetricEntityDecompactionStreamReader(String metricName, SearchCondition condition) throws InstantiationException, IllegalAccessException, ParseException{ - ed = EntityDefinitionManager.getEntityByServiceName(serviceName); - checkIsMetric(ed); - reader = new GenericEntityStreamReader(serviceName, condition, metricName); - start = condition.getStartTime(); - end = condition.getEndTime(); - } - - private void checkIsMetric(EntityDefinition ed){ - if(ed.getMetricDefinition() == null) - throw new IllegalArgumentException("Only metric entity comes here"); - } - - @Override - public void entityCreated(TaggedLogAPIEntity entity) throws Exception{ - GenericMetricEntity e = (GenericMetricEntity)entity; - double[] value = e.getValue(); - if(value != null) { - int count =value.length; - @SuppressWarnings("unused") - Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass(); - for (int i = 0; i < count; i++) { - long ts = entity.getTimestamp() + i * ed.getMetricDefinition().getInterval(); - // exclude those entity which is not within the time range in search condition. [start, end) - if (ts < start || ts >= end) { - continue; - } - single.setTimestamp(ts); - single.setTags(entity.getTags()); - single.setValue(e.getValue()[i]); - for (EntityCreationListener l : _listeners) { - l.entityCreated(single); - } - } - } - } - - @Override - public void readAsStream() throws Exception{ - reader.register(this); - reader.readAsStream(); - } +public class GenericMetricEntityDecompactionStreamReader extends StreamReader + implements EntityCreationListener { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory + .getLogger(GenericMetricEntityDecompactionStreamReader.class); + private GenericEntityStreamReader reader; + private EntityDefinition ed; + private String serviceName = GenericMetricEntity.GENERIC_METRIC_SERVICE; + private long start; + private long end; + private GenericMetricShadowEntity single = new GenericMetricShadowEntity(); - @Override - public long getLastTimestamp() { - return reader.getLastTimestamp(); - } + /** + * it makes sense that serviceName should not be provided while metric name should be provided as prefix + * + * @param metricName + * @param condition + * @throws InstantiationException + * @throws IllegalAccessException + * @throws ParseException + */ + public GenericMetricEntityDecompactionStreamReader(String metricName, SearchCondition condition) + throws InstantiationException, IllegalAccessException, ParseException { + ed = EntityDefinitionManager.getEntityByServiceName(serviceName); + checkIsMetric(ed); + reader = new GenericEntityStreamReader(serviceName, condition, metricName); + start = condition.getStartTime(); + end = condition.getEndTime(); + } - @Override - public long getFirstTimestamp() { - return reader.getFirstTimestamp(); - } -} \ No newline at end of file + private void checkIsMetric(EntityDefinition ed) { + if (ed.getMetricDefinition() == null) { + throw new IllegalArgumentException("Only metric entity comes here"); + } + } + + @Override + public void entityCreated(TaggedLogAPIEntity entity) throws Exception { + GenericMetricEntity e = (GenericMetricEntity)entity; + double[] value = e.getValue(); + if (value != null) { + int count = value.length; + @SuppressWarnings("unused") + Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass(); + for (int i = 0; i < count; i++) { + long ts = entity.getTimestamp() + i * ed.getMetricDefinition().getInterval(); + // exclude those entity which is not within the time range in search condition. [start, end) + if (ts < start || ts >= end) { + continue; + } + single.setTimestamp(ts); + single.setTags(entity.getTags()); + single.setValue(e.getValue()[i]); + for (EntityCreationListener l : listeners) { + l.entityCreated(single); + } + } + } + } + + @Override + public void readAsStream() throws Exception { + reader.register(this); + reader.readAsStream(); + } + + @Override + public long getLastTimestamp() { + return reader.getLastTimestamp(); + } + + @Override + public long getFirstTimestamp() { + return reader.getFirstTimestamp(); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java index acd1290..8ead7cd 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java @@ -22,13 +22,13 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; * just a shadow class to avoid dynamically create the class and instantiate using reflection */ public class GenericMetricShadowEntity extends TaggedLogAPIEntity { - private double value; + private double value; - public double getValue() { - return value; - } + public double getValue() { + return value; + } - public void setValue(double value) { - this.value = value; - } + public void setValue(double value) { + this.value = value; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java index 6869c7c..97f538c 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java @@ -35,24 +35,27 @@ import java.util.Map; */ @XmlRootElement @XmlAccessorType(XmlAccessType.FIELD) -@XmlType(propOrder = {"success","exception","meta","type","obj"}) -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@XmlType(propOrder = { + "success", "exception", "meta", "type", "obj" + }) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @JsonDeserialize(using = GenericServiceAPIResponseEntityDeserializer.class) -@JsonIgnoreProperties(ignoreUnknown=true) -public class GenericServiceAPIResponseEntity<T>{ +@JsonIgnoreProperties(ignoreUnknown = true) +public class GenericServiceAPIResponseEntity<T> { /** * Please use primitive type of value in meta as possible */ - private Map<String,Object> meta; - private boolean success; - private String exception; + private Map<String, Object> meta; + private boolean success; + private String exception; private List<T> obj; private Class<T> type; - public GenericServiceAPIResponseEntity(){ + public GenericServiceAPIResponseEntity() { // default constructor } - public GenericServiceAPIResponseEntity(Class<T> type){ + + public GenericServiceAPIResponseEntity(Class<T> type) { this.setType(type); } @@ -72,7 +75,7 @@ public class GenericServiceAPIResponseEntity<T>{ this.obj = obj; } - public void setObj(List<T> obj,Class<T> type) { + public void setObj(List<T> obj, Class<T> type) { this.setObj(obj); this.setType(type); } @@ -85,10 +88,10 @@ public class GenericServiceAPIResponseEntity<T>{ * Set the first object's class as type */ @SuppressWarnings("unused") - public void setTypeByObj(){ - for(T t:this.obj){ - if(this.type == null && t!=null){ - this.type = (Class<T>) t.getClass(); + public void setTypeByObj() { + for (T t : this.obj) { + if (this.type == null && t != null) { + this.type = (Class<T>)t.getClass(); } } } @@ -102,17 +105,19 @@ public class GenericServiceAPIResponseEntity<T>{ this.type = type; } - public boolean isSuccess() { - return success; - } - public void setSuccess(boolean success) { - this.success = success; - } - public String getException() { - return exception; - } - - public void setException(Exception exceptionObj){ + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public String getException() { + return exception; + } + + public void setException(Exception exceptionObj) { this.exception = EagleExceptionWrapper.wrap(exceptionObj); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java index 836295b..8ccb43a 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java @@ -30,57 +30,61 @@ import java.io.IOException; import java.util.*; /** - * @since 3/18/15 + * @since 3/18/15. */ -public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserializer<GenericServiceAPIResponseEntity> { - private final static String META_FIELD="meta"; - private final static String SUCCESS_FIELD="success"; - private final static String EXCEPTION_FIELD="exception"; - private final static String OBJ_FIELD="obj"; - private final static String TYPE_FIELD="type"; +public class GenericServiceAPIResponseEntityDeserializer + extends JsonDeserializer<GenericServiceAPIResponseEntity> { + private static final String META_FIELD = "meta"; + private static final String SUCCESS_FIELD = "success"; + private static final String EXCEPTION_FIELD = "exception"; + private static final String OBJ_FIELD = "obj"; + private static final String TYPE_FIELD = "type"; @Override - public GenericServiceAPIResponseEntity deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException { + public GenericServiceAPIResponseEntity deserialize(JsonParser jp, DeserializationContext ctxt) + throws IOException, JsonProcessingException { GenericServiceAPIResponseEntity entity = new GenericServiceAPIResponseEntity(); ObjectCodec objectCodec = jp.getCodec(); JsonNode rootNode = jp.getCodec().readTree(jp); - if(rootNode.isObject()){ - Iterator<Map.Entry<String,JsonNode>> fields = rootNode.fields(); + if (rootNode.isObject()) { + Iterator<Map.Entry<String, JsonNode>> fields = rootNode.fields(); JsonNode objNode = null; - while(fields.hasNext()){ - Map.Entry<String,JsonNode> field = fields.next(); - if (META_FIELD.equals(field.getKey()) && field.getValue() != null) + while (fields.hasNext()) { + Map.Entry<String, JsonNode> field = fields.next(); + if (META_FIELD.equals(field.getKey()) && field.getValue() != null) { entity.setMeta(objectCodec.readValue(field.getValue().traverse(), Map.class)); - else if(SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null){ + } else if (SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null) { entity.setSuccess(field.getValue().booleanValue()); - }else if(EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null){ + } else if (EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null) { entity.setException(new Exception(field.getValue().textValue())); - }else if(TYPE_FIELD.endsWith(field.getKey()) && field.getValue() != null){ - Preconditions.checkNotNull(field.getValue().textValue(),"Response type class is null"); + } else if (TYPE_FIELD.endsWith(field.getKey()) && field.getValue() != null) { + Preconditions.checkNotNull(field.getValue().textValue(), "Response type class is null"); try { entity.setType(Class.forName(field.getValue().textValue())); } catch (ClassNotFoundException e) { throw new IOException(e); } - }else if(OBJ_FIELD.equals(field.getKey()) && field.getValue() != null){ + } else if (OBJ_FIELD.equals(field.getKey()) && field.getValue() != null) { objNode = field.getValue(); } } - if(objNode!=null) { - JavaType collectionType=null; + if (objNode != null) { + JavaType collectionType = null; if (entity.getType() != null) { - collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, entity.getType()); - }else{ - collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, Map.class); + collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, + entity.getType()); + } else { + collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, + Map.class); } List obj = objectCodec.readValue(objNode.traverse(), collectionType); entity.setObj(obj); } - }else{ + } else { throw new IOException("root node is not object"); } return entity; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java index 7a38033..32f382b 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java @@ -30,216 +30,232 @@ import org.slf4j.LoggerFactory; import java.util.*; public class HBaseInternalLogHelper { - private final static Logger LOG = LoggerFactory.getLogger(HBaseInternalLogHelper.class); - - private static final EntitySerDeserializer ENTITY_SERDESER = new EntitySerDeserializer(); - - /** - * - * @param ed - * @param r - * @param qualifiers if null, return all qualifiers defined in ed - * @return - */ - public static InternalLog parse(EntityDefinition ed, Result r, byte[][] qualifiers) { - final byte[] row = r.getRow(); - // skip the first 4 bytes : prefix - final int offset = (ed.getPartitions() == null) ? (4) : (4 + ed.getPartitions().length * 4); - long timestamp = ByteUtil.bytesToLong(row, offset); - // reverse timestamp - timestamp = Long.MAX_VALUE - timestamp; - final byte[] family = ed.getColumnFamily().getBytes(); - final Map<String, byte[]> allQualifierValues = new HashMap<String, byte[]>(); - - if (qualifiers != null) { - int count = qualifiers.length; - final byte[][] values = new byte[count][]; - for (int i = 0; i < count; i++) { - // TODO if returned value is null, it means no this column for this row, so why set null to the object? - values[i] = r.getValue(family, qualifiers[i]); - allQualifierValues.put(new String(qualifiers[i]), values[i]); - } - }else{ - // return all qualifiers - for(KeyValue kv:r.list()){ - byte[] qualifier = kv.getQualifier(); - byte[] value = kv.getValue(); - allQualifierValues.put(new String(qualifier),value); - } - } - final InternalLog log = buildObject(ed, row, timestamp, allQualifierValues); - return log; - } - - /** - * - * @param ed - * @param row - * @param timestamp - * @param allQualifierValues <code>Map < Qualifier name (not display name),Value in bytes array ></code> - * @return - */ - public static InternalLog buildObject(EntityDefinition ed, byte[] row, long timestamp, Map<String, byte[]> allQualifierValues) { - InternalLog log = new InternalLog(); - String myRow = EagleBase64Wrapper.encodeByteArray2URLSafeString(row); - log.setEncodedRowkey(myRow); - log.setPrefix(ed.getPrefix()); - log.setTimestamp(timestamp); - - Map<String, byte[]> logQualifierValues = new HashMap<String, byte[]>(); - Map<String, String> logTags = new HashMap<String, String>(); - Map<String, Object> extra = null; - - Map<String,Double> doubleMap = null; - // handle with metric - boolean isMetricEntity = GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(ed.getService()); - double[] metricValueArray = null; - - for (Map.Entry<String, byte[]> entry : allQualifierValues.entrySet()) { - if (ed.isTag(entry.getKey())) { - if (entry.getValue() != null) { - logTags.put(entry.getKey(), new String(entry.getValue())); - }else if (TokenConstant.isExpression(entry.getKey())){ - if(doubleMap == null) doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(allQualifierValues, ed); - // Caculate expression based fields - String expression = TokenConstant.parseExpressionContent(entry.getKey()); - if (extra == null) extra = new HashMap<String, Object>(); - - // Evaluation expression as output based on entity - // ----------------------------------------------- - // 1) Firstly, check whether is metric entity and expression requires value and also value is not number (i.e. double[]) - // 2) Treat all required fields as double, if not number, then set result as NaN - - try { - ExpressionParser parser = ExpressionParser.parse(expression); - boolean isRequiringValue = parser.getDependentFields().contains(GenericMetricEntity.VALUE_FIELD); - - if(isMetricEntity && isRequiringValue && doubleMap.get(GenericMetricEntity.VALUE_FIELD)!=null - && Double.isNaN(doubleMap.get(GenericMetricEntity.VALUE_FIELD))) // EntityQualifierUtils will convert non-number field into Double.NaN - { - // if dependent fields require "value" - // and value exists but value's type is double[] instead of double - - // handle with metric value array based expression - // lazily extract metric value as double array if required - if(metricValueArray == null){ - // if(allQualifierValues.containsKey(GenericMetricEntity.VALUE_FIELD)){ - Qualifier qualifier = ed.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD); - EntitySerDeser serDeser = qualifier.getSerDeser(); - if(serDeser instanceof DoubleArraySerDeser){ - byte[] value = allQualifierValues.get(qualifier.getQualifierName()); - if(value !=null ) metricValueArray = (double[]) serDeser.deserialize(value); - } - // } - } - - if(metricValueArray!=null){ - double[] resultBucket = new double[metricValueArray.length]; - Map<String, Double> _doubleMap = new HashMap<String,Double>(doubleMap); - _doubleMap.remove(entry.getKey()); - for(int i=0;i< resultBucket.length;i++) { - _doubleMap.put(GenericMetricEntity.VALUE_FIELD, metricValueArray[i]); - resultBucket[i]= parser.eval(_doubleMap); - } - extra.put(expression,resultBucket); - }else{ - LOG.warn("Failed convert metric value into double[] type which is required by expression: "+expression); - // if require value in double[] is NaN - double value = parser.eval(doubleMap); - extra.put(expression, value); - } - }else { - double value = parser.eval(doubleMap); - extra.put(expression, value); - // LOG.info("DEBUG: "+entry.getKey()+" = "+ value); - } - } catch (Exception e) { - LOG.error("Failed to eval expression "+expression+", exception: "+e.getMessage(),e); - } - } - } else { - logQualifierValues.put(entry.getKey(),entry.getValue()); - } - } - log.setQualifierValues(logQualifierValues); - log.setTags(logTags); - log.setExtraValues(extra); - return log; - } - - public static TaggedLogAPIEntity buildEntity(InternalLog log, EntityDefinition entityDef) throws Exception { - Map<String, byte[]> qualifierValues = log.getQualifierValues(); - TaggedLogAPIEntity entity = ENTITY_SERDESER.readValue(qualifierValues, entityDef); - if (entity.getTags() == null && log.getTags() != null) { - entity.setTags(log.getTags()); - } - entity.setExp(log.getExtraValues()); - entity.setTimestamp(log.getTimestamp()); - entity.setEncodedRowkey(log.getEncodedRowkey()); - entity.setPrefix(log.getPrefix()); - return entity; - } - - public static List<TaggedLogAPIEntity> buildEntities(List<InternalLog> logs, EntityDefinition entityDef) throws Exception { - final List<TaggedLogAPIEntity> result = new ArrayList<TaggedLogAPIEntity>(logs.size()); - for (InternalLog log : logs) { - result.add(buildEntity(log, entityDef)); - } - return result; - } - - public static byte[][] getOutputQualifiers(EntityDefinition entityDef, List<String> outputFields) { - final byte[][] result = new byte[outputFields.size()][]; - int index = 0; - for(String field : outputFields){ - // convert displayName to qualifierName - Qualifier q = entityDef.getDisplayNameMap().get(field); - if(q == null){ // for tag case - result[index++] = field.getBytes(); - }else{ // for qualifier case - result[index++] = q.getQualifierName().getBytes(); - } - } - return result; - } - - public static InternalLog convertToInternalLog(TaggedLogAPIEntity entity, EntityDefinition entityDef) throws Exception { - final InternalLog log = new InternalLog(); - final Map<String, String> inputTags = entity.getTags(); - final Map<String, String> tags = new TreeMap<String, String>(); - if(inputTags!=null) { - for (Map.Entry<String, String> entry : inputTags.entrySet()) { - tags.put(entry.getKey(), entry.getValue()); - } - } - log.setTags(tags); - if(entityDef.isTimeSeries()){ - log.setTimestamp(entity.getTimestamp()); - }else{ - log.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to MAX, then actually stored 0 - } - - // For Metric entity, prefix is populated along with entity instead of EntityDefinition - if(entity.getPrefix() != null && !entity.getPrefix().isEmpty()){ - log.setPrefix(entity.getPrefix()); - }else{ - log.setPrefix(entityDef.getPrefix()); - } - - log.setPartitions(entityDef.getPartitions()); - EntitySerDeserializer des = new EntitySerDeserializer(); - log.setQualifierValues(des.writeValue(entity, entityDef)); - - final IndexDefinition[] indexDefs = entityDef.getIndexes(); - if (indexDefs != null) { - final List<byte[]> indexRowkeys = new ArrayList<byte[]>(); - for (int i = 0; i < indexDefs.length; ++i) { - final IndexDefinition indexDef = indexDefs[i]; - final byte[] indexRowkey = indexDef.generateIndexRowkey(entity); - indexRowkeys.add(indexRowkey); - } - log.setIndexRowkeys(indexRowkeys); - } - return log; - } + private static final Logger LOG = LoggerFactory.getLogger(HBaseInternalLogHelper.class); + + private static final EntitySerDeserializer ENTITY_SERDESER = new EntitySerDeserializer(); + + /** + * @param ed + * @param r + * @param qualifiers if null, return all qualifiers defined in ed + * @return + */ + public static InternalLog parse(EntityDefinition ed, Result r, byte[][] qualifiers) { + final byte[] row = r.getRow(); + // skip the first 4 bytes : prefix + final int offset = (ed.getPartitions() == null) ? (4) : (4 + ed.getPartitions().length * 4); + long timestamp = ByteUtil.bytesToLong(row, offset); + // reverse timestamp + timestamp = Long.MAX_VALUE - timestamp; + final byte[] family = ed.getColumnFamily().getBytes(); + final Map<String, byte[]> allQualifierValues = new HashMap<String, byte[]>(); + + if (qualifiers != null) { + int count = qualifiers.length; + final byte[][] values = new byte[count][]; + for (int i = 0; i < count; i++) { + // TODO if returned value is null, it means no this column for this row, so why set null to + // the object? + values[i] = r.getValue(family, qualifiers[i]); + allQualifierValues.put(new String(qualifiers[i]), values[i]); + } + } else { + // return all qualifiers + for (KeyValue kv : r.list()) { + byte[] qualifier = kv.getQualifier(); + byte[] value = kv.getValue(); + allQualifierValues.put(new String(qualifier), value); + } + } + final InternalLog log = buildObject(ed, row, timestamp, allQualifierValues); + return log; + } + + /** + * @param ed + * @param row + * @param timestamp + * @param allQualifierValues + * <code>Map < Qualifier name (not display name),Value in bytes array ></code> + * @return + */ + public static InternalLog buildObject(EntityDefinition ed, byte[] row, long timestamp, + Map<String, byte[]> allQualifierValues) { + InternalLog log = new InternalLog(); + String myRow = EagleBase64Wrapper.encodeByteArray2URLSafeString(row); + log.setEncodedRowkey(myRow); + log.setPrefix(ed.getPrefix()); + log.setTimestamp(timestamp); + + Map<String, byte[]> logQualifierValues = new HashMap<String, byte[]>(); + Map<String, String> logTags = new HashMap<String, String>(); + Map<String, Object> extra = null; + + Map<String, Double> doubleMap = null; + // handle with metric + boolean isMetricEntity = GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(ed.getService()); + double[] metricValueArray = null; + + for (Map.Entry<String, byte[]> entry : allQualifierValues.entrySet()) { + if (ed.isTag(entry.getKey())) { + if (entry.getValue() != null) { + logTags.put(entry.getKey(), new String(entry.getValue())); + } else if (TokenConstant.isExpression(entry.getKey())) { + if (doubleMap == null) { + doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(allQualifierValues, ed); + } + // Caculate expression based fields + String expression = TokenConstant.parseExpressionContent(entry.getKey()); + if (extra == null) { + extra = new HashMap<String, Object>(); + } + + // Evaluation expression as output based on entity + // ----------------------------------------------- + // 1) Firstly, check whether is metric entity and expression requires value and also value + // is not number (i.e. double[]) + // 2) Treat all required fields as double, if not number, then set result as NaN + + try { + ExpressionParser parser = ExpressionParser.parse(expression); + boolean isRequiringValue = parser.getDependentFields() + .contains(GenericMetricEntity.VALUE_FIELD); + + if (isMetricEntity && isRequiringValue + && doubleMap.get(GenericMetricEntity.VALUE_FIELD) != null + && Double.isNaN(doubleMap.get(GenericMetricEntity.VALUE_FIELD))) { + // EntityQualifierUtils will convert non-number field into Double.NaN + // if dependent fields require "value" + // and value exists but value's type is double[] instead of double + + // handle with metric value array based expression + // lazily extract metric value as double array if required + if (metricValueArray == null) { + // if(allQualifierValues.containsKey(GenericMetricEntity.VALUE_FIELD)){ + Qualifier qualifier = ed.getDisplayNameMap() + .get(GenericMetricEntity.VALUE_FIELD); + EntitySerDeser serDeser = qualifier.getSerDeser(); + if (serDeser instanceof DoubleArraySerDeser) { + byte[] value = allQualifierValues.get(qualifier.getQualifierName()); + if (value != null) { + metricValueArray = (double[])serDeser.deserialize(value); + } + } + // } + } + + if (metricValueArray != null) { + double[] resultBucket = new double[metricValueArray.length]; + Map<String, Double> _doubleMap = new HashMap<String, Double>(doubleMap); + _doubleMap.remove(entry.getKey()); + for (int i = 0; i < resultBucket.length; i++) { + _doubleMap.put(GenericMetricEntity.VALUE_FIELD, metricValueArray[i]); + resultBucket[i] = parser.eval(_doubleMap); + } + extra.put(expression, resultBucket); + } else { + LOG.warn("Failed convert metric value into double[] type which is required by expression: " + + expression); + // if require value in double[] is NaN + double value = parser.eval(doubleMap); + extra.put(expression, value); + } + } else { + double value = parser.eval(doubleMap); + extra.put(expression, value); + // LOG.info("DEBUG: "+entry.getKey()+" = "+ value); + } + } catch (Exception e) { + LOG.error("Failed to eval expression " + expression + ", exception: " + + e.getMessage(), e); + } + } + } else { + logQualifierValues.put(entry.getKey(), entry.getValue()); + } + } + log.setQualifierValues(logQualifierValues); + log.setTags(logTags); + log.setExtraValues(extra); + return log; + } + + public static TaggedLogAPIEntity buildEntity(InternalLog log, EntityDefinition entityDef) + throws Exception { + Map<String, byte[]> qualifierValues = log.getQualifierValues(); + TaggedLogAPIEntity entity = ENTITY_SERDESER.readValue(qualifierValues, entityDef); + if (entity.getTags() == null && log.getTags() != null) { + entity.setTags(log.getTags()); + } + entity.setExp(log.getExtraValues()); + entity.setTimestamp(log.getTimestamp()); + entity.setEncodedRowkey(log.getEncodedRowkey()); + entity.setPrefix(log.getPrefix()); + return entity; + } + + public static List<TaggedLogAPIEntity> buildEntities(List<InternalLog> logs, EntityDefinition entityDef) + throws Exception { + final List<TaggedLogAPIEntity> result = new ArrayList<TaggedLogAPIEntity>(logs.size()); + for (InternalLog log : logs) { + result.add(buildEntity(log, entityDef)); + } + return result; + } + + public static byte[][] getOutputQualifiers(EntityDefinition entityDef, List<String> outputFields) { + final byte[][] result = new byte[outputFields.size()][]; + int index = 0; + for (String field : outputFields) { + // convert displayName to qualifierName + Qualifier q = entityDef.getDisplayNameMap().get(field); + if (q == null) { // for tag case + result[index++] = field.getBytes(); + } else { // for qualifier case + result[index++] = q.getQualifierName().getBytes(); + } + } + return result; + } + + public static InternalLog convertToInternalLog(TaggedLogAPIEntity entity, EntityDefinition entityDef) + throws Exception { + final InternalLog log = new InternalLog(); + final Map<String, String> inputTags = entity.getTags(); + final Map<String, String> tags = new TreeMap<String, String>(); + if (inputTags != null) { + for (Map.Entry<String, String> entry : inputTags.entrySet()) { + tags.put(entry.getKey(), entry.getValue()); + } + } + log.setTags(tags); + if (entityDef.isTimeSeries()) { + log.setTimestamp(entity.getTimestamp()); + } else { + log.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to MAX, then actually stored 0 + } + + // For Metric entity, prefix is populated along with entity instead of EntityDefinition + if (entity.getPrefix() != null && !entity.getPrefix().isEmpty()) { + log.setPrefix(entity.getPrefix()); + } else { + log.setPrefix(entityDef.getPrefix()); + } + + log.setPartitions(entityDef.getPartitions()); + EntitySerDeserializer des = new EntitySerDeserializer(); + log.setQualifierValues(des.writeValue(entity, entityDef)); + + final IndexDefinition[] indexDefs = entityDef.getIndexes(); + if (indexDefs != null) { + final List<byte[]> indexRowkeys = new ArrayList<byte[]>(); + for (int i = 0; i < indexDefs.length; ++i) { + final IndexDefinition indexDef = indexDefs[i]; + final byte[] indexRowkey = indexDef.generateIndexRowkey(entity); + indexRowkeys.add(indexRowkey); + } + log.setIndexRowkeys(indexRowkeys); + } + return log; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java index c8b9a33..d5c8e2c 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java @@ -28,59 +28,62 @@ import java.util.Date; import java.util.List; public class HBaseLogReader2 extends AbstractHBaseLogReader<InternalLog> { - protected ResultScanner rs; + protected ResultScanner rs; - public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] outputQualifiers) { - super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers); - } + public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, + Filter filter, String lastScanKey, byte[][] outputQualifiers) { + super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers); + } - /** - * This constructor supports partition. - * - * @param ed entity definition - * @param partitions partition values, which is sorted in partition definition order. TODO: in future we need to support - * multiple values for one partition field - * @param startTime start time of the query - * @param endTime end time of the query - * @param filter filter for the hbase scan - * @param lastScanKey the key of last scan - * @param outputQualifiers the bytes of output qualifier names - * @param prefix can be populated from outside world specifically for generic metric reader - */ - public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] outputQualifiers, String prefix) { - super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix); - } + /** + * This constructor supports partition. + * + * @param ed entity definition + * @param partitions partition values, which is sorted in partition definition order. TODO: in future we + * need to support multiple values for one partition field + * @param startTime start time of the query + * @param endTime end time of the query + * @param filter filter for the hbase scan + * @param lastScanKey the key of last scan + * @param outputQualifiers the bytes of output qualifier names + * @param prefix can be populated from outside world specifically for generic metric reader + */ + public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, + Filter filter, String lastScanKey, byte[][] outputQualifiers, String prefix) { + super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix); + } - @Override - protected void onOpen(HTableInterface tbl, Scan scan) throws IOException { - rs = tbl.getScanner(scan); - } + @Override + protected void onOpen(HTableInterface tbl, Scan scan) throws IOException { + rs = tbl.getScanner(scan); + } - /** - * <h2>Close:</h2> - * 1. Call super.close(): release current table connection <br></br> - * 2. Close Scanner<br></br> - * - * @throws IOException - */ - @Override - public void close() throws IOException { - super.close(); - if(rs != null){ - rs.close(); - } - } + /** + * <h2>Close:</h2> 1. Call super.close(): release current table connection <br> + * <br> + * 2. Close Scanner<br> + * <br> + * + * @throws IOException + */ + @Override + public void close() throws IOException { + super.close(); + if (rs != null) { + rs.close(); + } + } - @Override - public InternalLog read() throws IOException { - if (rs == null) - throw new IllegalArgumentException( - "ResultScanner must be initialized before reading"); - InternalLog t = null; - Result r = rs.next(); - if (r != null) { - t = HBaseInternalLogHelper.parse(_ed, r, qualifiers); - } - return t; - } + @Override + public InternalLog read() throws IOException { + if (rs == null) { + throw new IllegalArgumentException("ResultScanner must be initialized before reading"); + } + InternalLog t = null; + Result r = rs.next(); + if (r != null) { + t = HBaseInternalLogHelper.parse(ed, r, qualifiers); + } + return t; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java index 059ee7f..1cf23b6 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java @@ -29,124 +29,125 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HBaseLogWriter implements LogWriter { - private static Logger LOG = LoggerFactory.getLogger(HBaseLogWriter.class); - private static byte[] EMPTY_INDEX_QUALIFER_VALUE = "".getBytes(); - - private HTableInterface tbl; - private String table; - private String columnFamily; - - public HBaseLogWriter(String table, String columnFamily) { - // TODO assert for non-null of table and columnFamily - this.table = table; - this.columnFamily = columnFamily; - } - - @Override - public void open() throws IOException { - try{ - tbl = EagleConfigFactory.load().getHTable(this.table); -// LOGGER.info("HBase table " + table + " audo reflush is " + (tbl.isAutoFlush() ? "enabled" : "disabled")); - }catch(Exception ex){ - LOG.error("Cannot create htable", ex); - throw new IOException(ex); - } - } - - @Override - public void close() throws IOException { - if(tbl != null){ - new HTableFactory().releaseHTableInterface(tbl); - } - } - - @Override - public void flush() throws IOException { - tbl.flushCommits(); - } - - protected void populateColumnValues(Put p, InternalLog log){ - Map<String, byte[]> qualifierValues = log.getQualifierValues(); - // iterate all qualifierValues - for(Map.Entry<String, byte[]> entry : qualifierValues.entrySet()){ - p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue()); - } - - Map<String, String> tags = log.getTags(); - // iterate all tags, each tag will be stored as a column qualifier - if(tags != null){ - for(Map.Entry<String, String> entry : tags.entrySet()){ - // TODO need a consistent handling of null values - if(entry.getValue() != null) - p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue().getBytes()); - } - } - } - - /** - * TODO need think about if multi-PUT is necessary, by checking if autoFlush works - */ - @Override - public byte[] write(InternalLog log) throws IOException{ - final byte[] rowkey = RowkeyBuilder.buildRowkey(log); - final Put p = new Put(rowkey); - populateColumnValues(p, log); - tbl.put(p); - final List<byte[]> indexRowkeys = log.getIndexRowkeys(); - if (indexRowkeys != null) { - writeIndexes(rowkey, indexRowkeys); - } - return rowkey; - } - - /** - * TODO need think about if multi-PUT is necessary, by checking if autoFlush works - */ - public List<byte[]> write(List<InternalLog> logs) throws IOException{ - final List<Put> puts = new ArrayList<Put>(logs.size()); - final List<byte[]> result = new ArrayList<byte[]>(logs.size()); - for (InternalLog log : logs) { - final byte[] rowkey = RowkeyBuilder.buildRowkey(log); - final Put p = new Put(rowkey); - populateColumnValues(p, log); - puts.add(p); - final List<byte[]> indexRowkeys = log.getIndexRowkeys(); - if (indexRowkeys != null) { - writeIndexes(rowkey, indexRowkeys, puts); - } - result.add(rowkey); - } - tbl.put(puts); - return result; - } - - @Override - public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException{ - Put p = new Put(rowkey); - populateColumnValues(p, log); - tbl.put(p); - final List<byte[]> indexRowkeys = log.getIndexRowkeys(); - if (indexRowkeys != null) { - writeIndexes(rowkey, indexRowkeys); - } - } - - private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys) throws IOException { - for (byte[] indexRowkey : indexRowkeys) { - Put p = new Put(indexRowkey); - p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE); - tbl.put(p); - } - } - - private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys, List<Put> puts) throws IOException { - for (byte[] indexRowkey : indexRowkeys) { - Put p = new Put(indexRowkey); - p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE); - puts.add(p); -// tbl.put(p); - } - } - - + private static Logger LOG = LoggerFactory.getLogger(HBaseLogWriter.class); + private static byte[] EMPTY_INDEX_QUALIFER_VALUE = "".getBytes(); + + private HTableInterface tbl; + private String table; + private String columnFamily; + + public HBaseLogWriter(String table, String columnFamily) { + // TODO assert for non-null of table and columnFamily + this.table = table; + this.columnFamily = columnFamily; + } + + @Override + public void open() throws IOException { + try { + tbl = EagleConfigFactory.load().getHTable(this.table); + // LOGGER.info("HBase table " + table + " audo reflush is " + (tbl.isAutoFlush() ? "enabled" : + // "disabled")); + } catch (Exception ex) { + LOG.error("Cannot create htable", ex); + throw new IOException(ex); + } + } + + @Override + public void close() throws IOException { + if (tbl != null) { + new HTableFactory().releaseHTableInterface(tbl); + } + } + + @Override + public void flush() throws IOException { + tbl.flushCommits(); + } + + protected void populateColumnValues(Put p, InternalLog log) { + Map<String, byte[]> qualifierValues = log.getQualifierValues(); + // iterate all qualifierValues + for (Map.Entry<String, byte[]> entry : qualifierValues.entrySet()) { + p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue()); + } + + Map<String, String> tags = log.getTags(); + // iterate all tags, each tag will be stored as a column qualifier + if (tags != null) { + for (Map.Entry<String, String> entry : tags.entrySet()) { + // TODO need a consistent handling of null values + if (entry.getValue() != null) { + p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue().getBytes()); + } + } + } + } + + /** + * TODO need think about if multi-PUT is necessary, by checking if autoFlush works + */ + @Override + public byte[] write(InternalLog log) throws IOException { + final byte[] rowkey = RowkeyBuilder.buildRowkey(log); + final Put p = new Put(rowkey); + populateColumnValues(p, log); + tbl.put(p); + final List<byte[]> indexRowkeys = log.getIndexRowkeys(); + if (indexRowkeys != null) { + writeIndexes(rowkey, indexRowkeys); + } + return rowkey; + } + + /** + * TODO need think about if multi-PUT is necessary, by checking if autoFlush works + */ + public List<byte[]> write(List<InternalLog> logs) throws IOException { + final List<Put> puts = new ArrayList<Put>(logs.size()); + final List<byte[]> result = new ArrayList<byte[]>(logs.size()); + for (InternalLog log : logs) { + final byte[] rowkey = RowkeyBuilder.buildRowkey(log); + final Put p = new Put(rowkey); + populateColumnValues(p, log); + puts.add(p); + final List<byte[]> indexRowkeys = log.getIndexRowkeys(); + if (indexRowkeys != null) { + writeIndexes(rowkey, indexRowkeys, puts); + } + result.add(rowkey); + } + tbl.put(puts); + return result; + } + + @Override + public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException { + Put p = new Put(rowkey); + populateColumnValues(p, log); + tbl.put(p); + final List<byte[]> indexRowkeys = log.getIndexRowkeys(); + if (indexRowkeys != null) { + writeIndexes(rowkey, indexRowkeys); + } + } + + private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys) throws IOException { + for (byte[] indexRowkey : indexRowkeys) { + Put p = new Put(indexRowkey); + p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE); + tbl.put(p); + } + } + + private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys, List<Put> puts) throws IOException { + for (byte[] indexRowkey : indexRowkeys) { + Put p = new Put(indexRowkey); + p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE); + puts.add(p); + // tbl.put(p); + } + } + } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java index 8276640..066401f 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java @@ -25,115 +25,134 @@ import java.util.Map; * TODO we should decouple BaseLog during write time and BaseLog during read time */ public class InternalLog { - private String encodedRowkey; - private String prefix; - private String[] partitions; - private long timestamp; - private Map<String, byte[]> qualifierValues; - - private Map<String,Object> extraValues; - private Map<String, String> tags; - private Map<String, List<String>> searchTags; - private List<byte[]> indexRowkeys; - - public String getEncodedRowkey() { - return encodedRowkey; - } - - public void setEncodedRowkey(String encodedRowkey) { - this.encodedRowkey = encodedRowkey; - } - - public Map<String, byte[]> getQualifierValues() { - return qualifierValues; - } - public void setQualifierValues(Map<String, byte[]> qualifierValues) { - this.qualifierValues = qualifierValues; - } - - public Map<String, List<String>> getSearchTags() { - return searchTags; - } - public void setSearchTags(Map<String, List<String>> searchTags) { - this.searchTags = searchTags; - } - public String getPrefix() { - return prefix; - } - public void setPrefix(String prefix) { - this.prefix = prefix; - } - public String[] getPartitions() { - return partitions; - } - public void setPartitions(String[] partitions) { - this.partitions = partitions; - } - public long getTimestamp() { - return timestamp; - } - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - public Map<String, String> getTags() { - return tags; - } - public void setTags(Map<String, String> tags) { - this.tags = tags; - } - public List<byte[]> getIndexRowkeys() { - return indexRowkeys; - } - public void setIndexRowkeys(List<byte[]> indexRowkeys) { - this.indexRowkeys = indexRowkeys; - } - public Map<String, Object> getExtraValues() { return extraValues; } - public void setExtraValues(Map<String, Object> extraValues) { this.extraValues = extraValues; } - - public String toString(){ - StringBuffer sb = new StringBuffer(); - sb.append(prefix); - sb.append("|"); - sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp)); - sb.append("("); - sb.append(timestamp); - sb.append(")"); - sb.append("|searchTags:"); - if(searchTags != null){ - for(String tagkey : searchTags.keySet()){ - sb.append(tagkey); - sb.append('='); - List<String> tagValues = searchTags.get(tagkey); - sb.append("("); - for(String tagValue : tagValues){ - sb.append(tagValue); - sb.append(","); - } - sb.append(")"); - sb.append(","); - } - } - sb.append("|tags:"); - if(tags != null){ - for(Map.Entry<String, String> entry : tags.entrySet()){ - sb.append(entry.getKey()); - sb.append("="); - sb.append(entry.getValue()); - sb.append(","); - } - } - sb.append("|columns:"); - if(qualifierValues != null){ - for(String qualifier : qualifierValues.keySet()){ - byte[] value = qualifierValues.get(qualifier); - sb.append(qualifier); - sb.append("="); - if(value != null){ - sb.append(new String(value)); - } - sb.append(","); - } - } - return sb.toString(); - } + private String encodedRowkey; + private String prefix; + private String[] partitions; + private long timestamp; + private Map<String, byte[]> qualifierValues; + + private Map<String, Object> extraValues; + private Map<String, String> tags; + private Map<String, List<String>> searchTags; + private List<byte[]> indexRowkeys; + + public String getEncodedRowkey() { + return encodedRowkey; + } + + public void setEncodedRowkey(String encodedRowkey) { + this.encodedRowkey = encodedRowkey; + } + + public Map<String, byte[]> getQualifierValues() { + return qualifierValues; + } + + public void setQualifierValues(Map<String, byte[]> qualifierValues) { + this.qualifierValues = qualifierValues; + } + + public Map<String, List<String>> getSearchTags() { + return searchTags; + } + + public void setSearchTags(Map<String, List<String>> searchTags) { + this.searchTags = searchTags; + } + + public String getPrefix() { + return prefix; + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } + + public String[] getPartitions() { + return partitions; + } + + public void setPartitions(String[] partitions) { + this.partitions = partitions; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public Map<String, String> getTags() { + return tags; + } + + public void setTags(Map<String, String> tags) { + this.tags = tags; + } + + public List<byte[]> getIndexRowkeys() { + return indexRowkeys; + } + + public void setIndexRowkeys(List<byte[]> indexRowkeys) { + this.indexRowkeys = indexRowkeys; + } + + public Map<String, Object> getExtraValues() { + return extraValues; + } + + public void setExtraValues(Map<String, Object> extraValues) { + this.extraValues = extraValues; + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append(prefix); + sb.append("|"); + sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp)); + sb.append("("); + sb.append(timestamp); + sb.append(")"); + sb.append("|searchTags:"); + if (searchTags != null) { + for (String tagkey : searchTags.keySet()) { + sb.append(tagkey); + sb.append('='); + List<String> tagValues = searchTags.get(tagkey); + sb.append("("); + for (String tagValue : tagValues) { + sb.append(tagValue); + sb.append(","); + } + sb.append(")"); + sb.append(","); + } + } + sb.append("|tags:"); + if (tags != null) { + for (Map.Entry<String, String> entry : tags.entrySet()) { + sb.append(entry.getKey()); + sb.append("="); + sb.append(entry.getValue()); + sb.append(","); + } + } + sb.append("|columns:"); + if (qualifierValues != null) { + for (String qualifier : qualifierValues.keySet()) { + byte[] value = qualifierValues.get(qualifier); + sb.append(qualifier); + sb.append("="); + if (value != null) { + sb.append(new String(value)); + } + sb.append(","); + } + } + return sb.toString(); + } }
