http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java deleted file mode 100644 index d50baad..0000000 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.job.impl.threadpool; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.apache.kylin.job.BaseTestExecutable; -import org.apache.kylin.job.ErrorTestExecutable; -import org.apache.kylin.job.FailedTestExecutable; -import org.apache.kylin.job.SelfStopExecutable; -import org.apache.kylin.job.SucceedTestExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.ExecutableState; -import org.junit.Test; - -/** - */ -public class DefaultSchedulerTest extends BaseSchedulerTest { - - @Test - public void testSingleTaskJob() throws Exception { - DefaultChainedExecutable job = new DefaultChainedExecutable(); - BaseTestExecutable task1 = new SucceedTestExecutable(); - job.addTask(task1); - jobService.addJob(job); - waitForJobFinish(job.getId()); - assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); - assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - } - - @Test - public void testSucceed() throws Exception { - DefaultChainedExecutable job = new DefaultChainedExecutable(); - BaseTestExecutable task1 = new SucceedTestExecutable(); - BaseTestExecutable task2 = new SucceedTestExecutable(); - job.addTask(task1); - job.addTask(task2); - jobService.addJob(job); - waitForJobFinish(job.getId()); - assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); - assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState()); - } - - @Test - public void testSucceedAndFailed() throws Exception { - DefaultChainedExecutable job = new DefaultChainedExecutable(); - BaseTestExecutable task1 = new SucceedTestExecutable(); - BaseTestExecutable task2 = new FailedTestExecutable(); - job.addTask(task1); - job.addTask(task2); - jobService.addJob(job); - waitForJobFinish(job.getId()); - assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); - assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState()); - } - - @Test - public void testSucceedAndError() throws Exception { - DefaultChainedExecutable job = new DefaultChainedExecutable(); - BaseTestExecutable task1 = new ErrorTestExecutable(); - BaseTestExecutable task2 = new SucceedTestExecutable(); - job.addTask(task1); - job.addTask(task2); - jobService.addJob(job); - waitForJobFinish(job.getId()); - assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); - assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState()); - assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState()); - } - - @Test - public void testDiscard() throws Exception { - DefaultChainedExecutable job = new DefaultChainedExecutable(); - BaseTestExecutable task1 = new SelfStopExecutable(); - job.addTask(task1); - jobService.addJob(job); - waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500); - jobService.discardJob(job.getId()); - waitForJobFinish(job.getId()); - assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState()); - assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState()); - Thread.sleep(5000); - System.out.println(job); - } - - @SuppressWarnings("rawtypes") - @Test - public void testSchedulerPool() throws InterruptedException { - ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1); - final CountDownLatch countDownLatch = new CountDownLatch(3); - ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - countDownLatch.countDown(); - } - }, 5, 5, TimeUnit.SECONDS); - assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS)); - assertTrue("future should still running", future.cancel(true)); - - final CountDownLatch countDownLatch2 = new CountDownLatch(3); - ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - countDownLatch2.countDown(); - throw new RuntimeException(); - } - }, 5, 5, TimeUnit.SECONDS); - assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS)); - assertFalse("future2 should has been stopped", future2.cancel(true)); - - final CountDownLatch countDownLatch3 = new CountDownLatch(3); - ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - countDownLatch3.countDown(); - throw new RuntimeException(); - } catch (Exception e) { - } - } - }, 5, 5, TimeUnit.SECONDS); - assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS)); - assertTrue("future3 should still running", future3.cancel(true)); - } -}
http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java index 0ae7e6a..18da37a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInValueProviderFactory.java @@ -18,7 +18,6 @@ package org.apache.kylin.metadata.filter.UDF; -import org.apache.kylin.dimension.DimensionEncoding; import org.apache.kylin.metadata.filter.function.Functions; import org.apache.kylin.metadata.model.TblColRef; http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java ---------------------------------------------------------------------- diff --git a/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java new file mode 100644 index 0000000..3bb4540 --- /dev/null +++ b/core-storage/src/test/java/org/apache/kylin/storage/StorageMockUtils.java @@ -0,0 +1,189 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * / + */ + +package org.apache.kylin.storage; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.kylin.metadata.filter.ColumnTupleFilter; +import org.apache.kylin.metadata.filter.CompareTupleFilter; +import org.apache.kylin.metadata.filter.ConstantTupleFilter; +import org.apache.kylin.metadata.filter.LogicalTupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.ParameterDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.tuple.TupleInfo; + +import com.google.common.collect.ImmutableList; + +/** + */ +public class StorageMockUtils { + public static TupleInfo newTupleInfo(List<TblColRef> groups, List<FunctionDesc> aggregations) { + TupleInfo info = new TupleInfo(); + int idx = 0; + + for (TblColRef col : groups) { + info.setField(col.getName(), col, idx++); + } + + TableDesc sourceTable = groups.get(0).getColumnDesc().getTable(); + for (FunctionDesc func : aggregations) { + TblColRef col = new TblColRef(func.newFakeRewriteColumn(sourceTable)); + info.setField(col.getName(), col, idx++); + } + + return info; + } + + public static List<TblColRef> buildGroups() { + List<TblColRef> groups = new ArrayList<TblColRef>(); + + TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT"); + ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date"); + TblColRef cf1 = new TblColRef(c1); + groups.add(cf1); + + TableDesc t2 = TableDesc.mockup("DEFAULT.TEST_CATEGORY_GROUPINGS"); + ColumnDesc c2 = ColumnDesc.mockup(t2, 14, "META_CATEG_NAME", "string"); + TblColRef cf2 = new TblColRef(c2); + groups.add(cf2); + + return groups; + } + + public static List<FunctionDesc> buildAggregations1() { + List<FunctionDesc> functions = new ArrayList<FunctionDesc>(); + + TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT"); + TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)")); + + FunctionDesc f1 = new FunctionDesc(); + f1.setExpression("SUM"); + ParameterDesc p1 = new ParameterDesc(); + p1.setType("column"); + p1.setValue("PRICE"); + p1.setColRefs(ImmutableList.of(priceCol)); + f1.setParameter(p1); + f1.setReturnType("decimal(19,4)"); + functions.add(f1); + + + return functions; + } + + public static List<FunctionDesc> buildAggregations() { + List<FunctionDesc> functions = new ArrayList<FunctionDesc>(); + + TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT"); + TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)")); + TblColRef sellerCol = new TblColRef(ColumnDesc.mockup(t1, 9, "SELLER_ID", "bigint")); + + FunctionDesc f1 = new FunctionDesc(); + f1.setExpression("SUM"); + ParameterDesc p1 = new ParameterDesc(); + p1.setType("column"); + p1.setValue("PRICE"); + p1.setColRefs(ImmutableList.of(priceCol)); + f1.setParameter(p1); + f1.setReturnType("decimal(19,4)"); + functions.add(f1); + + FunctionDesc f2 = new FunctionDesc(); + f2.setExpression("COUNT_DISTINCT"); + ParameterDesc p2 = new ParameterDesc(); + p2.setType("column"); + p2.setValue("SELLER_ID"); + p2.setColRefs(ImmutableList.of(sellerCol)); + f2.setParameter(p2); + f2.setReturnType("hllc(10)"); + functions.add(f2); + + return functions; + } + + public static CompareTupleFilter buildTs2010Filter(TblColRef column) { + CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT); + ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column); + compareFilter.addChild(columnFilter1); + ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2010-01-01"); + compareFilter.addChild(constantFilter1); + return compareFilter; + } + + public static CompareTupleFilter buildTs2011Filter(TblColRef column) { + CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT); + ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column); + compareFilter.addChild(columnFilter1); + ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2011-01-01"); + compareFilter.addChild(constantFilter1); + return compareFilter; + } + + public static CompareTupleFilter buildFilter1(TblColRef column) { + CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE); + ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column); + compareFilter.addChild(columnFilter1); + ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23"); + compareFilter.addChild(constantFilter1); + return compareFilter; + } + + public static CompareTupleFilter buildFilter2(TblColRef column) { + CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ); + ColumnTupleFilter columnFilter2 = new ColumnTupleFilter(column); + compareFilter.addChild(columnFilter2); + ConstantTupleFilter constantFilter2 = new ConstantTupleFilter("ClothinShoes & Accessories"); + compareFilter.addChild(constantFilter2); + return compareFilter; + } + + public static CompareTupleFilter buildFilter3(TblColRef column) { + CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ); + ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column); + compareFilter.addChild(columnFilter1); + ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23"); + compareFilter.addChild(constantFilter1); + return compareFilter; + } + + + public static TupleFilter buildAndFilter(List<TblColRef> columns) { + CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0)); + CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1)); + LogicalTupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND); + andFilter.addChild(compareFilter1); + andFilter.addChild(compareFilter2); + return andFilter; + } + + public static TupleFilter buildOrFilter(List<TblColRef> columns) { + CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0)); + CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1)); + LogicalTupleFilter logicFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR); + logicFilter.addChild(compareFilter1); + logicFilter.addChild(compareFilter2); + return logicFilter; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java ---------------------------------------------------------------------- diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java deleted file mode 100644 index 5f8f08f..0000000 --- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.cache; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.kylin.metadata.filter.ColumnTupleFilter; -import org.apache.kylin.metadata.filter.CompareTupleFilter; -import org.apache.kylin.metadata.filter.ConstantTupleFilter; -import org.apache.kylin.metadata.filter.LogicalTupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.ParameterDesc; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.TupleInfo; - -import com.google.common.collect.ImmutableList; - -/** - */ -public class StorageMockUtils { - public static TupleInfo newTupleInfo(List<TblColRef> groups, List<FunctionDesc> aggregations) { - TupleInfo info = new TupleInfo(); - int idx = 0; - - for (TblColRef col : groups) { - info.setField(col.getName(), col, idx++); - } - - TableDesc sourceTable = groups.get(0).getColumnDesc().getTable(); - for (FunctionDesc func : aggregations) { - TblColRef col = new TblColRef(func.newFakeRewriteColumn(sourceTable)); - info.setField(col.getName(), col, idx++); - } - - return info; - } - - public static List<TblColRef> buildGroups() { - List<TblColRef> groups = new ArrayList<TblColRef>(); - - TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT"); - ColumnDesc c1 = ColumnDesc.mockup(t1, 2, "CAL_DT", "date"); - TblColRef cf1 = new TblColRef(c1); - groups.add(cf1); - - TableDesc t2 = TableDesc.mockup("DEFAULT.TEST_CATEGORY_GROUPINGS"); - ColumnDesc c2 = ColumnDesc.mockup(t2, 14, "META_CATEG_NAME", "string"); - TblColRef cf2 = new TblColRef(c2); - groups.add(cf2); - - return groups; - } - - public static List<FunctionDesc> buildAggregations() { - List<FunctionDesc> functions = new ArrayList<FunctionDesc>(); - - TableDesc t1 = TableDesc.mockup("DEFAULT.TEST_KYLIN_FACT"); - TblColRef priceCol = new TblColRef(ColumnDesc.mockup(t1, 7, "PRICE", "decimal(19,4)")); - TblColRef sellerCol = new TblColRef(ColumnDesc.mockup(t1, 9, "SELLER_ID", "bigint")); - - FunctionDesc f1 = new FunctionDesc(); - f1.setExpression("SUM"); - ParameterDesc p1 = new ParameterDesc(); - p1.setType("column"); - p1.setValue("PRICE"); - p1.setColRefs(ImmutableList.of(priceCol)); - f1.setParameter(p1); - f1.setReturnType("decimal(19,4)"); - functions.add(f1); - - FunctionDesc f2 = new FunctionDesc(); - f2.setExpression("COUNT_DISTINCT"); - ParameterDesc p2 = new ParameterDesc(); - p2.setType("column"); - p2.setValue("SELLER_ID"); - p2.setColRefs(ImmutableList.of(sellerCol)); - f2.setParameter(p2); - f2.setReturnType("hllc(10)"); - functions.add(f2); - - return functions; - } - - public static CompareTupleFilter buildTs2010Filter(TblColRef column) { - CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT); - ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column); - compareFilter.addChild(columnFilter1); - ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2010-01-01"); - compareFilter.addChild(constantFilter1); - return compareFilter; - } - - public static CompareTupleFilter buildTs2011Filter(TblColRef column) { - CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT); - ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column); - compareFilter.addChild(columnFilter1); - ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2011-01-01"); - compareFilter.addChild(constantFilter1); - return compareFilter; - } - - public static CompareTupleFilter buildFilter1(TblColRef column) { - CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.LTE); - ColumnTupleFilter columnFilter1 = new ColumnTupleFilter(column); - compareFilter.addChild(columnFilter1); - ConstantTupleFilter constantFilter1 = new ConstantTupleFilter("2012-05-23"); - compareFilter.addChild(constantFilter1); - return compareFilter; - } - - public static CompareTupleFilter buildFilter2(TblColRef column) { - CompareTupleFilter compareFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ); - ColumnTupleFilter columnFilter2 = new ColumnTupleFilter(column); - compareFilter.addChild(columnFilter2); - ConstantTupleFilter constantFilter2 = new ConstantTupleFilter("ClothinShoes & Accessories"); - compareFilter.addChild(constantFilter2); - return compareFilter; - } - - public static TupleFilter buildAndFilter(List<TblColRef> columns) { - CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0)); - CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1)); - LogicalTupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND); - andFilter.addChild(compareFilter1); - andFilter.addChild(compareFilter2); - return andFilter; - } - - public static TupleFilter buildOrFilter(List<TblColRef> columns) { - CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0)); - CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1)); - LogicalTupleFilter logicFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR); - logicFilter.addChild(compareFilter1); - logicFilter.addChild(compareFilter2); - return logicFilter; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/pom.xml ---------------------------------------------------------------------- diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml index 60815c7..1a3efb7 100644 --- a/kylin-it/pom.xml +++ b/kylin-it/pom.xml @@ -124,6 +124,13 @@ </dependency> <dependency> <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-job</artifactId> + <type>test-jar</type> + <scope>test</scope> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> <artifactId>kylin-storage-hbase</artifactId> <type>test-jar</type> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java index 4291d91..809cfb7 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java @@ -63,8 +63,8 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase { CubeManager cubeManager = CubeManager.getInstance(kylinConfig); cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty"); - flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv"; - dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable); + flatTable = LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv"; + dictionaryMap = ITInMemCubeBuilderTest.getDictionaryMap(cube, flatTable); } @AfterClass @@ -84,7 +84,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase { { Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, new NoopWriter())); - InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed); + ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed); future.get(); } } @@ -101,7 +101,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase { @Override public void close() { - + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java new file mode 100644 index 0000000..ab9ac63 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java @@ -0,0 +1,163 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * / + */ + +package org.apache.kylin.cube.inmemcubing; + +import static org.junit.Assert.assertEquals; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.dimension.Dictionary; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.metadata.model.TblColRef; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase { + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(ITDoggedCubeBuilderTest.class); + + private static final int INPUT_ROWS = 10000; + private static final int SPLIT_ROWS = 5000; + private static final int THREADS = 4; + + private static CubeInstance cube; + private static String flatTable; + private static Map<TblColRef, Dictionary<String>> dictionaryMap; + + @BeforeClass + public static void before() throws IOException { + staticCreateTestMetadata(); + + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + CubeManager cubeManager = CubeManager.getInstance(kylinConfig); + + cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty"); + flatTable = LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv"; + dictionaryMap = ITInMemCubeBuilderTest.getDictionaryMap(cube, flatTable); + } + + @AfterClass + public static void after() throws Exception { + staticCleanupTestMetadata(); + } + + @Test + public void test() throws Exception { + + ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + long randSeed = System.currentTimeMillis(); + + DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap); + doggedBuilder.setConcurrentThreads(THREADS); + doggedBuilder.setSplitRowThreshold(SPLIT_ROWS); + FileRecordWriter doggedResult = new FileRecordWriter(); + + { + Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult)); + ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed); + future.get(); + doggedResult.close(); + } + + InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap); + inmemBuilder.setConcurrentThreads(THREADS); + FileRecordWriter inmemResult = new FileRecordWriter(); + + { + Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult)); + ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed); + future.get(); + inmemResult.close(); + } + + fileCompare(doggedResult.file, inmemResult.file); + doggedResult.file.delete(); + inmemResult.file.delete(); + } + + private void fileCompare(File file, File file2) throws IOException { + BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8")); + BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8")); + + String line1, line2; + do { + line1 = r1.readLine(); + line2 = r2.readLine(); + + assertEquals(line1, line2); + + } while (line1 != null || line2 != null); + + r1.close(); + r2.close(); + } + + class FileRecordWriter implements ICuboidWriter { + + File file; + PrintWriter writer; + + FileRecordWriter() throws IOException { + file = File.createTempFile("DoggedCubeBuilderTest_", ".data"); + writer = new PrintWriter(file, "UTF-8"); + } + + @Override + public void write(long cuboidId, GTRecord record) throws IOException { + writer.print(cuboidId); + writer.print(", "); + writer.print(record.toString()); + writer.println(); + } + + @Override + public void flush() { + + } + + @Override + public void close() { + writer.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java new file mode 100644 index 0000000..ad02f2a --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java @@ -0,0 +1,271 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * / + */ + +package org.apache.kylin.cube.inmemcubing; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.io.FileUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.dict.DictionaryGenerator; +import org.apache.kylin.dict.IterableDictionaryValueEnumerator; +import org.apache.kylin.dimension.Dictionary; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + */ +public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase { + + private static final Logger logger = LoggerFactory.getLogger(ITInMemCubeBuilderTest.class); + + private CubeInstance cube; + private String flatTable; + private Map<TblColRef, Dictionary<String>> dictionaryMap; + + private int nInpRows; + private int nThreads; + + @Before + public void before() throws IOException { + createTestMetadata(); + } + + @After + public void after() throws Exception { + cleanupTestMetadata(); + } + + @Test + public void testKylinCube() throws Exception { + testBuild("test_kylin_cube_without_slr_left_join_empty", // + LOCALMETA_TEST_DATA + "/data/flatten_data_for_without_slr_left_join.csv", 70000, 4); + } + + @Test + public void testSSBCube() throws Exception { + testBuild("ssb", // + LOCALMETA_TEST_DATA + "/data/kylin_intermediate_ssb_19920101000000_19920201000000.csv", 1000, 1); + } + + public void testBuild(String cubeName, String flatTable, int nInpRows, int nThreads) throws Exception { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + CubeManager cubeManager = CubeManager.getInstance(kylinConfig); + + this.nInpRows = nInpRows; + this.nThreads = nThreads; + + this.cube = cubeManager.getCube(cubeName); + this.flatTable = flatTable; + this.dictionaryMap = getDictionaryMap(cube, flatTable); + + testBuildInner(); + } + + private void testBuildInner() throws Exception { + + InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap); + //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap); + cubeBuilder.setConcurrentThreads(nThreads); + + ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + + try { + // round 1 + { + Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter())); + feedData(cube, flatTable, queue, nInpRows); + future.get(); + } + + // round 2, zero input + { + Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter())); + feedData(cube, flatTable, queue, 0); + future.get(); + } + + // round 3 + { + Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter())); + feedData(cube, flatTable, queue, nInpRows); + future.get(); + } + + } catch (Exception e) { + logger.error("stream build failed", e); + throw new IOException("Failed to build cube ", e); + } + } + + static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException { + feedData(cube, flatTable, queue, count, 0); + } + + static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException { + CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null); + int nColumns = flatTableDesc.getColumnList().size(); + + @SuppressWarnings("unchecked") + Set<String>[] distinctSets = new Set[nColumns]; + for (int i = 0; i < nColumns; i++) + distinctSets[i] = new TreeSet<String>(); + + // get distinct values on each column + List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8"); + for (String line : lines) { + String[] row = line.trim().split(","); + assert row.length == nColumns; + for (int i = 0; i < nColumns; i++) + distinctSets[i].add(row[i]); + } + + List<String[]> distincts = new ArrayList<String[]>(); + for (int i = 0; i < nColumns; i++) { + distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()])); + } + + Random rand = new Random(); + if (randSeed != 0) + rand.setSeed(randSeed); + + // output with random data + for (; count > 0; count--) { + ArrayList<String> row = new ArrayList<String>(nColumns); + for (int i = 0; i < nColumns; i++) { + String[] candidates = distincts.get(i); + row.add(candidates[rand.nextInt(candidates.length)]); + } + queue.put(row); + } + queue.put(new ArrayList<String>(0)); + } + + static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException { + Map<TblColRef, Dictionary<String>> result = Maps.newHashMap(); + CubeDesc desc = cube.getDescriptor(); + CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null); + int nColumns = flatTableDesc.getColumnList().size(); + + List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns(); + for (int c = 0; c < columns.size(); c++) { + TblColRef col = columns.get(c); + if (desc.getRowkey().isUseDictionary(col)) { + logger.info("Building dictionary for " + col); + List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]); + Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList)); + result.put(col, dict); + } + } + + for (int measureIdx = 0; measureIdx < cube.getDescriptor().getMeasures().size(); measureIdx++) { + MeasureDesc measureDesc = cube.getDescriptor().getMeasures().get(measureIdx); + FunctionDesc func = measureDesc.getFunction(); + List<TblColRef> dictCols = func.getMeasureType().getColumnsNeedDictionary(func); + if (dictCols.isEmpty()) + continue; + + int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx]; + List<TblColRef> paramCols = func.getParameter().getColRefs(); + for (int i = 0; i < paramCols.size(); i++) { + TblColRef col = paramCols.get(i); + if (dictCols.contains(col)) { + int colIdxOnFlat = flatTableIdx[i]; + logger.info("Building dictionary for " + col); + List<byte[]> valueList = readValueList(flatTable, nColumns, colIdxOnFlat); + Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList)); + + result.put(col, dict); + } + } + } + + return result; + } + + private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException { + List<byte[]> result = Lists.newArrayList(); + List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8"); + for (String line : lines) { + String[] row = line.trim().split(","); + if (row.length != nColumns) { + throw new IllegalStateException(); + } + if (row[c] != null) { + result.add(Bytes.toBytes(row[c])); + } + } + return result; + } + + class ConsoleGTRecordWriter implements ICuboidWriter { + + boolean verbose = false; + + @Override + public void write(long cuboidId, GTRecord record) throws IOException { + if (verbose) + System.out.println(record.toString()); + } + + @Override + public void flush() { + if (verbose) { + System.out.println("flush"); + } + } + + @Override + public void close() { + if (verbose) { + System.out.println("close"); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java b/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java new file mode 100644 index 0000000..ad1ddd3 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java @@ -0,0 +1,154 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * / + */ + +package org.apache.kylin.job.impl.threadpool; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.kylin.job.BaseTestExecutable; +import org.apache.kylin.job.ErrorTestExecutable; +import org.apache.kylin.job.FailedTestExecutable; +import org.apache.kylin.job.SelfStopExecutable; +import org.apache.kylin.job.SucceedTestExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class ITDefaultSchedulerTest extends BaseSchedulerTest { + + @Test + public void testSingleTaskJob() throws Exception { + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SucceedTestExecutable(); + job.addTask(task1); + jobService.addJob(job); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + } + + @Test + public void testSucceed() throws Exception { + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SucceedTestExecutable(); + BaseTestExecutable task2 = new SucceedTestExecutable(); + job.addTask(task1); + job.addTask(task2); + jobService.addJob(job); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState()); + } + + @Test + public void testSucceedAndFailed() throws Exception { + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SucceedTestExecutable(); + BaseTestExecutable task2 = new FailedTestExecutable(); + job.addTask(task1); + job.addTask(task2); + jobService.addJob(job); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState()); + } + + @Test + public void testSucceedAndError() throws Exception { + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new ErrorTestExecutable(); + BaseTestExecutable task2 = new SucceedTestExecutable(); + job.addTask(task1); + job.addTask(task2); + jobService.addJob(job); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState()); + } + + @Test + public void testDiscard() throws Exception { + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SelfStopExecutable(); + job.addTask(task1); + jobService.addJob(job); + waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500); + jobService.discardJob(job.getId()); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState()); + Thread.sleep(5000); + System.out.println(job); + } + + @SuppressWarnings("rawtypes") + @Test + public void testSchedulerPool() throws InterruptedException { + ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1); + final CountDownLatch countDownLatch = new CountDownLatch(3); + ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + countDownLatch.countDown(); + } + }, 5, 5, TimeUnit.SECONDS); + assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS)); + assertTrue("future should still running", future.cancel(true)); + + final CountDownLatch countDownLatch2 = new CountDownLatch(3); + ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + countDownLatch2.countDown(); + throw new RuntimeException(); + } + }, 5, 5, TimeUnit.SECONDS); + assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS)); + assertFalse("future2 should has been stopped", future2.cancel(true)); + + final CountDownLatch countDownLatch3 = new CountDownLatch(3); + ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + countDownLatch3.countDown(); + throw new RuntimeException(); + } catch (Exception e) { + } + } + }, 5, 5, TimeUnit.SECONDS); + assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS)); + assertTrue("future3 should still running", future3.cancel(true)); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index c945485..e1cbe1f 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -99,12 +99,12 @@ public class BuildCubeWithEngine { logger.info("Will not use fast build mode"); } - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA); if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); } - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); try { //check hdfs permission http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java index 5ab5e83..d862dbf 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java @@ -80,7 +80,7 @@ public class BuildCubeWithSpark { public static void beforeClass() throws Exception { logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); } @@ -89,7 +89,7 @@ public class BuildCubeWithSpark { @Before public void before() throws Exception { - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); DeployUtil.initCliWorkDir(); DeployUtil.deployMetadata(); @@ -117,7 +117,7 @@ public class BuildCubeWithSpark { @Test public void test() throws Exception { final CubeSegment segment = createSegment(); - String confPath = new File(AbstractKylinTestCase.SANDBOX_TEST_DATA).getAbsolutePath(); + String confPath = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath(); KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar(); String coprocessor = KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar(); logger.info("confPath location:" + confPath); http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index eeff999..99da26f 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -67,11 +67,11 @@ public class BuildCubeWithStream { public static void beforeClass() throws Exception { logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA); if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); } - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); } public void before() throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java index 4b8ce24..643b122 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java @@ -92,7 +92,7 @@ public class BuildIIWithEngine { public static void beforeClass() throws Exception { logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + System.setProperty(KylinConfig.KYLIN_CONF,HBaseMetadataTestCase.SANDBOX_TEST_DATA); if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); } @@ -100,7 +100,7 @@ public class BuildIIWithEngine { @Before public void before() throws Exception { - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); //DeployUtil.initCliWorkDir(); // DeployUtil.deployMetadata(); http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java index 9b7cd14..ace1a2f 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java @@ -109,7 +109,7 @@ public class BuildIIWithStream { if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); } - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); } public void before() throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java index ec3e60f..46aa68d 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java @@ -19,11 +19,9 @@ package org.apache.kylin.query; import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import java.io.FileReader; -import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; @@ -38,22 +36,18 @@ import java.sql.Types; import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.TreeSet; import java.util.logging.LogManager; -import com.google.common.base.Throwables; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionContext; import org.apache.kylin.common.KylinConfig; import org.dbunit.Assertion; import org.dbunit.database.DatabaseConfig; import org.dbunit.database.DatabaseConnection; import org.dbunit.database.IDatabaseConnection; import org.dbunit.dataset.DataSetException; -import org.dbunit.dataset.DefaultTable; import org.dbunit.dataset.ITable; import org.dbunit.dataset.SortedTable; import org.dbunit.dataset.datatype.DataType; @@ -63,8 +57,6 @@ import org.dbunit.ext.h2.H2DataTypeFactory; import org.junit.Assert; import com.google.common.io.Files; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** */ @@ -287,7 +279,6 @@ public class KylinTestBase { return ret; } - protected void execQueryUsingH2(String queryFolder, boolean needSort) throws Exception { printInfo("---------- Running H2 queries: " + queryFolder); @@ -349,7 +340,6 @@ public class KylinTestBase { h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory()); ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort); - try { // compare the result Assert.assertEquals(h2Table.getRowCount(), kylinTable.getRowCount()); @@ -427,7 +417,6 @@ public class KylinTestBase { h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory()); ITable h2Table = executeDynamicQuery(h2Conn, queryName, sql, parameters, needSort); - // compare the result Assertion.assertEquals(h2Table, kylinTable); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java index 15e435e..136342d 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java @@ -38,7 +38,7 @@ import org.apache.kylin.metadata.tuple.ITupleIterator; import org.apache.kylin.storage.IStorageQuery; import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.StorageFactory; -import org.apache.kylin.storage.cache.StorageMockUtils; +import org.apache.kylin.storage.StorageMockUtils; import org.apache.kylin.storage.exception.ScanOutOfLimitException; import org.junit.After; import org.junit.AfterClass; http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/source-hive/pom.xml ---------------------------------------------------------------------- diff --git a/source-hive/pom.xml b/source-hive/pom.xml index eb36eed..796b9c1 100644 --- a/source-hive/pom.xml +++ b/source-hive/pom.xml @@ -44,6 +44,13 @@ <!-- Env & Test --> <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + <version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java ---------------------------------------------------------------------- diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java index 70c11b3..83c50c0 100644 --- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java +++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java @@ -24,18 +24,16 @@ import java.io.File; import java.io.IOException; import org.apache.commons.io.FileUtils; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.junit.After; import org.junit.Before; import org.junit.Test; -/** - * Created by dongli on 2/22/16. - */ public class HiveCmdBuilderTest { @Before public void setup() { - System.setProperty("KYLIN_CONF", "../examples/test_case_data/localmeta"); + System.setProperty("KYLIN_CONF", LocalFileMetadataTestCase.LOCALMETA_TEST_DATA); } @After http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 6bbb0b7..1d3da28 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -19,6 +19,7 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.io.IOException; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; @@ -56,6 +57,7 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.storage.hbase.HBaseConnection; @@ -250,20 +252,21 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } @Override - public IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException { + public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior(); + logger.debug("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle); short cuboidBaseShard = cubeSeg.getCuboidBaseShard(this.cuboid.getId()); short shardNum = cubeSeg.getCuboidShardNum(this.cuboid.getId()); int totalShards = cubeSeg.getTotalShards(); - final List<ByteString> scanRequestByteStrings = Lists.newArrayList(); - final List<ByteString> rawScanByteStrings = Lists.newArrayList(); + ByteString scanRequestByteString = null; + ByteString rawScanByteString = null; // primary key (also the 0th column block) is always selected - final ImmutableBitSet selectedColBlocks = scanRequests.get(0).getSelectedColBlocks().set(0); + final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); // globally shared connection, does not require close final HConnection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); @@ -274,65 +277,89 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build()); } - for (GTScanRequest req : scanRequests) { - ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); - GTScanRequest.serializer.serialize(req, buffer); - buffer.flip(); - scanRequestByteStrings.add(HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit())); + //TODO: raw scan can be constructed at region side to reduce traffic + List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks); + int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE; + while (true) { + try { + ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize); + BytesUtil.writeVInt(rawScans.size(), rawScanBuffer); + for (RawScan rs : rawScans) { + RawScan.serializer.serialize(rs, rawScanBuffer); + } + rawScanBuffer.flip(); + rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit()); + break; + } catch (BufferOverflowException boe) { + logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize); + rawScanBufferSize *= 4; + } + } + scanRequest.setGTScanRanges(Lists.<GTScanRange> newArrayList());//since raw scans are sent to coprocessor, we don't need to duplicate sending it - RawScan rawScan = preparedHBaseScan(req.getPkStart(), req.getPkEnd(), req.getFuzzyKeys(), selectedColBlocks); + int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE; + while (true) { + try { + ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize); + GTScanRequest.serializer.serialize(scanRequest, buffer); + buffer.flip(); + scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit()); + break; + } catch (BufferOverflowException boe) { + logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize); + scanRequestBufferSize *= 4; + } + } - ByteBuffer rawScanBuffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); - RawScan.serializer.serialize(rawScan, rawScanBuffer); - rawScanBuffer.flip(); - rawScanByteStrings.add(HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit())); + logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size()); - logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", buffer.limit() - buffer.position(), rawScanBuffer.limit() - rawScanBuffer.position()); - logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(req)), cubeSeg); - logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); + logger.info("The scan {} for segment {} is as below, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg); + for (RawScan rs : rawScans) { + logScan(rs, cubeSeg.getStorageLocationIdentifier()); } - logger.debug("Submitting rpc to {} shards starting from shard {}, scan requests count {}", new Object[] { shardNum, cuboidBaseShard, scanRequests.size() }); + logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", new Object[] { shardNum, cuboidBaseShard, rawScans.size() }); final AtomicInteger totalScannedCount = new AtomicInteger(0); - final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(scanRequests.size() * shardNum); + final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum); + final String currentThreadName = Thread.currentThread().getName(); for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) { + final ByteString finalScanRequestByteString = scanRequestByteString; + final ByteString finalRawScanByteString = rawScanByteString; executorService.submit(new Runnable() { @Override public void run() { - for (int i = 0; i < scanRequests.size(); ++i) { - CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder(); - builder.setGtScanRequest(scanRequestByteStrings.get(i)).setHbaseRawScan(rawScanByteStrings.get(i)); - for (IntList intList : hbaseColumnsToGTIntList) { - builder.addHbaseColumnsToGT(intList); - } - builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize()); - builder.setBehavior(toggle); + CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder(); + builder.setGtScanRequest(finalScanRequestByteString).setHbaseRawScan(finalRawScanByteString); + for (IntList intList : hbaseColumnsToGTIntList) { + builder.addHbaseColumnsToGT(intList); + } + builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize()); + builder.setBehavior(toggle); + + Map<byte[], CubeVisitProtos.CubeVisitResponse> results; + try { + results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond()); + } catch (Throwable throwable) { + throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + "Error when visiting cubes by endpoint", throwable); + } - Map<byte[], CubeVisitProtos.CubeVisitResponse> results; + for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) { + totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount()); + logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> " + getStatsString(result)); try { - results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond()); - } catch (Throwable throwable) { - throw new RuntimeException("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + "Error when visiting cubes by endpoint", throwable); - } - - for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) { - totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount()); - logger.info("<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequests.get(i))) + "> " + getStatsString(result)); - try { - epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows()))); - } catch (IOException | DataFormatException e) { - throw new RuntimeException("Error when decompressing", e); - } + epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows()))); + } catch (IOException | DataFormatException e) { + throw new RuntimeException("Error when decompressing", e); } } } }); } - return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequests.get(0).getColumns(), totalScannedCount.get()); + return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get()); } private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result) { http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 8563a5e..49e8593 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -18,7 +18,6 @@ package org.apache.kylin.storage.hbase.cube.v2; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -43,16 +42,15 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; import org.apache.kylin.cube.model.HBaseMappingDesc; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.gridtable.GTScanRequest; -import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.gridtable.GTScanRange; +import org.apache.kylin.gridtable.IGTStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -public abstract class CubeHBaseRPC { +public abstract class CubeHBaseRPC implements IGTStorage { public static final Logger logger = LoggerFactory.getLogger(CubeHBaseRPC.class); @@ -71,8 +69,6 @@ public abstract class CubeHBaseRPC { this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid); } - abstract IGTScanner getGTScanner(final List<GTScanRequest> scanRequests) throws IOException; - public static Scan buildScan(RawScan rawScan) { Scan scan = new Scan(); scan.setCaching(rawScan.hbaseCaching); @@ -96,7 +92,7 @@ public abstract class CubeHBaseRPC { return scan; } - protected RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) { + private RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) { final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks); LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid); @@ -123,46 +119,12 @@ public abstract class CubeHBaseRPC { return new RawScan(start, end, selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize); } - protected List<RawScan> preparedHBaseScans(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) { - final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks); - List<RawScan> ret = Lists.newArrayList(); - - LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid); - byte[] start = encoder.createBuf(); - byte[] end = encoder.createBuf(); - List<byte[]> startKeys; - List<byte[]> endKeys; - - encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE); - encoder.encode(pkStart, pkStart.getInfo().getPrimaryKey(), start); - startKeys = encoder.getRowKeysDifferentShards(start); - - encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE); - encoder.encode(pkEnd, pkEnd.getInfo().getPrimaryKey(), end); - endKeys = encoder.getRowKeysDifferentShards(end); - endKeys = Lists.transform(endKeys, new Function<byte[], byte[]>() { - @Override - public byte[] apply(byte[] input) { - byte[] shardEnd = new byte[input.length + 1];//append extra 0 to the end key to make it inclusive while scanning - System.arraycopy(input, 0, shardEnd, 0, input.length); - return shardEnd; - } - }); - - Preconditions.checkState(startKeys.size() == endKeys.size()); - List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys); - - KylinConfig config = cubeSeg.getCubeDesc().getConfig(); - int hbaseCaching = config.getHBaseScanCacheRows(); - int hbaseMaxResultSize = config.getHBaseScanMaxResultSize(); - if (isMemoryHungry(selectedColBlocks)) - hbaseCaching /= 10; - - for (short i = 0; i < startKeys.size(); ++i) { - ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize)); + protected List<RawScan> preparedHBaseScans(List<GTScanRange> ranges, ImmutableBitSet selectedColBlocks) { + List<RawScan> allRawScans = Lists.newArrayList(); + for (GTScanRange range : ranges) { + allRawScans.add(preparedHBaseScan(range.pkStart, range.pkEnd, range.fuzzyKeys, selectedColBlocks)); } - return ret; - + return allRawScans; } private boolean isMemoryHungry(ImmutableBitSet selectedColBlocks) {