Repository: eagle Updated Branches: refs/heads/master 4e228a8de -> 8e76d34ea
[EAGLE-977] Duplicated entities are generated by TopologyHealthCheckApp https://issues.apache.org/jira/browse/EAGLE-977 * change FIXED_WRITE_TIMESTAMP into a constant value Author: Zhao, Qingwen <qingwz...@apache.org> Closes #893 from qingwen220/EAGLE-977. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/8e76d34e Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/8e76d34e Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/8e76d34e Branch: refs/heads/master Commit: 8e76d34ea30bafc6d02c818319ac547ef132a4dc Parents: 4e228a8 Author: Zhao, Qingwen <qingwz...@apache.org> Authored: Wed Mar 29 14:45:43 2017 +0800 Committer: Zhao, Qingwen <qingwz...@apache.org> Committed: Wed Mar 29 14:45:43 2017 +0800 ---------------------------------------------------------------------- .../entity/GenericEntityScanStreamReader.java | 184 ++++++++++--------- .../eagle/log/entity/meta/EntityConstants.java | 15 +- .../GenericCoprocessorAggregateQuery.java | 12 +- 3 files changed, 109 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/8e76d34e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityScanStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityScanStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityScanStreamReader.java index e0a7119..a9e03b3 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityScanStreamReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityScanStreamReader.java @@ -28,102 +28,104 @@ import java.io.IOException; import java.util.Date; public class GenericEntityScanStreamReader extends StreamReader { - private static final Logger LOG = LoggerFactory.getLogger(GenericEntityScanStreamReader.class); - - private EntityDefinition entityDef; - private SearchCondition condition; - private String prefix; - private long lastTimestamp = 0; - private long firstTimestamp = 0; - - public GenericEntityScanStreamReader(String serviceName, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{ - this.prefix = prefix; - checkNotNull(serviceName, "serviceName"); - this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName); - checkNotNull(entityDef, "EntityDefinition"); - this.condition = condition; - } + private static final Logger LOG = LoggerFactory.getLogger(GenericEntityScanStreamReader.class); - public GenericEntityScanStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{ - this.prefix = prefix; - checkNotNull(entityDef, "entityDef"); - this.entityDef = entityDef; - checkNotNull(entityDef, "EntityDefinition"); - this.condition = condition; - } - - public long getLastTimestamp() { - return lastTimestamp; - } - - private void checkNotNull(Object o, String message){ - if(o == null){ - throw new IllegalArgumentException(message + " should not be null"); - } - } + private EntityDefinition entityDef; + private SearchCondition condition; + private String prefix; + private long lastTimestamp = 0; + private long firstTimestamp = 0; - public EntityDefinition getEntityDefinition() { - return entityDef; - } + public GenericEntityScanStreamReader(String serviceName, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{ + this.prefix = prefix; + checkNotNull(serviceName, "serviceName"); + this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName); + checkNotNull(entityDef, "EntityDefinition"); + this.condition = condition; + } - public SearchCondition getSearchCondition() { - return condition; - } + public GenericEntityScanStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{ + this.prefix = prefix; + checkNotNull(entityDef, "entityDef"); + this.entityDef = entityDef; + checkNotNull(entityDef, "EntityDefinition"); + this.condition = condition; + } - @Override - public void readAsStream() throws Exception{ - Date start = null; - Date end = null; - // shortcut to avoid read when pageSize=0 - if(condition.getPageSize() <= 0){ - return; // return nothing - } - // Process the time range if needed - if(entityDef.isTimeSeries()){ - start = new Date(condition.getStartTime()); - end = new Date(condition.getEndTime()); - }else{ - start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME); - end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME); - } - byte[][] outputQualifiers = null; - if(!condition.isOutputAll()) { - // Generate the output qualifiers - outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields()); - } - HBaseLogReader2 reader = new HBaseLogReader2(entityDef, condition.getPartitionValues(), start, end, condition.getFilter(), condition.getStartRowkey(), outputQualifiers, this.prefix); - try{ - reader.open(); - InternalLog log; - int count = 0; - while ((log = reader.read()) != null) { - TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDef); - if (lastTimestamp < entity.getTimestamp()) { - lastTimestamp = entity.getTimestamp(); - } - if(firstTimestamp > entity.getTimestamp() || firstTimestamp == 0){ - firstTimestamp = entity.getTimestamp(); - } + public long getLastTimestamp() { + return lastTimestamp; + } - entity.setSerializeVerbose(condition.isOutputVerbose()); - entity.setSerializeAlias(condition.getOutputAlias()); + private void checkNotNull(Object o, String message){ + if(o == null){ + throw new IllegalArgumentException(message + " should not be null"); + } + } - for(EntityCreationListener l : _listeners){ - l.entityCreated(entity); - } - if(++count == condition.getPageSize()) - break; - } - }catch(IOException ioe){ - LOG.error("Fail reading log", ioe); - throw ioe; - }finally{ - reader.close(); - } - } + public EntityDefinition getEntityDefinition() { + return entityDef; + } - @Override - public long getFirstTimestamp() { - return this.firstTimestamp; - } + public SearchCondition getSearchCondition() { + return condition; + } + + @Override + public void readAsStream() throws Exception{ + Date start = null; + Date end = null; + // shortcut to avoid read when pageSize=0 + if(condition.getPageSize() <= 0){ + return; // return nothing + } + // Process the time range if needed + if(entityDef.isTimeSeries()){ + start = new Date(condition.getStartTime()); + end = new Date(condition.getEndTime()); + }else{ + //start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME); + //end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME); + start = new Date(EntityConstants.FIXED_READ_START_TIMESTAMP); + end = new Date(EntityConstants.FIXED_READ_END_TIMESTAMP); + } + byte[][] outputQualifiers = null; + if(!condition.isOutputAll()) { + // Generate the output qualifiers + outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields()); + } + HBaseLogReader2 reader = new HBaseLogReader2(entityDef, condition.getPartitionValues(), start, end, condition.getFilter(), condition.getStartRowkey(), outputQualifiers, this.prefix); + try{ + reader.open(); + InternalLog log; + int count = 0; + while ((log = reader.read()) != null) { + TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDef); + if (lastTimestamp < entity.getTimestamp()) { + lastTimestamp = entity.getTimestamp(); + } + if(firstTimestamp > entity.getTimestamp() || firstTimestamp == 0){ + firstTimestamp = entity.getTimestamp(); + } + + entity.setSerializeVerbose(condition.isOutputVerbose()); + entity.setSerializeAlias(condition.getOutputAlias()); + + for(EntityCreationListener l : _listeners){ + l.entityCreated(entity); + } + if(++count == condition.getPageSize()) + break; + } + }catch(IOException ioe){ + LOG.error("Fail reading log", ioe); + throw ioe; + }finally{ + reader.close(); + } + } + + @Override + public long getFirstTimestamp() { + return this.firstTimestamp; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/8e76d34e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java index 930743e..eab998c 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java @@ -19,12 +19,13 @@ package org.apache.eagle.log.entity.meta; import org.apache.eagle.common.DateTimeUtil; public class EntityConstants { - - public static final String FIXED_WRITE_HUMANTIME = "1970-01-02 00:00:00"; - public static final String FIXED_READ_START_HUMANTIME = "1970-01-01 00:00:00"; - public static final String FIXED_READ_END_HUMANTIME = "1970-01-03 00:00:00"; - - public static final long FIXED_WRITE_TIMESTAMP = - DateTimeUtil.humanDateToSecondsWithoutException(FIXED_WRITE_HUMANTIME) * 1000; + + public static final String FIXED_WRITE_HUMANTIME = "1970-01-02 00:00:00"; + public static final String FIXED_READ_START_HUMANTIME = "1970-01-01 00:00:00"; + public static final String FIXED_READ_END_HUMANTIME = "1970-01-03 00:00:00"; + + public static final long FIXED_WRITE_TIMESTAMP = DateTimeUtil.ONEDAY; // 1970-01-02 00:00:00 UTC + public static final long FIXED_READ_START_TIMESTAMP = 0; // 1970-01-01 00:00:00 UTC + public static final long FIXED_READ_END_TIMESTAMP = DateTimeUtil.ONEDAY * 2; // 1970-01-03 00:00:00 UTC } http://git-wip-us.apache.org/repos/asf/eagle/blob/8e76d34e/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericCoprocessorAggregateQuery.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericCoprocessorAggregateQuery.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericCoprocessorAggregateQuery.java index b6242c8..568a27f 100644 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericCoprocessorAggregateQuery.java +++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericCoprocessorAggregateQuery.java @@ -165,8 +165,10 @@ public class GenericCoprocessorAggregateQuery implements GenericQuery { start = new Date(searchCondition.getStartTime()); end = new Date(searchCondition.getEndTime()); } else { - start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME); - end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME); + //start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME); + //end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME); + start = new Date(EntityConstants.FIXED_READ_START_TIMESTAMP); + end = new Date(EntityConstants.FIXED_READ_END_TIMESTAMP); } // Generate the output qualifiers final byte[][] outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, searchCondition.getOutputFields()); @@ -281,8 +283,10 @@ public class GenericCoprocessorAggregateQuery implements GenericQuery { this.start = new Date(searchCondition.getStartTime()); this.end = new Date(searchCondition.getEndTime()); } else { - start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME); - end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME); + //start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME); + //end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME); + start = new Date(EntityConstants.FIXED_READ_START_TIMESTAMP); + end = new Date(EntityConstants.FIXED_READ_END_TIMESTAMP); } this.pointsNum = (int) ((end.getTime() - 1 - start.getTime()) / this.query.aggregateCondition.getIntervalMS() + 1); this.aggFuncNum = this.query.aggFuncNum;