KYLIN-1293 keep kylin log clean
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/50b3f429
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/50b3f429
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/50b3f429

Branch: refs/heads/2.x-staging
Commit: 50b3f429d557a748198824a79483c77252a0a483
Parents: c1e83b0
Author: honma <ho...@ebay.com>
Authored: Fri Jan 8 11:01:55 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Jan 8 11:03:06 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java | 3 +-
 .../apache/kylin/common/KylinConfigBase.java | 28 +-
 .../kylin/common/persistence/ResourceStore.java | 2 +-
 .../apache/kylin/common/util/BytesSplitter.java | 42 -
 .../kylin/common/util/CompressionUtils.java | 4 +-
 .../common/util/MemoryBudgetController.java | 10 +-
 .../src/main/resources/kylin-log4j.properties | 2 +-
 .../java/org/apache/kylin/cube/CubeManager.java | 2 +-
 .../inmemcubing/AbstractInMemCubeBuilder.java | 2 +-
 .../cube/inmemcubing/InMemCubeBuilder.java | 10 +-
 .../org/apache/kylin/cube/util/CubingUtils.java | 1 -
 .../apache/kylin/dict/DictionaryManager.java | 16 +-
 .../apache/kylin/query/routing/QueryRouter.java | 5 +-
 .../apache/kylin/query/routing/RoutingRule.java | 7 +-
 .../kylin/rest/controller/UserController.java | 6 +-
 .../kylin/rest/service/BadQueryDetector.java | 9 +-
 .../apache/kylin/rest/service/BasicService.java | 24 +-
 .../apache/kylin/rest/service/CacheService.java | 14 +-
 .../apache/kylin/rest/service/QueryService.java | 2 +-
 .../resources/kylin-server-log4j.properties | 97 +--
 .../kylin/storage/hbase/HBaseResourceStore.java | 762 +++++++++----------
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 8 +-
 22 files changed, 508 insertions(+), 548 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index bc18989..81f5827 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -99,8 +99,7 @@ public class KylinConfig extends KylinConfigBase { throw new IllegalStateException("Metadata uri : " + metaUri + " is not a valid REST URI address"); } } catch (Exception e) { - logger.info(e.getLocalizedMessage()); - throw new IllegalStateException("Metadata uri : " + metaUri + " is not recognized"); + throw new IllegalStateException("Metadata uri : " + metaUri + " is not recognized", e); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index e7c373e..96c76f0 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -44,7 +44,7 @@ import com.google.common.collect.Sets; public class KylinConfigBase implements Serializable { private static final Logger logger = LoggerFactory.getLogger(KylinConfigBase.class); - + /* * DON'T DEFINE CONSTANTS FOR PROPERTY KEYS! 
* @@ -62,9 +62,9 @@ public class KylinConfigBase implements Serializable { } // ============================================================================ - + private volatile Properties properties = new Properties(); - + public String getOptional(String prop) { return getOptional(prop, null); } @@ -90,7 +90,7 @@ public class KylinConfigBase implements Serializable { } return r; } - + /** * Use with care, properties should be read-only. This is for testing mostly. */ @@ -98,7 +98,7 @@ public class KylinConfigBase implements Serializable { logger.info("Kylin Config was updated with " + key + " : " + value); properties.setProperty(key, value); } - + protected Properties getAllProperties() { return properties; } @@ -114,9 +114,9 @@ public class KylinConfigBase implements Serializable { } this.properties = newProperties; } - + // ============================================================================ - + public String getMetadataUrl() { return getOptional("kylin.metadata.url"); } @@ -356,15 +356,15 @@ public class KylinConfigBase implements Serializable { public long getJobStepTimeout() { return Long.parseLong(getOptional("kylin.job.step.timeout", String.valueOf(2 * 60 * 60))); } - + public String getCubeAlgorithm() { return getOptional("kylin.cube.algorithm", "auto"); } - + public double getCubeAlgorithmAutoThreshold() { return Double.parseDouble(getOptional("kylin.cube.algorithm.auto.threshold", "8")); } - + public int getDictionaryMaxCardinality() { return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000")); } @@ -404,6 +404,10 @@ public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000")); } + public int getBadQueryStackTraceDepth() { + return Integer.parseInt(getOptional("kylin.query.badquery.stacktrace.depth", "10")); + } + public boolean getQueryRunLocalCoprocessor() { return Boolean.parseBoolean(getOptional("kylin.query.run.local.coprocessor", "false")); } 
@@ -423,7 +427,7 @@ public class KylinConfigBase implements Serializable { public double getQueryCoprocessorMemGB() { return Double.parseDouble(this.getOptional("kylin.query.coprocessor.mem.gb", "3.0")); } - + public boolean isQuerySecureEnabled() { return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "true")); } @@ -466,7 +470,7 @@ public class KylinConfigBase implements Serializable { public String getHiveDatabaseForIntermediateTable() { return this.getOptional("kylin.job.hive.database.for.intermediatetable", "default"); } - + public boolean isGetJobStatusWithKerberos() { return Boolean.valueOf(this.getOptional("kylin.job.status.with.kerberos", "false")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index b2a4ce3..ccae80b 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -227,7 +227,7 @@ abstract public class ResourceStore { */ final public <T extends RootPersistentEntity> long putResource(String resPath, T obj, Serializer<T> serializer) throws IOException { resPath = norm(resPath); - logger.debug("Saving resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")"); + //logger.debug("Saving resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")"); long oldTS = obj.getLastModified(); long newTS = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java ---------------------------------------------------------------------- diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java index cf2704c..9f819ac 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java @@ -85,48 +85,6 @@ public class BytesSplitter { this.bufferSize = buffers.length; } - public byte inferByteRowDelimiter(byte[] bytes, int byteLen, int expectedSplits) throws IOException { - - if (expectedSplits > this.splitBuffers.length) - throw new IOException("expectSplits can not be greater than split buffer size"); - - boolean delimiterFound = false; - byte foundDelimiter = 0; - for (int i = 0; i < bytes.length; ++i) { - byte c = bytes[i]; - if (!Character.isLetterOrDigit((char) c)) { - try { - int actualSplits = this.split(bytes, byteLen, c); - if (actualSplits == expectedSplits) { - if (!delimiterFound) { - logger.info("Delimiter found, value is : " + c); - delimiterFound = true; - foundDelimiter = c; - } else if (c != foundDelimiter) { - throw new IOException("Duplicate delimiter found, found delimiter is : " + foundDelimiter + " new delimiter is " + c); - } - } - } catch (Exception e) { - logger.info("Unqualified delimiter pruned, value is " + c); - } - } - } - - if (delimiterFound) - return foundDelimiter; - else - throw new IOException("No delimiter found"); - } - - public int detectDelim(byte[] array, int arrayLen, int expectedParts) { - for (int i = 0; i < COMMON_DELIMS.length; i++) { - int nParts = split(array, arrayLen, (byte) COMMON_DELIMS[i]); - if (nParts == expectedParts) - return COMMON_DELIMS[i]; - } - throw new RuntimeException("Cannot detect delimeter from first line -- " + Bytes.toString(array, 0, arrayLen) + " -- expect " + expectedParts + " columns"); - } - @Override public String toString() { StringBuilder buf = new StringBuilder(); 
http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java index c9838e4..58c10ff 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java @@ -45,7 +45,7 @@ public class CompressionUtils { outputStream.close(); byte[] output = outputStream.toByteArray(); - logger.info("Original: " + data.length + " bytes. " + "Compressed: " + output.length + " bytes "); + logger.debug("Original: " + data.length + " bytes. " + "Compressed: " + output.length + " bytes "); return output; } @@ -62,7 +62,7 @@ public class CompressionUtils { outputStream.close(); byte[] output = outputStream.toByteArray(); - logger.info("Original: " + data.length + " bytes. " + "Decompressed: " + output.length + " bytes"); + logger.debug("Original: " + data.length + " bytes. 
" + "Decompressed: " + output.length + " bytes"); return output; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java index 60aa0ad..20c1c25 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java @@ -105,7 +105,7 @@ public class MemoryBudgetController { try { reserve(consumer, requestMB); if (debug && waitStart > 0) - logger.info(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request"); + logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request"); return; } catch (NotEnoughBudgetException ex) { // retry @@ -173,7 +173,7 @@ public class MemoryBudgetController { if (debug) { if (getSystemAvailMB() < getRemainingBudgetMB()) { - logger.info("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong."); + logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. 
If this persists, some memory calculation must be wrong."); } } } @@ -224,7 +224,7 @@ public class MemoryBudgetController { booking.remove(entry.consumer); } if (debug) { - logger.info(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB"); + logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB"); } if (delta < 0) { @@ -285,7 +285,7 @@ public class MemoryBudgetController { int mb = MemoryBudgetController.getSystemAvailMB(); if (mb < lowAvail) { lowAvail = mb; - logger.info("Lower system avail " + lowAvail + " MB in markHigh()"); + logger.debug("Lower system avail " + lowAvail + " MB in markHigh()"); } } @@ -294,7 +294,7 @@ public class MemoryBudgetController { int mb = MemoryBudgetController.gcAndGetSystemAvailMB(); if (mb > highAvail) { highAvail = mb; - logger.info("Higher system avail " + highAvail + " MB in markLow()"); + logger.debug("Higher system avail " + highAvail + " MB in markLow()"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-common/src/main/resources/kylin-log4j.properties ---------------------------------------------------------------------- diff --git a/core-common/src/main/resources/kylin-log4j.properties b/core-common/src/main/resources/kylin-log4j.properties index b8816bd..e7201b3 100644 --- a/core-common/src/main/resources/kylin-log4j.properties +++ b/core-common/src/main/resources/kylin-log4j.properties @@ -21,7 +21,7 @@ log4j.rootLogger=INFO,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=L4J [%d{yyyy-MM-dd HH:mm:ss,SSS}][%p][%c] - %m%n +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t %c{1}:%L]: %m%n #log4j.logger.org.apache.hadoop=ERROR log4j.logger.org.apache.kylin=DEBUG 
http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 2643833..7db3130 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -835,7 +835,7 @@ public class CubeManager implements IRealizationProvider { usedStorageLocation.put(cubeName.toUpperCase(), segment.getStorageLocationIdentifier()); } - logger.info("Reloaded new cube: " + cubeName + " with reference being" + cubeInstance + " having " + cubeInstance.getSegments().size() + " segments:" + StringUtils.join(Collections2.transform(cubeInstance.getSegments(), new Function<CubeSegment, String>() { + logger.debug("Reloaded new cube: " + cubeName + " with reference being" + cubeInstance + " having " + cubeInstance.getSegments().size() + " segments:" + StringUtils.join(Collections2.transform(cubeInstance.getSegments(), new Function<CubeSegment, String>() { @Nullable @Override public String apply(CubeSegment input) { http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java index 15ab926..335a769 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java @@ -89,7 +89,7 @@ abstract public class AbstractInMemCubeBuilder { output.write(cuboidId, record); } scanner.close(); 
- logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms"); + logger.debug("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index 39f2a34..7960139 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -303,11 +303,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private void makeMemoryBudget() { baseResult.aggrCacheMB = Math.max(baseCuboidMemTracker.getEstimateMB(), 10); // 10 MB at minimal - logger.info("Base cuboid aggr cache is " + baseResult.aggrCacheMB + " MB"); + logger.debug("Base cuboid aggr cache is " + baseResult.aggrCacheMB + " MB"); int systemAvailMB = MemoryBudgetController.gcAndGetSystemAvailMB(); - logger.info("System avail " + systemAvailMB + " MB"); + logger.debug("System avail " + systemAvailMB + " MB"); int reserve = reserveMemoryMB; - logger.info("Reserve " + reserve + " MB for system basics"); + logger.debug("Reserve " + reserve + " MB for system basics"); int budget = systemAvailMB - reserve; if (budget < baseResult.aggrCacheMB) { @@ -316,7 +316,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { logger.warn("System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx"); } - logger.info("Memory Budget is " + budget + " MB"); + logger.debug("Memory Budget is " + budget + " MB"); 
memBudget = new MemoryBudgetController(budget); } @@ -348,7 +348,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms"); int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB); - logger.info("Wild esitmate of base aggr cache is " + mbEstimateBaseAggrCache + " MB"); + logger.info("Wild estimate of base aggr cache is " + mbEstimateBaseAggrCache + " MB"); return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0); } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java index f6b0e35..bac5caa 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java @@ -200,7 +200,6 @@ public class CubingUtils { cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath()); realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject()); } catch (IOException e) { - logger.error("error save dictionary for column:" + tblColRef, e); throw new RuntimeException("error save dictionary for column:" + tblColRef, e); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index c718704..cb58721 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -200,11 +200,11 @@ public class DictionaryManager { signature.setLastModifiedTime(System.currentTimeMillis()); signature.setPath("merged_with_no_original_path"); -// String dupDict = checkDupByInfo(newDictInfo); -// if (dupDict != null) { -// logger.info("Identical dictionary input " + newDictInfo.getInput() + ", reuse existing dictionary at " + dupDict); -// return getDictionaryInfo(dupDict); -// } + // String dupDict = checkDupByInfo(newDictInfo); + // if (dupDict != null) { + // logger.info("Identical dictionary input " + newDictInfo.getInput() + ", reuse existing dictionary at " + dupDict); + // return getDictionaryInfo(dupDict); + // } //check for cases where merging dicts are actually same boolean identicalSourceDicts = true; @@ -357,11 +357,11 @@ public class DictionaryManager { DictionaryInfo load(String resourcePath, boolean loadDictObj) throws IOException { ResourceStore store = MetadataManager.getInstance(config).getStore(); - logger.debug("Going to load DictionaryInfo from " + resourcePath); + //logger.debug("Going to load DictionaryInfo from " + resourcePath); DictionaryInfo info = store.getResource(resourcePath, DictionaryInfo.class, loadDictObj ? 
DictionaryInfoSerializer.FULL_SERIALIZER : DictionaryInfoSerializer.INFO_SERIALIZER); - if (loadDictObj) - logger.debug("Loaded dictionary at " + resourcePath); + // if (loadDictObj) + // logger.debug("Loaded dictionary at " + resourcePath); return info; } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java index c4d0fbe..8b25b6d 100644 --- a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java +++ b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java @@ -69,9 +69,8 @@ public class QueryRouter { Candidate chosen = candidates.get(0); adjustForDimensionAsMeasure(chosen, olapContext); - logger.info("The realizations remaining: "); - logger.info(RoutingRule.getPrintableText(candidates)); - logger.info("The realization being chosen: " + chosen.realization.getName()); + logger.info("The realizations remaining: " + RoutingRule.getPrintableText(candidates) + " And the final chosen one is the first one"); + return chosen.realization; } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java index cb42412..3d15a51 100644 --- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java +++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java @@ -43,11 +43,10 @@ public abstract class RoutingRule { public static void applyRules(List<Candidate> candidates) { for (RoutingRule rule : rules) { - logger.info("Realizations order before: " + 
getPrintableText(candidates)); - logger.info("Applying rule : " + rule); + String before = getPrintableText(candidates); rule.apply(candidates); - logger.info("Realizations order after: " + getPrintableText(candidates)); - logger.info("==================================================="); + String after = getPrintableText(candidates); + logger.info("Applying rule: " + rule + ", realizations before: " + before + ", realizations after: " + after); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/server/src/main/java/org/apache/kylin/rest/controller/UserController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/UserController.java b/server/src/main/java/org/apache/kylin/rest/controller/UserController.java index 1a5d7c5..a7bc782 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/UserController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/UserController.java @@ -56,18 +56,18 @@ public class UserController { Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); if (authentication == null) { - logger.info("authentication is null."); + logger.debug("authentication is null."); return null; } if (authentication.getPrincipal() instanceof UserDetails) { - logger.info("authentication.getPrincipal() is " + authentication.getPrincipal()); + logger.debug("authentication.getPrincipal() is " + authentication.getPrincipal()); return (UserDetails) authentication.getPrincipal(); } if (authentication.getDetails() instanceof UserDetails) { - logger.info("authentication.getDetails() is " + authentication.getDetails()); + logger.debug("authentication.getDetails() is " + authentication.getDetails()); return (UserDetails) authentication.getDetails(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java 
---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java index 11062d9..71ab26b 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java +++ b/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.concurrent.ConcurrentMap; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.rest.request.SQLRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,7 +134,7 @@ public class BadQueryDetector extends Thread { break; // entries are sorted by startTime } } - + // report if low memory if (getSystemAvailMB() < alertMB) { logger.info("System free memory less than " + alertMB + " MB. " + entries.size() + " queries running."); @@ -142,10 +143,16 @@ public class BadQueryDetector extends Thread { // log the stack trace of bad query thread for further analysis private void dumpStackTrace(Thread t) { + int maxStackTraceDepth = KylinConfig.getInstanceFromEnv().getBadQueryStackTraceDepth(); + int current = 0; + StackTraceElement[] stackTrace = t.getStackTrace(); StringBuilder buf = new StringBuilder("Problematic thread 0x" + Long.toHexString(t.getId())); buf.append("\n"); for (StackTraceElement e : stackTrace) { + if (++current > maxStackTraceDepth) { + break; + } buf.append("\t").append("at ").append(e.toString()).append("\n"); } logger.info(buf.toString()); http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/server/src/main/java/org/apache/kylin/rest/service/BasicService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java index 20fff47..1c3e71a 100644 --- 
a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java @@ -58,44 +58,44 @@ public abstract class BasicService { return kylinConfig; } - - public final MetadataManager getMetadataManager() { + + public MetadataManager getMetadataManager() { return MetadataManager.getInstance(getConfig()); } - public final CubeManager getCubeManager() { + public CubeManager getCubeManager() { return CubeManager.getInstance(getConfig()); } - public final StreamingManager getStreamingManager() { + public StreamingManager getStreamingManager() { return StreamingManager.getInstance(getConfig()); } - public final KafkaConfigManager getKafkaManager() throws IOException { - return KafkaConfigManager.getInstance(getConfig()); + public KafkaConfigManager getKafkaManager() throws IOException { + return KafkaConfigManager.getInstance(getConfig()); } - public final CubeDescManager getCubeDescManager() { + public CubeDescManager getCubeDescManager() { return CubeDescManager.getInstance(getConfig()); } - public final ProjectManager getProjectManager() { + public ProjectManager getProjectManager() { return ProjectManager.getInstance(getConfig()); } - public final HybridManager getHybridManager() { + public HybridManager getHybridManager() { return HybridManager.getInstance(getConfig()); } - public final ExecutableManager getExecutableManager() { + public ExecutableManager getExecutableManager() { return ExecutableManager.getInstance(getConfig()); } - public final IIDescManager getIIDescManager() { + public IIDescManager getIIDescManager() { return IIDescManager.getInstance(getConfig()); } - public final IIManager getIIManager() { + public IIManager getIIManager() { return IIManager.getInstance(getConfig()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/server/src/main/java/org/apache/kylin/rest/service/CacheService.java ---------------------------------------------------------------------- diff 
--git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java index 89c525d..8227be6 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java @@ -20,7 +20,6 @@ package org.apache.kylin.rest.service; import java.io.File; import java.io.IOException; -import java.nio.charset.Charset; import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -32,6 +31,7 @@ import javax.sql.DataSource; import net.sf.ehcache.CacheManager; import org.apache.calcite.jdbc.Driver; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.restclient.Broadcaster; @@ -57,8 +57,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.datasource.DriverManagerDataSource; import org.springframework.stereotype.Component; -import com.google.common.io.Files; - /** */ @Component("cacheService") @@ -140,16 +138,12 @@ public class CacheService extends BasicService { DataSource ret = olapDataSources.get(project); if (ret == null) { - logger.debug("Creating a new data source"); - logger.debug("OLAP data source pointing to " + getConfig()); - + logger.debug("Creating a new data source, OLAP data source pointing to " + getConfig()); File modelJson = OLAPSchemaFactory.createTempOLAPJson(project, getConfig()); try { - List<String> text = Files.readLines(modelJson, Charset.defaultCharset()); - logger.debug("The new temp olap json is :"); - for (String line : text) - logger.debug(line); + String text = FileUtils.readFileToString(modelJson); + logger.debug("The new temp olap json is :" + text); } catch (IOException e) { e.printStackTrace(); // logging failure is not critical } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/server/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java index d248b72..bf371be 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -266,7 +266,7 @@ public class QueryService extends BasicService { String correctedSql = QueryUtil.massageSql(sqlRequest); if (correctedSql.equals(sqlRequest.getSql()) == false) - logger.debug("The corrected query: " + correctedSql); + logger.info("The corrected query: " + correctedSql); // add extra parameters into olap context, like acceptPartial Map<String, String> parameters = new HashMap<String, String>(); http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/server/src/main/resources/kylin-server-log4j.properties ---------------------------------------------------------------------- diff --git a/server/src/main/resources/kylin-server-log4j.properties b/server/src/main/resources/kylin-server-log4j.properties index 5f78ebe..4ad7721 100644 --- a/server/src/main/resources/kylin-server-log4j.properties +++ b/server/src/main/resources/kylin-server-log4j.properties @@ -1,52 +1,53 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -#define appenders -log4j.appender.file=org.apache.log4j.DailyRollingFileAppender -log4j.appender.file.layout=org.apache.log4j.PatternLayout +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + + +#define appenders +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.File=${catalina.home}/../logs/kylin.log -log4j.appender.file.layout.ConversionPattern=[%t]:[%d{yyyy-MM-dd HH:mm:ss,SSS}][%p][%l] - %m%n -log4j.appender.file.Append=true - -log4j.appender.query=org.apache.log4j.DailyRollingFileAppender -log4j.appender.query.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n +log4j.appender.file.Append=true + +log4j.appender.query=org.apache.log4j.DailyRollingFileAppender +log4j.appender.query.layout=org.apache.log4j.PatternLayout log4j.appender.query.File=${catalina.home}/../logs/kylin_query.log -log4j.appender.query.layout.ConversionPattern=[%t]:[%d{yyyy-MM-dd HH:mm:ss,SSS}][%p][%l] - %m%n -log4j.appender.query.Append=true - -log4j.appender.job=org.apache.log4j.DailyRollingFileAppender -log4j.appender.job.layout=org.apache.log4j.PatternLayout +log4j.appender.query.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n +log4j.appender.query.Append=true + +log4j.appender.job=org.apache.log4j.DailyRollingFileAppender +log4j.appender.job.layout=org.apache.log4j.PatternLayout log4j.appender.job.File=${catalina.home}/../logs/kylin_job.log -log4j.appender.job.layout.ConversionPattern=[%t]:[%d{yyyy-MM-dd HH:mm:ss,SSS}][%p][%l] - %m%n -log4j.appender.job.Append=true - -#overall config -log4j.rootLogger=INFO,file -log4j.logger.org.apache.kylin=DEBUG -log4j.logger.org.springframework=WARN +log4j.appender.job.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n +log4j.appender.job.Append=true + +#overall config +log4j.rootLogger=INFO,file +log4j.logger.org.apache.kylin=DEBUG +log4j.logger.org.springframework=WARN log4j.logger.org.springframework.security=INFO - -#query config -log4j.logger.org.apache.kylin.rest.controller.QueryController=DEBUG, query 
-log4j.logger.org.apache.kylin.rest.service.QueryService=DEBUG, query -log4j.logger.org.apache.kylin.query=DEBUG, query -#log4j.logger.org.apache.kylin.storage=DEBUG, query //too many stuff in storage package now - -#job config -log4j.logger.org.apache.kylin.rest.controller.JobController=DEBUG, job -log4j.logger.org.apache.kylin.rest.service.JobService=DEBUG, job -log4j.logger.org.apache.kylin.job=DEBUG, job + +#query config +log4j.logger.org.apache.kylin.rest.controller.QueryController=DEBUG, query +log4j.logger.org.apache.kylin.rest.service.QueryService=DEBUG, query +log4j.logger.org.apache.kylin.query=DEBUG, query +#log4j.logger.org.apache.kylin.storage=DEBUG, query //too many stuff in storage package now + +#job config +log4j.logger.org.apache.kylin.rest.controller.JobController=DEBUG, job +log4j.logger.org.apache.kylin.rest.service.JobService=DEBUG, job +log4j.logger.org.apache.kylin.job=DEBUG, job http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 50ea757..c2e2e64 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -1,393 +1,393 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.Delete; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.storage.hbase; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.KeyOnlyFilter; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.RawResource; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -public class HBaseResourceStore extends ResourceStore { - - private static final Logger logger = LoggerFactory.getLogger(HBaseResourceStore.class); - - private static final String DEFAULT_TABLE_NAME = "kylin_metadata"; - private static final String FAMILY = "f"; - private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY); - private static final String COLUMN = "c"; - private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN); - private static final String COLUMN_TS = "t"; - private static 
final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS); - - private static final Map<String, String> TABLE_SUFFIX_MAP = new LinkedHashMap<String, String>(); - - static { - TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube"); - TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict"); - TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex"); - TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj"); - TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot"); - TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE - } - - final String tableNameBase; - final String hbaseUrl; - - // final Map<String, String> tableNameMap; // path prefix ==> HBase table name - - private HConnection getConnection() throws IOException { - return HBaseConnection.get(hbaseUrl); - } - - public HBaseResourceStore(KylinConfig kylinConfig) throws IOException { - super(kylinConfig); - - String metadataUrl = kylinConfig.getMetadataUrl(); - // split TABLE@HBASE_URL - int cut = metadataUrl.indexOf('@'); - tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut); - hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); - - createHTableIfNeeded(getAllInOneTableName()); - - // tableNameMap = new LinkedHashMap<String, String>(); - // for (Entry<String, String> entry : TABLE_SUFFIX_MAP.entrySet()) { - // String pathPrefix = entry.getKey(); - // String tableName = tableNameBase + entry.getValue(); - // tableNameMap.put(pathPrefix, tableName); - // createHTableIfNeeded(tableName); - // } - - } - - private void createHTableIfNeeded(String tableName) throws IOException { - HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY); - } - - private String getAllInOneTableName() { - return tableNameBase; - } - - @Override - protected ArrayList<String> listResourcesImpl(String resPath) throws IOException { - assert resPath.startsWith("/"); - String lookForPrefix = resPath.endsWith("/") ? 
resPath : resPath + "/"; - byte[] startRow = Bytes.toBytes(lookForPrefix); - byte[] endRow = Bytes.toBytes(lookForPrefix); - endRow[endRow.length - 1]++; - - ArrayList<String> result = new ArrayList<String>(); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - Scan scan = new Scan(startRow, endRow); - scan.setFilter(new KeyOnlyFilter()); - try { - ResultScanner scanner = table.getScanner(scan); - for (Result r : scanner) { - String path = Bytes.toString(r.getRow()); - assert path.startsWith(lookForPrefix); - int cut = path.indexOf('/', lookForPrefix.length()); - String child = cut < 0 ? path : path.substring(0, cut); - if (result.contains(child) == false) - result.add(child); - } - } finally { - IOUtils.closeQuietly(table); - } - // return null to indicate not a folder - return result.isEmpty() ? null : result; - } - - @Override - protected boolean existsImpl(String resPath) throws IOException { +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.KeyOnlyFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.RawResource; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class HBaseResourceStore extends ResourceStore { + + private static final Logger logger = LoggerFactory.getLogger(HBaseResourceStore.class); + + private static final String 
DEFAULT_TABLE_NAME = "kylin_metadata"; + private static final String FAMILY = "f"; + private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY); + private static final String COLUMN = "c"; + private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN); + private static final String COLUMN_TS = "t"; + private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS); + + private static final Map<String, String> TABLE_SUFFIX_MAP = new LinkedHashMap<String, String>(); + + static { + TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube"); + TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict"); + TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex"); + TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj"); + TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot"); + TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE + } + + final String tableNameBase; + final String hbaseUrl; + + // final Map<String, String> tableNameMap; // path prefix ==> HBase table name + + private HConnection getConnection() throws IOException { + return HBaseConnection.get(hbaseUrl); + } + + public HBaseResourceStore(KylinConfig kylinConfig) throws IOException { + super(kylinConfig); + + String metadataUrl = kylinConfig.getMetadataUrl(); + // split TABLE@HBASE_URL + int cut = metadataUrl.indexOf('@'); + tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut); + hbaseUrl = cut < 0 ? 
metadataUrl : metadataUrl.substring(cut + 1); + + createHTableIfNeeded(getAllInOneTableName()); + + // tableNameMap = new LinkedHashMap<String, String>(); + // for (Entry<String, String> entry : TABLE_SUFFIX_MAP.entrySet()) { + // String pathPrefix = entry.getKey(); + // String tableName = tableNameBase + entry.getValue(); + // tableNameMap.put(pathPrefix, tableName); + // createHTableIfNeeded(tableName); + // } + + } + + private void createHTableIfNeeded(String tableName) throws IOException { + HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY); + } + + private String getAllInOneTableName() { + return tableNameBase; + } + + @Override + protected ArrayList<String> listResourcesImpl(String resPath) throws IOException { + assert resPath.startsWith("/"); + String lookForPrefix = resPath.endsWith("/") ? resPath : resPath + "/"; + byte[] startRow = Bytes.toBytes(lookForPrefix); + byte[] endRow = Bytes.toBytes(lookForPrefix); + endRow[endRow.length - 1]++; + + ArrayList<String> result = new ArrayList<String>(); + + HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Scan scan = new Scan(startRow, endRow); + scan.setFilter(new KeyOnlyFilter()); + try { + ResultScanner scanner = table.getScanner(scan); + for (Result r : scanner) { + String path = Bytes.toString(r.getRow()); + assert path.startsWith(lookForPrefix); + int cut = path.indexOf('/', lookForPrefix.length()); + String child = cut < 0 ? path : path.substring(0, cut); + if (result.contains(child) == false) + result.add(child); + } + } finally { + IOUtils.closeQuietly(table); + } + // return null to indicate not a folder + return result.isEmpty() ? 
null : result; + } + + @Override + protected boolean existsImpl(String resPath) throws IOException { Result r = getFromHTable(resPath, false, false); - return r != null; - } - - @Override - protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException { - byte[] startRow = Bytes.toBytes(rangeStart); - byte[] endRow = plusZero(Bytes.toBytes(rangeEnd)); - - Scan scan = new Scan(startRow, endRow); - scan.addColumn(B_FAMILY, B_COLUMN_TS); - scan.addColumn(B_FAMILY, B_COLUMN); - tuneScanParameters(scan); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - List<RawResource> result = Lists.newArrayList(); - try { - ResultScanner scanner = table.getScanner(scan); - for (Result r : scanner) { - result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r))); - } - } catch (IOException e) { - for (RawResource rawResource : result) { - IOUtils.closeQuietly(rawResource.inputStream); - } - throw e; - } finally { - IOUtils.closeQuietly(table); - } - return result; - } - - @Override - protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException { - byte[] startRow = Bytes.toBytes(rangeStart); - byte[] endRow = plusZero(Bytes.toBytes(rangeEnd)); - - Scan scan = new Scan(startRow, endRow); - scan.addColumn(B_FAMILY, B_COLUMN_TS); - scan.addColumn(B_FAMILY, B_COLUMN); - scan.setFilter(generateTimeFilterList(timeStartInMillis, timeEndInMillis)); - tuneScanParameters(scan); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - List<RawResource> result = Lists.newArrayList(); - try { - ResultScanner scanner = table.getScanner(scan); - for (Result r : scanner) { - result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r))); - } - } catch (IOException e) { - for (RawResource rawResource : result) { - IOUtils.closeQuietly(rawResource.inputStream); - } - 
throw e; - } finally { - IOUtils.closeQuietly(table); - } - return result; - } - - private void tuneScanParameters(Scan scan) { - // divide by 10 as some resource like dictionary or snapshot can be very large - scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10); - scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize()); - scan.setCacheBlocks(true); - } - - private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) { - FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); - SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis)); - filterList.addFilter(timeStartFilter); - SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis)); - filterList.addFilter(timeEndFilter); - return filterList; - } - - private InputStream getInputStream(String resPath, Result r) throws IOException { - if (r == null) { - return null; - } - byte[] value = r.getValue(B_FAMILY, B_COLUMN); - if (value.length == 0) { - Path redirectPath = bigCellHDFSPath(resPath); - Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); - FileSystem fileSystem = FileSystem.get(hconf); - - return fileSystem.open(redirectPath); - } else { - return new ByteArrayInputStream(value); - } - } - - private long getTimestamp(Result r) { - if (r == null || r.getValue(B_FAMILY, B_COLUMN_TS) == null) { - return 0; - } else { - return Bytes.toLong(r.getValue(B_FAMILY, B_COLUMN_TS)); - } - } - - @Override - protected RawResource getResourceImpl(String resPath) throws IOException { + return r != null; + } + + @Override + protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException { + byte[] startRow = Bytes.toBytes(rangeStart); + byte[] endRow = plusZero(Bytes.toBytes(rangeEnd)); + + Scan scan = new 
Scan(startRow, endRow); + scan.addColumn(B_FAMILY, B_COLUMN_TS); + scan.addColumn(B_FAMILY, B_COLUMN); + tuneScanParameters(scan); + + HTableInterface table = getConnection().getTable(getAllInOneTableName()); + List<RawResource> result = Lists.newArrayList(); + try { + ResultScanner scanner = table.getScanner(scan); + for (Result r : scanner) { + result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r))); + } + } catch (IOException e) { + for (RawResource rawResource : result) { + IOUtils.closeQuietly(rawResource.inputStream); + } + throw e; + } finally { + IOUtils.closeQuietly(table); + } + return result; + } + + @Override + protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException { + byte[] startRow = Bytes.toBytes(rangeStart); + byte[] endRow = plusZero(Bytes.toBytes(rangeEnd)); + + Scan scan = new Scan(startRow, endRow); + scan.addColumn(B_FAMILY, B_COLUMN_TS); + scan.addColumn(B_FAMILY, B_COLUMN); + scan.setFilter(generateTimeFilterList(timeStartInMillis, timeEndInMillis)); + tuneScanParameters(scan); + + HTableInterface table = getConnection().getTable(getAllInOneTableName()); + List<RawResource> result = Lists.newArrayList(); + try { + ResultScanner scanner = table.getScanner(scan); + for (Result r : scanner) { + result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r))); + } + } catch (IOException e) { + for (RawResource rawResource : result) { + IOUtils.closeQuietly(rawResource.inputStream); + } + throw e; + } finally { + IOUtils.closeQuietly(table); + } + return result; + } + + private void tuneScanParameters(Scan scan) { + // divide by 10 as some resource like dictionary or snapshot can be very large + scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10); + scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize()); + scan.setCacheBlocks(true); + } + + private FilterList 
generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) { + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); + SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis)); + filterList.addFilter(timeStartFilter); + SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis)); + filterList.addFilter(timeEndFilter); + return filterList; + } + + private InputStream getInputStream(String resPath, Result r) throws IOException { + if (r == null) { + return null; + } + byte[] value = r.getValue(B_FAMILY, B_COLUMN); + if (value.length == 0) { + Path redirectPath = bigCellHDFSPath(resPath); + Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); + FileSystem fileSystem = FileSystem.get(hconf); + + return fileSystem.open(redirectPath); + } else { + return new ByteArrayInputStream(value); + } + } + + private long getTimestamp(Result r) { + if (r == null || r.getValue(B_FAMILY, B_COLUMN_TS) == null) { + return 0; + } else { + return Bytes.toLong(r.getValue(B_FAMILY, B_COLUMN_TS)); + } + } + + @Override + protected RawResource getResourceImpl(String resPath) throws IOException { Result r = getFromHTable(resPath, true, true); - if (r == null) - return null; - else - return new RawResource(getInputStream(resPath, r), getTimestamp(r)); - } - - @Override - protected long getResourceTimestampImpl(String resPath) throws IOException { + if (r == null) + return null; + else + return new RawResource(getInputStream(resPath, r), getTimestamp(r)); + } + + @Override + protected long getResourceTimestampImpl(String resPath) throws IOException { return getTimestamp(getFromHTable(resPath, false, true)); - } - - @Override - protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException { - ByteArrayOutputStream 
bout = new ByteArrayOutputStream(); - IOUtils.copy(content, bout); - bout.close(); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - byte[] row = Bytes.toBytes(resPath); - Put put = buildPut(resPath, ts, row, bout.toByteArray(), table); - - table.put(put); - table.flushCommits(); - } finally { - IOUtils.closeQuietly(table); - } - } - - @Override - protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - byte[] row = Bytes.toBytes(resPath); - byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); - Put put = buildPut(resPath, newTS, row, content, table); - - boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); - logger.info("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS + ", operation result: " + ok); - if (!ok) { - long real = getResourceTimestampImpl(resPath); - throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); - } - - table.flushCommits(); - - return newTS; - } finally { - IOUtils.closeQuietly(table); - } - } - - @Override - protected void deleteResourceImpl(String resPath) throws IOException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - Delete del = new Delete(Bytes.toBytes(resPath)); - table.delete(del); - table.flushCommits(); - } finally { - IOUtils.closeQuietly(table); - } - } - - @Override - protected String getReadableResourcePathImpl(String resPath) { - return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl(); - } - + } + + @Override + protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException { + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + IOUtils.copy(content, bout); + bout.close(); + + HTableInterface 
table = getConnection().getTable(getAllInOneTableName()); + try { + byte[] row = Bytes.toBytes(resPath); + Put put = buildPut(resPath, ts, row, bout.toByteArray(), table); + + table.put(put); + table.flushCommits(); + } finally { + IOUtils.closeQuietly(table); + } + } + + @Override + protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { + HTableInterface table = getConnection().getTable(getAllInOneTableName()); + try { + byte[] row = Bytes.toBytes(resPath); + byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); + Put put = buildPut(resPath, newTS, row, content, table); + + boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); + logger.debug("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS + ", operation result: " + ok); + if (!ok) { + long real = getResourceTimestampImpl(resPath); + throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); + } + + table.flushCommits(); + + return newTS; + } finally { + IOUtils.closeQuietly(table); + } + } + + @Override + protected void deleteResourceImpl(String resPath) throws IOException { + HTableInterface table = getConnection().getTable(getAllInOneTableName()); + try { + Delete del = new Delete(Bytes.toBytes(resPath)); + table.delete(del); + table.flushCommits(); + } finally { + IOUtils.closeQuietly(table); + } + } + + @Override + protected String getReadableResourcePathImpl(String resPath) { + return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl(); + } + private Result getFromHTable(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException { byte[] rowkey = Bytes.toBytes(path); - + Get get = new Get(rowkey); - if (!fetchContent && !fetchTimestamp) { + if (!fetchContent && !fetchTimestamp) { get.setCheckExistenceOnly(true); - } else { - if (fetchContent) + } else { + if 
(fetchContent) get.addColumn(B_FAMILY, B_COLUMN); - if (fetchTimestamp) + if (fetchTimestamp) get.addColumn(B_FAMILY, B_COLUMN_TS); - } - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - Result result = table.get(get); + } + + HTableInterface table = getConnection().getTable(getAllInOneTableName()); + try { + Result result = table.get(get); boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists())); - return exists ? result : null; - } finally { - IOUtils.closeQuietly(table); - } - } - - private byte[] plusZero(byte[] startRow) { - byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1); - endRow[endRow.length - 1] = 0; - return endRow; - } - - private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { - Path redirectPath = bigCellHDFSPath(resPath); - Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); - FileSystem fileSystem = FileSystem.get(hconf); - - if (fileSystem.exists(redirectPath)) { - fileSystem.delete(redirectPath, true); - } - - FSDataOutputStream out = fileSystem.create(redirectPath); - - try { - out.write(largeColumn); - } finally { - IOUtils.closeQuietly(out); - } - - return redirectPath; - } - - public Path bigCellHDFSPath(String resPath) { - String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory(); - Path redirectPath = new Path(hdfsWorkingDirectory, "resources" + resPath); - return redirectPath; - } - - private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException { - int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize(); - if (content.length > kvSizeLimit) { - writeLargeCellToHdfs(resPath, content, table); - content = BytesUtil.EMPTY_BYTE_ARRAY; - } - - Put put = new Put(row); - put.add(B_FAMILY, B_COLUMN, content); - put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); - - return put; - } -} + return exists ? 
result : null; + } finally { + IOUtils.closeQuietly(table); + } + } + + private byte[] plusZero(byte[] startRow) { + byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1); + endRow[endRow.length - 1] = 0; + return endRow; + } + + private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { + Path redirectPath = bigCellHDFSPath(resPath); + Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); + FileSystem fileSystem = FileSystem.get(hconf); + + if (fileSystem.exists(redirectPath)) { + fileSystem.delete(redirectPath, true); + } + + FSDataOutputStream out = fileSystem.create(redirectPath); + + try { + out.write(largeColumn); + } finally { + IOUtils.closeQuietly(out); + } + + return redirectPath; + } + + public Path bigCellHDFSPath(String resPath) { + String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory(); + Path redirectPath = new Path(hdfsWorkingDirectory, "resources" + resPath); + return redirectPath; + } + + private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException { + int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize(); + if (content.length > kvSizeLimit) { + writeLargeCellToHdfs(resPath, content, table); + content = BytesUtil.EMPTY_BYTE_ARRAY; + } + + Put put = new Put(row); + put.add(B_FAMILY, B_COLUMN, content); + put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); + + return put; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/50b3f429/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 35df3ed..6ff07a4 100644 --- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -157,18 +157,18 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { GTScanRequest.serializer.serialize(scanRequest, buffer); buffer.flip(); final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit()); - logger.info("Serialized scanRequestBytes's size is " + (buffer.limit() - buffer.position())); + logger.debug("Serialized scanRequestBytes's size is " + (buffer.limit() - buffer.position())); final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList()); - logger.info("Total RawScan range count: " + rawScans.size()); + logger.debug("Total RawScan range count: " + rawScans.size()); for (RawScan rawScan : rawScans) { logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); } final AtomicInteger totalScannedCount = new AtomicInteger(0); final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior(); - logger.info("The execution of this query will use " + toggle + " as endpoint's behavior"); + logger.debug("The execution of this query will use " + toggle + " as endpoint's behavior"); List<Future<?>> futures = Lists.newArrayList(); for (int i = 0; i < rawScans.size(); ++i) { @@ -200,7 +200,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { //results.size() supposed to be 1; if (results.size() != 1) { - logger.warn("{} CubeVisitResponse returned for shard {}", results.size(), shardIndex); + logger.info("{} CubeVisitResponse returned for shard {}", results.size(), shardIndex); } for (CubeVisitProtos.CubeVisitResponse result : results) {