http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 50c4639..8812dad 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -82,6 +82,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private JobEngineConfig jobEngineConfig; private String serverName; + private final static String SEGMENT_ID = "segmentId"; public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // note ZookeeperDistributedLock will ensure zk path prefix: /kylin/metadata @@ -143,8 +144,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn logger.warn(executable.toString() + " fail to schedule in server: " + serverName, ex); } } - logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " - + nOtherRunning + " running in other server, " + nReady + " ready, " + nOthers + " others"); + logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " + nOtherRunning + " running in other server, " + nReady + " ready, " + nOthers + " others"); } catch (Exception e) { logger.warn("Job Fetcher caught a exception " + e); } @@ -216,12 +216,9 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn final Output output = executableManager.getOutput(id); if (output.getState() == ExecutableState.RUNNING) { AbstractExecutable executable = executableManager.getJob(id); - if (executable instanceof DefaultChainedExecutable - && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) - && !nodeData.equalsIgnoreCase(serverName)) { + if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) { try { - logger.warn(nodeData + " has released the lock for: " + segmentId - + " but the job still running. so " + serverName + " resume the job"); + logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job"); if (!jobLock.isLocked(getLockPath(segmentId))) { executableManager.resumeRunningJobForce(executable.getId()); fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); @@ -280,13 +277,11 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn lockWatch = this.jobLock.watchLocks(getWatchPath(), watchPool, watcherProcess); int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit(); - jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, - new SynchronousQueue<Runnable>()); + jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>()); context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig()); fetcher = new FetcherRunner(); - fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, - TimeUnit.SECONDS); + fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS); hasStarted = true; resumeAllRunningJobs(); @@ -324,7 +319,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn } return path; } - + @Override public void shutdown() throws SchedulerException { logger.info("Will shut down Job Engine ....");
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java index 5ed4b7e..1b6b29e 100644 --- a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java +++ b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java @@ -23,7 +23,7 @@ package org.apache.kylin.job.lock; * This interface is for such negotiation. */ public interface JobLock { - + boolean lockJobEngine(); void unlockJobEngine(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java index 76a5300..faea9a4 100644 --- a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java @@ -119,8 +119,7 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase { assertEquals(one.getStatus(), another.getStatus()); assertEquals(one.isRunnable(), another.isRunnable()); assertEquals(one.getOutput(), another.getOutput()); - assertTrue((one.getParams() == null && another.getParams() == null) - || (one.getParams() != null && another.getParams() != null)); + assertTrue((one.getParams() == null && another.getParams() == null) || (one.getParams() != null && another.getParams() != null)); if (one.getParams() != null) { assertEquals(one.getParams().size(), another.getParams().size()); for (String key : one.getParams().keySet()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-job/src/test/java/org/apache/kylin/job/JobEngineConfigTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/JobEngineConfigTest.java b/core-job/src/test/java/org/apache/kylin/job/JobEngineConfigTest.java index fe77d09..77914ef 100644 --- a/core-job/src/test/java/org/apache/kylin/job/JobEngineConfigTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/JobEngineConfigTest.java @@ -18,15 +18,15 @@ package org.apache.kylin.job; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; - import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HotLoadKylinPropertiesTestCase; import org.apache.kylin.job.engine.JobEngineConfig; import org.junit.Test; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + /** * @author kangkaisen */ http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java index f5abd30..9a3eb48 100644 --- a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java +++ b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java @@ -38,11 +38,11 @@ public class SelfStopExecutable extends BaseTestExecutable { try { for (int i = 0; i < 20; i++) { sleepOneSecond(); - + if (isDiscarded()) return new ExecuteResult(ExecuteResult.State.STOPPED, "stopped"); } - + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); } finally { doingWork = false; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index c514dfd..1ada9a1 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { private static final Logger logger = LoggerFactory.getLogger(BaseSchedulerTest.class); - + private DefaultScheduler scheduler; protected ExecutableManager jobService; @@ -76,7 +76,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { protected void waitForJobFinish(String jobId) { int error = 0; final int errorLimit = 3; - + while (error < errorLimit) { try { Thread.sleep(2000); @@ -87,8 +87,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { try { AbstractExecutable job = jobService.getJob(jobId); ExecutableState status = job.getStatus(); - if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR - || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) { + if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) { break; } } catch (Exception ex) { @@ -96,7 +95,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { error++; } } - + if (error >= errorLimit) { throw new RuntimeException("waitForJobFinish() encounters exceptions, see logs above"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java index 1b2b57d..fbdb0bb 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java @@ -34,14 +34,13 @@ import com.google.common.collect.Maps; /** * Encoding Boolean values to bytes */ -public class BooleanDimEnc extends DimensionEncoding implements Serializable { +public class BooleanDimEnc extends DimensionEncoding implements Serializable{ private static final long serialVersionUID = 1L; public static final String ENCODING_NAME = "boolean"; //NOTE: when add new value, append to the array tail, DO NOT insert! - public static String[] ALLOWED_VALUES = new String[] { "", "true", "false", "TRUE", "FALSE", "True", "False", "t", - "f", "T", "F", "yes", "no", "YES", "NO", "Yes", "No", "y", "n", "Y", "N", "1", "0" }; + public static String[] ALLOWED_VALUES = new String[] { "", "true", "false", "TRUE", "FALSE", "True", "False", "t", "f", "T", "F", "yes", "no", "YES", "NO", "Yes", "No", "y", "n", "Y", "N", "1", "0" }; public static final Map<String, Integer> map = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/dimension/DateDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/DateDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/DateDimEnc.java index 50c08e6..6f06841 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/DateDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/DateDimEnc.java @@ -96,8 +96,7 @@ public class DateDimEnc extends AbstractDateDimEnc implements Serializable { return millis; } - public static String[] replaceEncodingArgs(String encoding, String[] encodingArgs, String encodingName, - DataType type) { + public static String[] replaceEncodingArgs(String encoding, String[] encodingArgs, String encodingName, DataType type) { // https://issues.apache.org/jira/browse/KYLIN-2495 if (DateDimEnc.ENCODING_NAME.equals(encodingName)) { if (type.isIntegerFamily()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java index c846560..dcc8d47 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java @@ -100,8 +100,7 @@ public class DictionaryDimEnc extends DimensionEncoding implements Serializable for (int i = outputOffset; i < outputOffset + fixedLen; i++) { output[i] = defaultByte; } - logger.error("Can't translate value " + valueStr + " to dictionary ID, roundingFlag " + roundingFlag - + ". Using default value " + String.format("\\x%02X", defaultByte)); + logger.error("Can't translate value " + valueStr + " to dictionary ID, roundingFlag " + roundingFlag + ". Using default value " + String.format("\\x%02X", defaultByte)); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java index 39a985e..9ce1577 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java @@ -30,7 +30,7 @@ import org.apache.kylin.metadata.datatype.DataTypeSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FixedLenDimEnc extends DimensionEncoding implements Serializable { +public class FixedLenDimEnc extends DimensionEncoding implements Serializable{ private static final long serialVersionUID = 1L; private static Logger logger = LoggerFactory.getLogger(FixedLenDimEnc.class); @@ -100,9 +100,7 @@ public class FixedLenDimEnc extends DimensionEncoding implements Serializable { int valueLen = value.length; if (valueLen > fixedLen) { if (avoidVerbose++ % 10000 == 0) { - logger.warn( - "Expect at most " + fixedLen + " bytes, but got " + valueLen + ", will truncate, value string: " - + Bytes.toString(value, 0, valueLen) + " times:" + avoidVerbose); + logger.warn("Expect at most " + fixedLen + " bytes, but got " + valueLen + ", will truncate, value string: " + Bytes.toString(value, 0, valueLen) + " times:" + avoidVerbose); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java index 701aebc..a931450 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java @@ -44,7 +44,7 @@ import com.google.common.base.Preconditions; * <p> * Due to these limitations hex representation of hash values(with no padding, better with even characters) is more suitable */ -public class FixedLenHexDimEnc extends DimensionEncoding implements Serializable { +public class FixedLenHexDimEnc extends DimensionEncoding implements Serializable{ private static final long serialVersionUID = 1L; private static Logger logger = LoggerFactory.getLogger(FixedLenHexDimEnc.class); @@ -166,19 +166,16 @@ public class FixedLenHexDimEnc extends DimensionEncoding implements Serializable byte[] value = Bytes.toBytes(valueStr); int valueLen = value.length; int endOffset = outputOffset + bytelen; - + if (valueLen > hexLength) { if (avoidVerbose++ % 10000 == 0) { - logger.warn("Expect at most " + hexLength + " bytes, but got " + valueLen - + ", will truncate, value string: " + Bytes.toString(value, 0, valueLen) + " times:" - + avoidVerbose); + logger.warn("Expect at most " + hexLength + " bytes, but got " + valueLen + ", will truncate, value string: " + Bytes.toString(value, 0, valueLen) + " times:" + avoidVerbose); } } if (valueLen >= hexLength && isF(value, 0, hexLength)) { if (avoidVerbose2++ % 10000 == 0) { - logger.warn("All 'F' value: " + Bytes.toString(value, 0, valueLen) - + "will become null after encode/decode. times:" + avoidVerbose); + logger.warn("All 'F' value: " + Bytes.toString(value, 0, valueLen) + "will become null after encode/decode. times:" + avoidVerbose); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java index 695fa03..3650200 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java @@ -34,13 +34,12 @@ import org.slf4j.LoggerFactory; * deprecated use IntegerDimEnc instead * @deprecated */ -public class IntDimEnc extends DimensionEncoding implements Serializable { +public class IntDimEnc extends DimensionEncoding implements Serializable{ private static final long serialVersionUID = 1L; private static Logger logger = LoggerFactory.getLogger(IntDimEnc.class); - private static final long[] CAP = { 0, 0xffL, 0xffffL, 0xffffffL, 0xffffffffL, 0xffffffffffL, 0xffffffffffffL, - 0xffffffffffffffL, Long.MAX_VALUE }; + private static final long[] CAP = { 0, 0xffL, 0xffffL, 0xffffffL, 0xffffffffL, 0xffffffffffL, 0xffffffffffffL, 0xffffffffffffffL, Long.MAX_VALUE }; public static final String ENCODING_NAME = "int"; @@ -88,8 +87,7 @@ public class IntDimEnc extends DimensionEncoding implements Serializable { long integer = Long.parseLong(valueStr); if (integer > CAP[fixedLen]) { if (avoidVerbose++ % 10000 == 0) { - logger.warn("Expect at most " + fixedLen + " bytes, but got " + valueStr + ", will truncate, hit times:" - + avoidVerbose); + logger.warn("Expect at most " + fixedLen + " bytes, but got " + valueStr + ", will truncate, hit times:" + avoidVerbose); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java index 26a40d8..e024696 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java @@ -37,17 +37,14 @@ import org.slf4j.LoggerFactory; * -2^(8*N-1) is not supported because the slot is reserved for null values. * -2^(8*N-1) will be encoded with warn, and its output will be null */ -public class IntegerDimEnc extends DimensionEncoding implements Serializable { +public class IntegerDimEnc extends DimensionEncoding implements Serializable{ private static final long serialVersionUID = 1L; private static Logger logger = LoggerFactory.getLogger(IntegerDimEnc.class); - private static final long[] CAP = { 0, 0x7fL, 0x7fffL, 0x7fffffL, 0x7fffffffL, 0x7fffffffffL, 0x7fffffffffffL, - 0x7fffffffffffffL, 0x7fffffffffffffffL }; - private static final long[] MASK = { 0, 0xffL, 0xffffL, 0xffffffL, 0xffffffffL, 0xffffffffffL, 0xffffffffffffL, - 0xffffffffffffffL, 0xffffffffffffffffL }; - private static final long[] TAIL = { 0, 0x80L, 0x8000L, 0x800000L, 0x80000000L, 0x8000000000L, 0x800000000000L, - 0x80000000000000L, 0x8000000000000000L }; + private static final long[] CAP = { 0, 0x7fL, 0x7fffL, 0x7fffffL, 0x7fffffffL, 0x7fffffffffL, 0x7fffffffffffL, 0x7fffffffffffffL, 0x7fffffffffffffffL }; + private static final long[] MASK = { 0, 0xffL, 0xffffL, 0xffffffL, 0xffffffffL, 0xffffffffffL, 0xffffffffffffL, 0xffffffffffffffL, 0xffffffffffffffffL }; + private static final long[] TAIL = { 0, 0x80L, 0x8000L, 0x800000L, 0x80000000L, 0x8000000000L, 0x800000000000L, 0x80000000000000L, 0x8000000000000000L }; static { for (int i = 1; i < TAIL.length; ++i) { long head = ~MASK[i]; @@ -102,8 +99,7 @@ public class IntegerDimEnc extends DimensionEncoding implements Serializable { long integer = Long.parseLong(valueStr); if (integer > CAP[fixedLen] || integer < TAIL[fixedLen]) { if (avoidVerbose++ % 10000 == 0) { - logger.warn("Expect at most " + fixedLen + " bytes, but got " + valueStr + ", will truncate, hit times:" - + avoidVerbose); + logger.warn("Expect at most " + fixedLen + " bytes, but got " + valueStr + ", will truncate, hit times:" + avoidVerbose); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java index 5738ce9..d998f44 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java @@ -18,6 +18,11 @@ package org.apache.kylin.dimension; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; @@ -25,25 +30,17 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.metadata.datatype.DataTypeSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * not being used yet, prepared for future */ -public class OneMoreByteVLongDimEnc extends DimensionEncoding implements Serializable { +public class OneMoreByteVLongDimEnc extends DimensionEncoding implements Serializable{ private static final long serialVersionUID = 1L; private static Logger logger = LoggerFactory.getLogger(OneMoreByteVLongDimEnc.class); - private static final long[] CAP = { 0, 0x7fL, 0x7fffL, 0x7fffffL, 0x7fffffffL, 0x7fffffffffL, 0x7fffffffffffL, - 0x7fffffffffffffL, 0x7fffffffffffffffL }; - private static final long[] MASK = { 0, 0xffL, 0xffffL, 0xffffffL, 0xffffffffL, 0xffffffffffL, 0xffffffffffffL, - 0xffffffffffffffL, 0xffffffffffffffffL }; - private static final long[] TAIL = { 0, 0x80L, 0x8000L, 0x800000L, 0x80000000L, 0x8000000000L, 0x800000000000L, - 0x80000000000000L, 0x8000000000000000L }; + private static final long[] CAP = { 0, 0x7fL, 0x7fffL, 0x7fffffL, 0x7fffffffL, 0x7fffffffffL, 0x7fffffffffffL, 0x7fffffffffffffL, 0x7fffffffffffffffL }; + private static final long[] MASK = { 0, 0xffL, 0xffffL, 0xffffffL, 0xffffffffL, 0xffffffffffL, 0xffffffffffffL, 0xffffffffffffffL, 0xffffffffffffffffL }; + private static final long[] TAIL = { 0, 0x80L, 0x8000L, 0x800000L, 0x80000000L, 0x8000000000L, 0x800000000000L, 0x80000000000000L, 0x8000000000000000L }; static { for (int i = 1; i < TAIL.length; ++i) { long head = ~MASK[i]; @@ -98,8 +95,7 @@ public class OneMoreByteVLongDimEnc extends DimensionEncoding implements Seriali long integer = Long.parseLong(valueStr); if (integer > CAP[fixedLen] || integer < TAIL[fixedLen]) { if (avoidVerbose++ % 10000 == 0) { - logger.warn("Expect at most " + fixedLen + " bytes, but got " + valueStr + ", will truncate, hit times:" - + avoidVerbose); + logger.warn("Expect at most " + fixedLen + " bytes, but got " + valueStr + ", will truncate, hit times:" + avoidVerbose); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java index 09b6a8b..44e5708 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java @@ -18,13 +18,13 @@ package org.apache.kylin.measure; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.MeasureDesc; + import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.Collection; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.MeasureDesc; - /** * This class embeds a reusable byte buffer for measure encoding, and is not thread-safe. * The buffer will grow to accommodate BufferOverflowException until a limit. http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java index a54f471..710f324 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java @@ -94,8 +94,7 @@ public class MeasureAggregators implements Serializable { } public void aggregate(Object[] values1, Object[] values2, Object[] result, boolean[] aggrMask) { - assert values1.length == values2.length && values2.length == descLength && values1.length == result.length - && result.length == aggrMask.length; + assert values1.length == values2.length && values2.length == descLength && values1.length == result.length && result.length == aggrMask.length; for (int i = 0; i < descLength; i++) { if (aggrMask[i]) { result[i] = aggs[i].aggregate(values1[i], values2[i]); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java index edfc8ea..2d73e59 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java @@ -18,13 +18,13 @@ package org.apache.kylin.measure; -import java.nio.ByteBuffer; -import java.util.Collection; - import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DataTypeSerializer; import org.apache.kylin.metadata.model.MeasureDesc; +import java.nio.ByteBuffer; +import java.util.Collection; + /** * @author yangli9 * http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java index 42537eb..ed2cb02 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java @@ -18,13 +18,13 @@ package org.apache.kylin.measure; -import java.util.Collection; -import java.util.Map; - import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; +import java.util.Collection; +import java.util.Map; + abstract public class MeasureIngester<V> implements java.io.Serializable { private static final long serialVersionUID = 1L; @@ -41,15 +41,13 @@ abstract public class MeasureIngester<V> implements java.io.Serializable { return result; } - abstract public V valueOf(String[] values, MeasureDesc measureDesc, - Map<TblColRef, Dictionary<String>> dictionaryMap); + abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap); public void reset() { } - public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, - Map<TblColRef, Dictionary<String>> newDicts) { + public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java index 8e7a1f5..f609dd5 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -18,11 +18,6 @@ package org.apache.kylin.measure; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; - import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -32,6 +27,11 @@ import org.apache.kylin.metadata.realization.SQLDigest; import org.apache.kylin.metadata.tuple.Tuple; import org.apache.kylin.metadata.tuple.TupleInfo; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + /** * MeasureType captures how a kind of aggregation is defined, how it is calculated * during cube build, and how it is involved in query and storage scan. @@ -95,8 +95,7 @@ abstract public class MeasureType<T> implements java.io.Serializable { * be modified to drop the satisfied dimension or measure, and a CapabilityInfluence object * must be returned to mark the contribution of this measure type. */ - public CapabilityInfluence influenceCapabilityCheck(Collection<TblColRef> unmatchedDimensions, - Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, MeasureDesc measureDesc) { + public CapabilityInfluence influenceCapabilityCheck(Collection<TblColRef> unmatchedDimensions, Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, MeasureDesc measureDesc) { return null; } @@ -143,8 +142,7 @@ abstract public class MeasureType<T> implements java.io.Serializable { } /** The advanced filling mode, multiple tuples per storage record. */ - public IAdvMeasureFiller getAdvancedTupleFiller(FunctionDesc function, TupleInfo returnTupleInfo, - Map<TblColRef, Dictionary<String>> dictionaryMap) { + public IAdvMeasureFiller getAdvancedTupleFiller(FunctionDesc function, TupleInfo returnTupleInfo, Map<TblColRef, Dictionary<String>> dictionaryMap) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java index 62cb003..7f3a5f1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java @@ -121,8 +121,7 @@ abstract public class MeasureTypeFactory<T> { logger.info("Checking custom measure types from kylin config: " + customFactory); factoryInsts.add((MeasureTypeFactory<?>) Class.forName(customFactory).newInstance()); } catch (Exception e) { - throw new IllegalArgumentException("Unrecognized MeasureTypeFactory classname: " + customFactory, - e); + throw new IllegalArgumentException("Unrecognized MeasureTypeFactory classname: " + customFactory, e); } } } catch (KylinConfigCannotInitException e) { @@ -133,12 +132,10 @@ abstract public class MeasureTypeFactory<T> { for (MeasureTypeFactory<?> factory : factoryInsts) { String funcName = factory.getAggrFunctionName(); if (funcName.equals(funcName.toUpperCase()) == false) - throw new IllegalArgumentException( - "Aggregation function name '" + funcName + "' must be in upper case"); + throw new IllegalArgumentException("Aggregation function name '" + funcName + "' must be in upper case"); String dataTypeName = factory.getAggrDataTypeName(); if (dataTypeName.equals(dataTypeName.toLowerCase()) == false) - throw new IllegalArgumentException( - "Aggregation data type name '" + dataTypeName + "' must be in lower case"); + throw new IllegalArgumentException("Aggregation data type name '" + dataTypeName + "' must be in lower case"); Class<? extends DataTypeSerializer<?>> serializer = factory.getAggrDataTypeSerializer(); logger.info("registering " + funcName + "(" + dataTypeName + "), " + factory.getClass()); @@ -156,8 +153,7 @@ abstract public class MeasureTypeFactory<T> { } private static void registerUDAF(MeasureTypeFactory<?> factory) { - MeasureType<?> type = factory.createMeasureType(factory.getAggrFunctionName(), - DataType.getType(factory.getAggrDataTypeName())); + MeasureType<?> type = factory.createMeasureType(factory.getAggrFunctionName(), DataType.getType(factory.getAggrDataTypeName())); Map<String, Class<?>> udafs = type.getRewriteCalciteAggrFunctions(); if (type.needRewrite() == false || udafs == null) return; @@ -168,8 +164,7 @@ abstract public class MeasureTypeFactory<T> { continue; // skip built-in function if (udafFactories.containsKey(udaf)) - throw new IllegalStateException( - "UDAF '" + udaf + "' was dup declared by " + udafFactories.get(udaf) + " and " + factory); + throw new IllegalStateException("UDAF '" + udaf + "' was dup declared by " + udafFactories.get(udaf) + " and " + factory); udafFactories.put(udaf, factory); udafMap.put(udaf, udafs.get(udaf)); @@ -191,8 +186,7 @@ abstract public class MeasureTypeFactory<T> { public static MeasureType<?> createNoRewriteFieldsMeasureType(String funcName, DataType dataType) { // currently only has DimCountDistinctAgg if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_COUNT_DISTINCT)) { - return new DimCountDistinctMeasureType.DimCountDistinctMeasureTypeFactory().createMeasureType(funcName, - dataType); + return new DimCountDistinctMeasureType.DimCountDistinctMeasureTypeFactory().createMeasureType(funcName, dataType); } throw new UnsupportedOperationException("No measure type found."); @@ -234,8 +228,7 @@ abstract public class MeasureTypeFactory<T> { if (needRewrite == null) needRewrite = Boolean.valueOf(b); else if (needRewrite.booleanValue() != b) - throw new IllegalStateException( - "needRewrite() of factorys " + factory + " does not have consensus"); + throw new IllegalStateException("needRewrite() of factorys " + factory + " does not have consensus"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java index 114aa8e..ed493a1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java @@ -67,18 +67,15 @@ public class BasicMeasureType extends MeasureType { if (funcName.equals(FunctionDesc.FUNC_SUM)) { if (rtype.isNumberFamily() == false) { - throw new IllegalArgumentException( - "Return type for function " + funcName + " must be one of " + DataType.NUMBER_FAMILY); + throw new IllegalArgumentException("Return type for function " + funcName + " must be one of " + DataType.NUMBER_FAMILY); } } else if (funcName.equals(FunctionDesc.FUNC_COUNT)) { if (rtype.isIntegerFamily() == false) { - throw new IllegalArgumentException( - "Return type for function " + funcName + " must be one of " + DataType.INTEGER_FAMILY); + throw new IllegalArgumentException("Return type for function " + funcName + " must be one of " + DataType.INTEGER_FAMILY); } } else if (funcName.equals(FunctionDesc.FUNC_MAX) || funcName.equals(FunctionDesc.FUNC_MIN)) { if (rtype.isNumberFamily() == false) { - throw new IllegalArgumentException( - "Return type for function " + funcName + " must be one of " + DataType.NUMBER_FAMILY); + throw new IllegalArgumentException("Return type for function " + funcName + " must be one of " + DataType.NUMBER_FAMILY); } } else { KylinConfig config = KylinConfig.getInstanceFromEnv(); @@ -123,8 +120,7 @@ public class BasicMeasureType extends MeasureType { else if (dataType.isNumberFamily()) return new DoubleMinAggregator(); } - throw new IllegalArgumentException( - "No aggregator for func '" + funcName + "' and return type '" + dataType + "'"); + throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + dataType + "'"); } private boolean isSum() { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java index 6391a14..c7541ab 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java @@ -29,8 +29,7 @@ import org.apache.kylin.metadata.model.TblColRef; public class BigDecimalIngester extends MeasureIngester<BigDecimal> { @Override - public BigDecimal valueOf(String[] values, MeasureDesc measureDesc, - Map<TblColRef, Dictionary<String>> dictionaryMap) { + public BigDecimal valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { if (values.length > 1) throw new IllegalArgumentException(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java index 033c0ef..a1e2665 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java @@ -17,12 +17,12 @@ */ package org.apache.kylin.measure.bitmap; +import org.apache.kylin.measure.ParamAsMeasureCount; + import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import org.apache.kylin.measure.ParamAsMeasureCount; - /** * BitmapIntersectDistinctCountAggFunc is an UDAF used for calculating the intersection of two or more bitmaps * Usage: intersect_count(columnToCount, columnToFilter, filterList) @@ -99,3 +99,4 @@ public class BitmapIntersectDistinctCountAggFunc implements ParamAsMeasureCount return result.result(); } } + http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java index d603116..e4fb079 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java @@ -80,14 +80,13 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { @Override public void validate(FunctionDesc functionDesc) throws IllegalArgumentException { checkArgument(FUNC_COUNT_DISTINCT.equals(functionDesc.getExpression()), - "BitmapMeasureType only support function %s, got %s", FUNC_COUNT_DISTINCT, - functionDesc.getExpression()); - checkArgument(functionDesc.getParameterCount() == 1, "BitmapMeasureType only support 1 parameter, got %d", - functionDesc.getParameterCount()); + "BitmapMeasureType only support function %s, got %s", FUNC_COUNT_DISTINCT, functionDesc.getExpression()); + checkArgument(functionDesc.getParameterCount() == 1, + "BitmapMeasureType only support 1 parameter, got %d", functionDesc.getParameterCount()); String returnType = functionDesc.getReturnDataType().getName(); - checkArgument(DATATYPE_BITMAP.equals(returnType), "BitmapMeasureType's return type must be %s, got %s", - DATATYPE_BITMAP, returnType); + checkArgument(DATATYPE_BITMAP.equals(returnType), + "BitmapMeasureType's return type must be %s, got %s", DATATYPE_BITMAP, returnType); } @Override @@ -103,8 +102,7 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { BitmapCounter current = factory.newBitmap(); @Override - public BitmapCounter valueOf(String[] values, MeasureDesc measureDesc, - Map<TblColRef, Dictionary<String>> dictionaryMap) { + public BitmapCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { checkArgument(values.length == 1, "expect 1 value, got %s", Arrays.toString(values)); current.clear(); @@ -127,8 +125,7 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { } @Override - public BitmapCounter reEncodeDictionary(BitmapCounter value, MeasureDesc measureDesc, - Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) { + public BitmapCounter reEncodeDictionary(BitmapCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) { if (!needDictionaryColumn(measureDesc.getFunction())) { return value; } @@ -185,7 +182,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { return true; } - static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.of(FUNC_COUNT_DISTINCT, BitmapDistinctCountAggFunc.class, + static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.of( + FUNC_COUNT_DISTINCT, BitmapDistinctCountAggFunc.class, FUNC_INTERSECT_COUNT_DISTINCT, BitmapIntersectDistinctCountAggFunc.class); @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java index d990893..c1b260d 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java @@ -18,12 +18,12 @@ package org.apache.kylin.measure.bitmap; -import java.io.IOException; -import java.nio.ByteBuffer; - import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import java.io.IOException; +import java.nio.ByteBuffer; + public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> { private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE; private static final BitmapCounter DELEGATE = factory.newBitmap(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java index 47571ad..eec45f2 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java @@ -18,6 +18,10 @@ package org.apache.kylin.measure.bitmap; +import org.apache.kylin.common.util.ByteBufferOutputStream; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + import java.io.DataOutputStream; import java.io.IOException; import java.io.Serializable; @@ -25,10 +29,6 @@ import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.Iterator; -import org.apache.kylin.common.util.ByteBufferOutputStream; -import org.roaringbitmap.buffer.ImmutableRoaringBitmap; -import org.roaringbitmap.buffer.MutableRoaringBitmap; - /** * A {@link BitmapCounter} based on roaring bitmap. */ @@ -134,7 +134,8 @@ public class RoaringBitmapCounter implements BitmapCounter, Serializable { @Override public boolean equals(Object obj) { - return (obj instanceof RoaringBitmapCounter) && bitmap.equals(((RoaringBitmapCounter) obj).bitmap); + return (obj instanceof RoaringBitmapCounter) && + bitmap.equals(((RoaringBitmapCounter) obj).bitmap); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java index a4ad199..822afa2 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java @@ -18,17 +18,16 @@ package org.apache.kylin.measure.bitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; -import org.roaringbitmap.buffer.MutableRoaringBitmap; - public class RoaringBitmapCounterFactory implements BitmapCounterFactory, Serializable { public static final BitmapCounterFactory INSTANCE = new RoaringBitmapCounterFactory(); - private RoaringBitmapCounterFactory() { - } + private RoaringBitmapCounterFactory() {} @Override public BitmapCounter newBitmap() { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java index f3342cd..0b3fd94 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java @@ -80,8 +80,7 @@ public class DimCountDistinctMeasureType extends MeasureType<Object> { return false; } - static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(FunctionDesc.FUNC_COUNT_DISTINCT, - DimCountDistinctAggFunc.class); + static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(FunctionDesc.FUNC_COUNT_DISTINCT, DimCountDistinctAggFunc.class); @Override public Map<String, Class<?>> getRewriteCalciteAggrFunctions() { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java index 29090c7..de5ee25 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java @@ -111,8 +111,7 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> { } @Override - public CapabilityResult.CapabilityInfluence influenceCapabilityCheck(Collection<TblColRef> unmatchedDimensions, - Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, MeasureDesc measureDesc) { + public CapabilityResult.CapabilityInfluence influenceCapabilityCheck(Collection<TblColRef> unmatchedDimensions, Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, MeasureDesc measureDesc) { TblColRef extendedCol = getExtendedColumn(measureDesc.getFunction()); if (!unmatchedDimensions.contains(extendedCol)) { @@ -137,11 +136,9 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> { return true; } - public IAdvMeasureFiller getAdvancedTupleFiller(FunctionDesc function, TupleInfo returnTupleInfo, - Map<TblColRef, Dictionary<String>> dictionaryMap) { + public IAdvMeasureFiller getAdvancedTupleFiller(FunctionDesc function, TupleInfo returnTupleInfo, Map<TblColRef, Dictionary<String>> dictionaryMap) { final TblColRef extended = getExtendedColumn(function); - final int extendedColumnInTupleIdx = returnTupleInfo.hasColumn(extended) - ? returnTupleInfo.getColumnIndex(extended) : -1; + final int extendedColumnInTupleIdx = returnTupleInfo.hasColumn(extended) ? returnTupleInfo.getColumnIndex(extended) : -1; if (extendedColumnInTupleIdx == -1) { throw new RuntimeException("Extended column is not required in returnTupleInfo"); @@ -211,8 +208,7 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> { } @Override - public ByteArray valueOf(String[] values, MeasureDesc measureDesc, - Map<TblColRef, Dictionary<String>> dictionaryMap) { + public ByteArray valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { if (values.length <= 1) throw new IllegalArgumentException(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java index 8718869..51c5a66 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java @@ -95,12 +95,11 @@ public class HLLCMeasureType extends MeasureType<HLLCounter> { public MeasureIngester<HLLCounter> newIngester() { return new MeasureIngester<HLLCounter>() { private static final long serialVersionUID = 1L; - + HLLCounter current = new HLLCounter(dataType.getPrecision()); @Override - public HLLCounter valueOf(String[] values, MeasureDesc measureDesc, - Map<TblColRef, Dictionary<String>> dictionaryMap) { + public HLLCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { HLLCounter hllc = current; hllc.clear(); if (values.length == 1) { @@ -136,9 +135,8 @@ public class HLLCMeasureType extends MeasureType<HLLCounter> { return true; } - static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(FUNC_COUNT_DISTINCT, - HLLDistinctCountAggFunc.class); - + static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(FUNC_COUNT_DISTINCT, HLLDistinctCountAggFunc.class); + @Override public Map<String, Class<?>> getRewriteCalciteAggrFunctions() { return UDAF_MAP; @@ -147,5 +145,5 @@ public class HLLCMeasureType extends MeasureType<HLLCounter> { public static boolean isCountDistinct(FunctionDesc func) { return FUNC_COUNT_DISTINCT.equalsIgnoreCase(func.getExpression()); } - + } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java index 8735ccb..df0cfaf 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java @@ -18,12 +18,12 @@ package org.apache.kylin.measure.hllc; -import java.io.IOException; -import java.nio.ByteBuffer; - import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import java.io.IOException; +import java.nio.ByteBuffer; + /** * @author yangli9 * http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java index 7ef06d4..b793465 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java @@ -18,6 +18,10 @@ package org.apache.kylin.measure.hllc; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import org.apache.kylin.common.util.BytesUtil; + import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; @@ -25,11 +29,6 @@ import java.nio.charset.Charset; import java.util.Collection; import java.util.Map; -import org.apache.kylin.common.util.BytesUtil; - -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; - @SuppressWarnings("serial") public class HLLCounter implements Serializable, Comparable<HLLCounter> { @@ -100,7 +99,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> { add(hashFunc.hashBytes(value, offset, length).asLong()); } - public void addHashDirectly(long hash) { + public void addHashDirectly(long hash){ add(hash); } @@ -142,36 +141,36 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> { assert this.p == another.p; assert this.hashFunc == another.hashFunc; switch (register.getRegisterType()) { - case SINGLE_VALUE: - switch (another.getRegisterType()) { case SINGLE_VALUE: - if (register.getSize() > 0 && another.register.getSize() > 0) { - register = ((SingleValueRegister) register).toSparse(); - } else { - SingleValueRegister sr = (SingleValueRegister) another.register; - if (sr.getSize() > 0) - register.set(sr.getSingleValuePos(), sr.getValue()); - return; + switch (another.getRegisterType()) { + case SINGLE_VALUE: + if (register.getSize() > 0 && another.register.getSize() > 0) { + register = ((SingleValueRegister) register).toSparse(); + } else { + SingleValueRegister sr = (SingleValueRegister) another.register; + if (sr.getSize() > 0) + register.set(sr.getSingleValuePos(), sr.getValue()); + return; + } + break; + case SPARSE: + register = ((SingleValueRegister) register).toSparse(); + break; + case DENSE: + register = ((SingleValueRegister) register).toDense(this.p); + break; + default: + break; } + break; case SPARSE: - register = ((SingleValueRegister) register).toSparse(); - break; - case DENSE: - register = ((SingleValueRegister) register).toDense(this.p); + if (another.getRegisterType() == RegisterType.DENSE) { + register = ((SparseRegister) register).toDense(p); + } break; default: break; - } - - break; - case SPARSE: - if (another.getRegisterType() == RegisterType.DENSE) { - register = ((SparseRegister) register).toDense(p); - } - break; - default: - break; } register.merge(another.register); toDenseIfNeeded(); @@ -253,8 +252,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> { double er2 = Math.round(rate * 2 * 10000) / 100D; double er3 = Math.round(rate * 3 * 10000) / 100D; long size = Math.round(Math.pow(2, p)); - System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" - + ",\t99.7% err<" + er3 + "%"); + System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" + ",\t99.7% err<" + er3 + "%"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounterOld.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounterOld.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounterOld.java index 494e173..5cbdd43 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounterOld.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounterOld.java @@ -294,7 +294,7 @@ public class HLLCounterOld implements Serializable, Comparable<HLLCounterOld> { /*public void writeRegistersArray(final ByteBuffer out) { out.put(this.registers); } - + public void readRegistersArray(ByteBuffer in) { in.get(registers, 0, m); singleBucket = Integer.MIN_VALUE; @@ -362,8 +362,7 @@ public class HLLCounterOld implements Serializable, Comparable<HLLCounterOld> { double er2 = Math.round(rate * 2 * 10000) / 100D; double er3 = Math.round(rate * 3 * 10000) / 100D; long size = Math.round(Math.pow(2, p)); - System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" - + ",\t99.7% err<" + er3 + "%"); + System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" + ",\t99.7% err<" + er3 + "%"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java index 7c062f8..c635cd6 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java @@ -91,8 +91,7 @@ public class HLLDistinctCountAggFunc { } else { long oldValue = Math.abs(this.value.longValue()); long take = Math.max(oldValue, value); - logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value - + ", taking " + take); + logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take); this.value = -take; // make it obvious that this value is wrong } }