Repository: kylin Updated Branches: refs/heads/2.0-rc a118a27c6 -> 827a995da
Revert "KYLIN-1356 use exec-maven-plugin for IT environment provision" This reverts commit ab853f9dc987549ac27dd2bb3db1732e3e474281. Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/827a995d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/827a995d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/827a995d Branch: refs/heads/2.0-rc Commit: 827a995dab8ed28907d1e16abc5972ae253d712a Parents: db87d46 Author: honma <ho...@ebay.com> Authored: Fri Jan 22 11:18:21 2016 +0800 Committer: honma <ho...@ebay.com> Committed: Fri Jan 22 11:19:33 2016 +0800 ---------------------------------------------------------------------- .../engine/spark/BuildCubeWithSparkTest.java | 3 - .../apache/kylin/job/BuildCubeWithEngine.java | 333 ------------------- .../kylin/job/BuildCubeWithEngineTest.java | 333 +++++++++++++++++++ .../apache/kylin/job/BuildCubeWithStream.java | 110 ------ .../kylin/job/BuildCubeWithStreamTest.java | 114 +++++++ .../apache/kylin/job/BuildIIWithEngineTest.java | 7 +- .../org/apache/kylin/job/BuildIIWithStream.java | 292 ---------------- .../apache/kylin/job/BuildIIWithStreamTest.java | 290 ++++++++++++++++ dev-support/test_all_against_hdp_2_2_4_2_2.sh | 3 + pom.xml | 73 ++-- 10 files changed, 769 insertions(+), 789 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/827a995d/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java b/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java index f87fa99..f5b9741 100644 --- a/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java +++ b/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java @@ -46,13 +46,10 @@ import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import com.google.common.collect.Lists; -//TODO: convert it to a normal class rather than a test case, like in BuildCubeWithEngine -@Ignore public class BuildCubeWithSparkTest { private CubeManager cubeManager; http://git-wip-us.apache.org/repos/asf/kylin/blob/827a995d/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngine.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngine.java deleted file mode 100644 index 2562a8b..0000000 --- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngine.java +++ /dev/null @@ -1,333 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.job; - -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.lang.reflect.Method; -import java.text.SimpleDateFormat; -import java.util.List; -import java.util.TimeZone; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.AbstractKylinTestCase; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.EngineFactory; -import org.apache.kylin.engine.mr.CubingJob; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.impl.threadpool.DefaultScheduler; -import org.apache.kylin.job.manager.ExecutableManager; -import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; -import org.apache.kylin.storage.hbase.util.StorageCleanupJob; -import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; - -import com.google.common.collect.Lists; - -public class BuildCubeWithEngine { - - private CubeManager cubeManager; - private DefaultScheduler scheduler; - protected ExecutableManager jobService; - private static boolean fastBuildMode = false; - - private static final Log logger = LogFactory.getLog(BuildCubeWithEngine.class); - - public static void main(String[] args) throws Exception { - beforeClass(); - BuildCubeWithEngine buildCubeWithEngine = new BuildCubeWithEngine(); - buildCubeWithEngine.before(); - buildCubeWithEngine.build(); - afterClass(); - } - - public static void beforeClass() throws Exception { - logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - - String fastModeStr = System.getProperty("fastBuildMode"); - if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) { - fastBuildMode = true; - logger.info("Will use fast build mode"); - } else { - logger.info("Will not use fast build mode"); - } - - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); - if (System.getProperty("hdp.version") == null) { - throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); - } - - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); - } - - public void before() throws Exception { - - DeployUtil.initCliWorkDir(); - DeployUtil.deployMetadata(); - DeployUtil.overrideJobJarLocations(); - - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.getInstance(); - scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); - if (!scheduler.hasStarted()) { - throw new RuntimeException("scheduler has not been started"); - } - cubeManager = CubeManager.getInstance(kylinConfig); - for (String jobId : jobService.getAllJobIds()) { - if (jobService.getJob(jobId) instanceof CubingJob) { - jobService.deleteJob(jobId); - } - } - - } - - public static void afterClass() { - HBaseMetadataTestCase.staticCleanupTestMetadata(); - } - - public void build() throws Exception { - DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty"); - testInner(); - testLeft(); - } - - protected void waitForJob(String jobId) { - while (true) { - AbstractExecutable job = jobService.getJob(jobId); - if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { - break; - } else { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } - - private void testInner() throws Exception { - String[] testCase = new String[] { "testInnerJoinCubeWithoutSlr", "testInnerJoinCubeWithSlr" }; - runTestAndAssertSucceed(testCase); - } - - private void testLeft() throws Exception { - String[] testCase = new String[] { "testLeftJoinCubeWithSlr", "testLeftJoinCubeWithoutSlr" }; - runTestAndAssertSucceed(testCase); - } - - private void runTestAndAssertSucceed(String[] testCase) throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(testCase.length); - final CountDownLatch countDownLatch = new CountDownLatch(testCase.length); - List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length); - for (int i = 0; i < testCase.length; i++) { - tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch))); - } - countDownLatch.await(); - try { - for (int i = 0; i < tasks.size(); ++i) { - Future<List<String>> task = tasks.get(i); - final List<String> jobIds = task.get(); - for (String jobId : jobIds) { - assertJobSucceed(jobId); - } - } - } catch (Exception ex) { - logger.error(ex); - throw ex; - } - } - - private void assertJobSucceed(String jobId) { - assertEquals("The job '" + jobId + "' is failed.", ExecutableState.SUCCEED, jobService.getOutput(jobId).getState()); - } - - private class TestCallable implements Callable<List<String>> { - - private final String methodName; - private final CountDownLatch countDownLatch; - - public TestCallable(String methodName, CountDownLatch countDownLatch) { - this.methodName = methodName; - this.countDownLatch = countDownLatch; - } - - @SuppressWarnings("unchecked") - @Override - public List<String> call() throws Exception { - try { - final Method method = BuildCubeWithEngine.class.getDeclaredMethod(methodName); - method.setAccessible(true); - return (List<String>) method.invoke(BuildCubeWithEngine.this); - } catch (Exception e) { - logger.error(e.getMessage()); - throw e; - } finally { - countDownLatch.countDown(); - } - } - } - - @SuppressWarnings("unused") - // called by reflection - private List<String> testInnerJoinCubeWithSlr() throws Exception { - clearSegment("test_kylin_cube_with_slr_empty"); - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - long date1 = 0; - long date2 = f.parse("2015-01-01").getTime(); - long date3 = f.parse("2022-01-01").getTime(); - List<String> result = Lists.newArrayList(); - - if (fastBuildMode) { - result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date3)); - } else { - result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2)); - result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));//empty segment - } - return result; - } - - @SuppressWarnings("unused") - // called by reflection - private List<String> testInnerJoinCubeWithoutSlr() throws Exception { - - clearSegment("test_kylin_cube_without_slr_empty"); - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - long date1 = 0; - long date2 = f.parse("2013-01-01").getTime(); - long date3 = f.parse("2013-07-01").getTime(); - long date4 = f.parse("2022-01-01").getTime(); - List<String> result = Lists.newArrayList(); - - if (fastBuildMode) { - result.add(buildSegment("test_kylin_cube_without_slr_empty", date1, date4)); - } else { - result.add(buildSegment("test_kylin_cube_without_slr_empty", date1, date2)); - result.add(buildSegment("test_kylin_cube_without_slr_empty", date2, date3)); - result.add(buildSegment("test_kylin_cube_without_slr_empty", date3, date4)); - result.add(mergeSegment("test_kylin_cube_without_slr_empty", date1, date3));//don't merge all segments - } - return result; - - } - - @SuppressWarnings("unused") - // called by reflection - private List<String> testLeftJoinCubeWithoutSlr() throws Exception { - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - List<String> result = Lists.newArrayList(); - final String cubeName = "test_kylin_cube_without_slr_left_join_empty"; - clearSegment(cubeName); - - long date1 = cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart(); - long date2 = f.parse("2012-06-01").getTime(); - long date3 = f.parse("2022-01-01").getTime(); - long date4 = f.parse("2023-01-01").getTime(); - - if (fastBuildMode) { - result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date1, date4)); - } else { - result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date1, date2)); - result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date2, date3)); - result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date3, date4));//empty segment - result.add(mergeSegment("test_kylin_cube_without_slr_left_join_empty", date1, date3));//don't merge all segments - } - - return result; - - } - - @SuppressWarnings("unused") - // called by reflection - private List<String> testLeftJoinCubeWithSlr() throws Exception { - String cubeName = "test_kylin_cube_with_slr_left_join_empty"; - clearSegment(cubeName); - - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - long date1 = cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart(); - long date2 = f.parse("2013-01-01").getTime(); - long date3 = f.parse("2013-07-01").getTime(); - long date4 = f.parse("2022-01-01").getTime(); - - List<String> result = Lists.newArrayList(); - if (fastBuildMode) { - result.add(buildSegment(cubeName, date1, date4)); - } else { - result.add(buildSegment(cubeName, date1, date2)); - result.add(buildSegment(cubeName, date2, date3)); - result.add(buildSegment(cubeName, date3, date4)); - result.add(mergeSegment(cubeName, date1, date3));//don't merge all segments - } - return result; - - } - - private void clearSegment(String cubeName) throws Exception { - CubeInstance cube = cubeManager.getCube(cubeName); - // remove all existing segments - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); - cubeManager.updateCube(cubeBuilder); - } - - private String mergeSegment(String cubeName, long startDate, long endDate) throws Exception { - CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), startDate, endDate, true); - DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); - jobService.addJob(job); - waitForJob(job.getId()); - return job.getId(); - } - - private String buildSegment(String cubeName, long startDate, long endDate) throws Exception { - CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube(cubeName), endDate); - DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); - jobService.addJob(job); - waitForJob(job.getId()); - return job.getId(); - } - - private int cleanupOldStorage() throws Exception { - String[] args = { "--delete", "true" }; - - int exitCode = ToolRunner.run(new StorageCleanupJob(), args); - return exitCode; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/827a995d/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java new file mode 100644 index 0000000..331e21d --- /dev/null +++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.lang.reflect.Method; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.TimeZone; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractKylinTestCase; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; +import org.apache.kylin.storage.hbase.util.StorageCleanupJob; +import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class BuildCubeWithEngineTest { + + private CubeManager cubeManager; + private DefaultScheduler scheduler; + protected ExecutableManager jobService; + private static boolean fastBuildMode = false; + + private static final Log logger = LogFactory.getLog(BuildCubeWithEngineTest.class); + + protected void waitForJob(String jobId) { + while (true) { + AbstractExecutable job = jobService.getJob(jobId); + if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { + break; + } else { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + @BeforeClass + public static void beforeClass() throws Exception { + logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + + String fastModeStr = System.getProperty("fastBuildMode"); + if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) { + fastBuildMode = true; + logger.info("Will use fast build mode"); + } else { + logger.info("Will not use fast build mode"); + } + + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + if (System.getProperty("hdp.version") == null) { + throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); + } + + } + + @Before + public void before() throws Exception { + HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + + DeployUtil.initCliWorkDir(); + DeployUtil.deployMetadata(); + DeployUtil.overrideJobJarLocations(); + + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + jobService = ExecutableManager.getInstance(kylinConfig); + scheduler = DefaultScheduler.getInstance(); + scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); + if (!scheduler.hasStarted()) { + throw new RuntimeException("scheduler has not been started"); + } + cubeManager = CubeManager.getInstance(kylinConfig); + for (String jobId : jobService.getAllJobIds()) { + if (jobService.getJob(jobId) instanceof CubingJob) { + jobService.deleteJob(jobId); + } + } + + } + + @After + public void after() { + HBaseMetadataTestCase.staticCleanupTestMetadata(); + } + + @Test + public void test() throws Exception { + DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty"); + testInner(); + testLeft(); + } + + private void testInner() throws Exception { + String[] testCase = new String[] { "testInnerJoinCubeWithoutSlr", "testInnerJoinCubeWithSlr" }; + runTestAndAssertSucceed(testCase); + } + + private void testLeft() throws Exception { + String[] testCase = new String[] { "testLeftJoinCubeWithSlr", "testLeftJoinCubeWithoutSlr" }; + runTestAndAssertSucceed(testCase); + } + + private void runTestAndAssertSucceed(String[] testCase) throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(testCase.length); + final CountDownLatch countDownLatch = new CountDownLatch(testCase.length); + List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length); + for (int i = 0; i < testCase.length; i++) { + tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch))); + } + countDownLatch.await(); + try { + for (int i = 0; i < tasks.size(); ++i) { + Future<List<String>> task = tasks.get(i); + final List<String> jobIds = task.get(); + for (String jobId : jobIds) { + assertJobSucceed(jobId); + } + } + } catch (Exception ex) { + logger.error(ex); + throw ex; + } + } + + private void assertJobSucceed(String jobId) { + assertEquals("The job '" + jobId + "' is failed.", ExecutableState.SUCCEED, jobService.getOutput(jobId).getState()); + } + + private class TestCallable implements Callable<List<String>> { + + private final String methodName; + private final CountDownLatch countDownLatch; + + public TestCallable(String methodName, CountDownLatch countDownLatch) { + this.methodName = methodName; + this.countDownLatch = countDownLatch; + } + + @SuppressWarnings("unchecked") + @Override + public List<String> call() throws Exception { + try { + final Method method = BuildCubeWithEngineTest.class.getDeclaredMethod(methodName); + method.setAccessible(true); + return (List<String>) method.invoke(BuildCubeWithEngineTest.this); + } catch (Exception e) { + logger.error(e.getMessage()); + throw e; + } finally { + countDownLatch.countDown(); + } + } + } + + @SuppressWarnings("unused") + // called by reflection + private List<String> testInnerJoinCubeWithSlr() throws Exception { + clearSegment("test_kylin_cube_with_slr_empty"); + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + long date1 = 0; + long date2 = f.parse("2015-01-01").getTime(); + long date3 = f.parse("2022-01-01").getTime(); + List<String> result = Lists.newArrayList(); + + if (fastBuildMode) { + result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date3)); + } else { + result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2)); + result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));//empty segment + } + return result; + } + + @SuppressWarnings("unused") + // called by reflection + private List<String> testInnerJoinCubeWithoutSlr() throws Exception { + + clearSegment("test_kylin_cube_without_slr_empty"); + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + long date1 = 0; + long date2 = f.parse("2013-01-01").getTime(); + long date3 = f.parse("2013-07-01").getTime(); + long date4 = f.parse("2022-01-01").getTime(); + List<String> result = Lists.newArrayList(); + + if (fastBuildMode) { + result.add(buildSegment("test_kylin_cube_without_slr_empty", date1, date4)); + } else { + result.add(buildSegment("test_kylin_cube_without_slr_empty", date1, date2)); + result.add(buildSegment("test_kylin_cube_without_slr_empty", date2, date3)); + result.add(buildSegment("test_kylin_cube_without_slr_empty", date3, date4)); + result.add(mergeSegment("test_kylin_cube_without_slr_empty", date1, date3));//don't merge all segments + } + return result; + + } + + @SuppressWarnings("unused") + // called by reflection + private List<String> testLeftJoinCubeWithoutSlr() throws Exception { + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + List<String> result = Lists.newArrayList(); + final String cubeName = "test_kylin_cube_without_slr_left_join_empty"; + clearSegment(cubeName); + + long date1 = cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart(); + long date2 = f.parse("2012-06-01").getTime(); + long date3 = f.parse("2022-01-01").getTime(); + long date4 = f.parse("2023-01-01").getTime(); + + if (fastBuildMode) { + result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date1, date4)); + } else { + result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date1, date2)); + result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date2, date3)); + result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date3, date4));//empty segment + result.add(mergeSegment("test_kylin_cube_without_slr_left_join_empty", date1, date3));//don't merge all segments + } + + return result; + + } + + @SuppressWarnings("unused") + // called by reflection + private List<String> testLeftJoinCubeWithSlr() throws Exception { + String cubeName = "test_kylin_cube_with_slr_left_join_empty"; + clearSegment(cubeName); + + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + long date1 = cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart(); + long date2 = f.parse("2013-01-01").getTime(); + long date3 = f.parse("2013-07-01").getTime(); + long date4 = f.parse("2022-01-01").getTime(); + + List<String> result = Lists.newArrayList(); + if (fastBuildMode) { + result.add(buildSegment(cubeName, date1, date4)); + } else { + result.add(buildSegment(cubeName, date1, date2)); + result.add(buildSegment(cubeName, date2, date3)); + result.add(buildSegment(cubeName, date3, date4)); + result.add(mergeSegment(cubeName, date1, date3));//don't merge all segments + } + return result; + + } + + private void clearSegment(String cubeName) throws Exception { + CubeInstance cube = cubeManager.getCube(cubeName); + // remove all existing segments + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); + cubeManager.updateCube(cubeBuilder); + } + + private String mergeSegment(String cubeName, long startDate, long endDate) throws Exception { + CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), startDate, endDate, true); + DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); + jobService.addJob(job); + waitForJob(job.getId()); + return job.getId(); + } + + private String buildSegment(String cubeName, long startDate, long endDate) throws Exception { + CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube(cubeName), endDate); + DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); + jobService.addJob(job); + waitForJob(job.getId()); + return job.getId(); + } + + private int cleanupOldStorage() throws Exception { + String[] args = { "--delete", "true" }; + + int exitCode = ToolRunner.run(new StorageCleanupJob(), args); + return exitCode; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/827a995d/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStream.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStream.java deleted file mode 100644 index d1f0a48..0000000 --- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStream.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * - * contributor license agreements. See the NOTICE file distributed with - * - * this work for additional information regarding copyright ownership. - * - * The ASF licenses this file to You under the Apache License, Version 2.0 - * - * (the "License"); you may not use this file except in compliance with - * - * the License. You may obtain a copy of the License at - * - * - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * - * - * Unless required by applicable law or agreed to in writing, software - * - * distributed under the License is distributed on an "AS IS" BASIS, - * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * - * See the License for the specific language governing permissions and - * - * limitations under the License. - * - * / - */ - -package org.apache.kylin.job; - -import java.io.File; -import java.util.UUID; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.AbstractKylinTestCase; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; -import org.apache.kylin.engine.streaming.StreamingConfig; -import org.apache.kylin.engine.streaming.StreamingManager; -import org.apache.kylin.source.kafka.KafkaConfigManager; -import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * for streaming cubing case "test_streaming_table" - */ -public class BuildCubeWithStream { - - private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class); - private static final String streamingName = "test_streaming_table_cube"; - private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00"); - private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00"); - private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours - - private KylinConfig kylinConfig; - - public static void main(String[] args) throws Exception { - beforeClass(); - BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream(); - buildCubeWithStream.before(); - buildCubeWithStream.build(); - afterClass(); - } - - public static void beforeClass() throws Exception { - logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); - if (System.getProperty("hdp.version") == null) { - throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); - } - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); - } - - public void before() throws Exception { - DeployUtil.overrideJobJarLocations(); - - kylinConfig = KylinConfig.getInstanceFromEnv(); - - final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName); - - //Use a random topic for kafka data stream - KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingName); - streamingConfig.setTopic(UUID.randomUUID().toString()); - KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig); - - DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, config.getCubeName(), streamingConfig); - } - - public static void afterClass() throws Exception { - HBaseMetadataTestCase.staticCleanupTestMetadata(); - } - - public void build() throws Exception { - logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval)); - for (long start = startTime; start < endTime; start += batchInterval) { - logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval)); - new OneOffStreamingBuilder(streamingName, start, start + batchInterval).build().run(); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/827a995d/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java new file mode 100644 index 0000000..9e60622 --- /dev/null +++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java @@ -0,0 +1,114 @@ +/* + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ + +package org.apache.kylin.job; + +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractKylinTestCase; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; +import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.source.kafka.KafkaConfigManager; +import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; +import org.apache.kylin.storage.hbase.util.StorageCleanupJob; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +/** + * for streaming cubing case "test_streaming_table" + */ +public class BuildCubeWithStreamTest { + + private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamTest.class); + private static final String streamingName = "test_streaming_table_cube"; + private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00"); + private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00"); + private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours + + private KylinConfig kylinConfig; + + @BeforeClass + public static void beforeClass() throws Exception { + logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + if (System.getProperty("hdp.version") == null) { + throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); + } + + } + + @Before + public void before() throws Exception { + HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + DeployUtil.overrideJobJarLocations(); + + kylinConfig = KylinConfig.getInstanceFromEnv(); + + final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName); + + //Use a random topic for kafka data stream + KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingName); + streamingConfig.setTopic(UUID.randomUUID().toString()); + KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig); + + DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, config.getCubeName(), streamingConfig); + } + + @AfterClass + public static void afterClass() throws Exception { + HBaseMetadataTestCase.staticCleanupTestMetadata(); + } + + @Test + public void test() throws Exception { + logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval)); + for (long start = startTime; start < endTime; start += batchInterval) { + logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval)); + new OneOffStreamingBuilder(streamingName, start, start + batchInterval).build().run(); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/827a995d/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java index 31dc396..0158fad 100644 --- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java +++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java @@ -38,7 +38,6 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.AbstractKylinTestCase; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.engine.mr.invertedindex.BatchIIJobBuilder; -import org.apache.kylin.engine.mr.invertedindex.IIJob; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; @@ -46,6 +45,7 @@ import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import org.apache.kylin.engine.mr.invertedindex.IIJob; import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; @@ -59,8 +59,9 @@ import org.junit.Test; import com.google.common.collect.Lists; -//TODO: convert it to a normal class rather than a test case, like in BuildCubeWithEngine -@Ignore +/** + * @author shaoshi + */ public class BuildIIWithEngineTest { private JobEngineConfig jobEngineConfig; http://git-wip-us.apache.org/repos/asf/kylin/blob/827a995d/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStream.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStream.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStream.java deleted file mode 100644 index 1e38d3a..0000000 --- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStream.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * - * contributor license agreements. See the NOTICE file distributed with - * - * this work for additional information regarding copyright ownership. - * - * The ASF licenses this file to You under the Apache License, Version 2.0 - * - * (the "License"); you may not use this file except in compliance with - * - * the License. You may obtain a copy of the License at - * - * - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * - * - * Unless required by applicable law or agreed to in writing, software - * - * distributed under the License is distributed on an "AS IS" BASIS, - * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * - * See the License for the specific language governing permissions and - * - * limitations under the License. - * - * / - */ - -package org.apache.kylin.job; - -import java.io.File; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.TimeZone; -import java.util.UUID; - -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.AbstractKylinTestCase; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.StreamingBatch; -import org.apache.kylin.common.util.StreamingMessage; -import org.apache.kylin.engine.mr.JobBuilderSupport; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.index.Slice; -import org.apache.kylin.invertedindex.index.SliceBuilder; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; -import org.apache.kylin.invertedindex.model.IIKeyValueCodec; -import org.apache.kylin.invertedindex.model.IIRow; -import org.apache.kylin.job.common.ShellExecutable; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.realization.RealizationStatusEnum; -import org.apache.kylin.source.hive.HiveTableReader; -import org.apache.kylin.storage.hbase.HBaseConnection; -import org.apache.kylin.storage.hbase.ii.IICreateHTableJob; -import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; -import org.apache.kylin.storage.hbase.util.StorageCleanupJob; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -public class BuildIIWithStream { - - private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStream.class); - - private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" }; - private IIManager iiManager; - private KylinConfig kylinConfig; - - public static void main(String[] args) throws Exception { - beforeClass(); - BuildIIWithStream buildCubeWithEngine = new BuildIIWithStream(); - buildCubeWithEngine.before(); - buildCubeWithEngine.build(); - afterClass(); - } - - public static void beforeClass() throws Exception { - logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - if (System.getProperty("hdp.version") == null) { - throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); - } - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); - } - - public void before() throws Exception { - DeployUtil.overrideJobJarLocations(); - - kylinConfig = KylinConfig.getInstanceFromEnv(); - iiManager = IIManager.getInstance(kylinConfig); - for (String iiInstance : II_NAME) { - - IIInstance ii = iiManager.getII(iiInstance); - if (ii.getStatus() != RealizationStatusEnum.DISABLED) { - ii.setStatus(RealizationStatusEnum.DISABLED); - iiManager.updateII(ii); - } - } - } - - public static void afterClass() throws Exception { - cleanupOldStorage(); - HBaseMetadataTestCase.staticCleanupTestMetadata(); - } - - private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException { - IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc); - JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig); - final String uuid = UUID.randomUUID().toString(); - final String useDatabaseHql = "USE " + kylinConfig.getHiveDatabaseForIntermediateTable() + ";"; - final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc); - final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid)); - String insertDataHqls; - try { - insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobEngineConfig); - } catch (IOException e1) { - e1.printStackTrace(); - throw new RuntimeException("Failed to generate insert data SQL for intermediate table."); - } - - ShellExecutable step = new ShellExecutable(); - StringBuffer buf = new StringBuffer(); - buf.append("hive -e \""); - buf.append(useDatabaseHql + "\n"); - buf.append(dropTableHql + "\n"); - buf.append(createTableHql + "\n"); - buf.append(insertDataHqls + "\n"); - buf.append("\""); - - step.setCmd(buf.toString()); - logger.info(step.getCmd()); - step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); - kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null); - return intermediateTableDesc.getTableName(); - } - - private void clearSegment(String iiName) throws Exception { - IIInstance ii = iiManager.getII(iiName); - ii.getSegments().clear(); - iiManager.updateII(ii); - } - - private IISegment createSegment(String iiName) throws Exception { - clearSegment(iiName); - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - - long date1 = 0; - long date2 = f.parse("2015-01-01").getTime(); - return buildSegment(iiName, date1, date2); - } - - private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception { - IIInstance iiInstance = iiManager.getII(iiName); - IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate); - iiInstance.getSegments().add(segment); - iiManager.updateII(iiInstance); - return segment; - } - - private void buildII(String iiName) throws Exception { - final IIDesc desc = iiManager.getII(iiName).getDescriptor(); - final String tableName = createIntermediateTable(desc, kylinConfig); - logger.info("intermediate table name:" + tableName); - - HiveTableReader reader = new HiveTableReader("default", tableName); - final List<TblColRef> tblColRefs = desc.listAllColumns(); - for (TblColRef tblColRef : tblColRefs) { - if (desc.isMetricsCol(tblColRef)) { - logger.info("matrix:" + tblColRef.getName()); - } else { - logger.info("measure:" + tblColRef.getName()); - } - } - final IISegment segment = createSegment(iiName); - final HTableInterface htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier()); - String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() }; - ToolRunner.run(new IICreateHTableJob(), args); - - final IIDesc iiDesc = segment.getIIDesc(); - final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0); - - List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn()); - int count = sorted.size(); - ArrayList<StreamingMessage> messages = Lists.newArrayList(); - for (String[] row : sorted) { - messages.add((parse(row))); - if (messages.size() >= iiDesc.getSliceSize()) { - build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable); - messages.clear(); - } - } - - if (!messages.isEmpty()) { - build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable); - } - - reader.close(); - logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier()); - logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier()); - } - - public void build() throws Exception { - for (String iiName : II_NAME) { - buildII(iiName); - IIInstance ii = iiManager.getII(iiName); - if (ii.getStatus() != RealizationStatusEnum.READY) { - ii.setStatus(RealizationStatusEnum.READY); - iiManager.updateII(ii); - } - } - } - - private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) throws IOException { - final Slice slice = sliceBuilder.buildSlice(batch); - try { - loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo())); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException { - List<Put> data = Lists.newArrayList(); - for (IIRow row : codec.encodeKeyValue(slice)) { - final byte[] key = row.getKey().get(); - final byte[] value = row.getValue().get(); - Put put = new Put(key); - put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value); - final ImmutableBytesWritable dictionary = row.getDictionary(); - final byte[] dictBytes = dictionary.get(); - if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) { - put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes); - } else { - throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength()); - } - data.add(put); - } - hTable.put(data); - //omit hTable.flushCommits(), because htable is auto flush - } - - private StreamingMessage parse(String[] row) { - return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object> emptyMap()); - } - - private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException { - List<String[]> unsorted = Lists.newArrayList(); - while (reader.next()) { - unsorted.add(reader.getRow()); - } - Collections.sort(unsorted, new Comparator<String[]>() { - @Override - public int compare(String[] o1, String[] o2) { - long t1 = DateFormat.stringToMillis(o1[tsCol]); - long t2 = DateFormat.stringToMillis(o2[tsCol]); - return Long.compare(t1, t2); - } - }); - return unsorted; - } - - private static int cleanupOldStorage() throws Exception { - String[] args = { "--delete", "true" }; - - int exitCode = ToolRunner.run(new StorageCleanupJob(), args); - return exitCode; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/827a995d/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java new file mode 100644 index 0000000..2317e62 --- /dev/null +++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java @@ -0,0 +1,290 @@ +/* + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ + +package org.apache.kylin.job; + +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.*; + +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractKylinTestCase; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.common.util.StreamingBatch; +import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; +import org.apache.kylin.invertedindex.index.Slice; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; +import org.apache.kylin.invertedindex.model.IIKeyValueCodec; +import org.apache.kylin.invertedindex.model.IIRow; +import org.apache.kylin.invertedindex.index.SliceBuilder; +import org.apache.kylin.job.common.ShellExecutable; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.storage.hbase.ii.IICreateHTableJob; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.source.hive.HiveTableReader; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; +import org.apache.kylin.storage.hbase.util.StorageCleanupJob; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + */ +public class BuildIIWithStreamTest { + + private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStreamTest.class); + + private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" }; + private IIManager iiManager; + private KylinConfig kylinConfig; + + @BeforeClass + public static void beforeClass() throws Exception { + logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + if (System.getProperty("hdp.version") == null) { + throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); + } + } + + @Before + public void before() throws Exception { + HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + DeployUtil.overrideJobJarLocations(); + + kylinConfig = KylinConfig.getInstanceFromEnv(); + iiManager = IIManager.getInstance(kylinConfig); + for (String iiInstance : II_NAME) { + + IIInstance ii = iiManager.getII(iiInstance); + if (ii.getStatus() != RealizationStatusEnum.DISABLED) { + ii.setStatus(RealizationStatusEnum.DISABLED); + iiManager.updateII(ii); + } + } + } + + @AfterClass + public static void cleanup() throws Exception { + cleanupOldStorage(); + } + + private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException { + IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc); + JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig); + final String uuid = UUID.randomUUID().toString(); + final String useDatabaseHql = "USE " + kylinConfig.getHiveDatabaseForIntermediateTable() + ";"; + final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid)); + String insertDataHqls; + try { + insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobEngineConfig); + } catch (IOException e1) { + e1.printStackTrace(); + throw new RuntimeException("Failed to generate insert data SQL for intermediate table."); + } + + ShellExecutable step = new ShellExecutable(); + StringBuffer buf = new StringBuffer(); + buf.append("hive -e \""); + buf.append(useDatabaseHql + "\n"); + buf.append(dropTableHql + "\n"); + buf.append(createTableHql + "\n"); + buf.append(insertDataHqls + "\n"); + buf.append("\""); + + step.setCmd(buf.toString()); + logger.info(step.getCmd()); + step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); + kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null); + return intermediateTableDesc.getTableName(); + } + + private void clearSegment(String iiName) throws Exception { + IIInstance ii = iiManager.getII(iiName); + ii.getSegments().clear(); + iiManager.updateII(ii); + } + + private IISegment createSegment(String iiName) throws Exception { + clearSegment(iiName); + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + + long date1 = 0; + long date2 = f.parse("2015-01-01").getTime(); + return buildSegment(iiName, date1, date2); + } + + private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception { + IIInstance iiInstance = iiManager.getII(iiName); + IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate); + iiInstance.getSegments().add(segment); + iiManager.updateII(iiInstance); + return segment; + } + + private void buildII(String iiName) throws Exception { + final IIDesc desc = iiManager.getII(iiName).getDescriptor(); + final String tableName = createIntermediateTable(desc, kylinConfig); + logger.info("intermediate table name:" + tableName); + + HiveTableReader reader = new HiveTableReader("default", tableName); + final List<TblColRef> tblColRefs = desc.listAllColumns(); + for (TblColRef tblColRef : tblColRefs) { + if (desc.isMetricsCol(tblColRef)) { + logger.info("matrix:" + tblColRef.getName()); + } else { + logger.info("measure:" + tblColRef.getName()); + } + } + final IISegment segment = createSegment(iiName); + final HTableInterface htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier()); + String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() }; + ToolRunner.run(new IICreateHTableJob(), args); + + final IIDesc iiDesc = segment.getIIDesc(); + final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0); + + List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn()); + int count = sorted.size(); + ArrayList<StreamingMessage> messages = Lists.newArrayList(); + for (String[] row : sorted) { + messages.add((parse(row))); + if (messages.size() >= iiDesc.getSliceSize()) { + build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable); + messages.clear(); + } + } + + if (!messages.isEmpty()) { + build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable); + } + + reader.close(); + logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier()); + logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier()); + } + + @Test + public void test() throws Exception { + for (String iiName : II_NAME) { + buildII(iiName); + IIInstance ii = iiManager.getII(iiName); + if (ii.getStatus() != RealizationStatusEnum.READY) { + ii.setStatus(RealizationStatusEnum.READY); + iiManager.updateII(ii); + } + } + } + + private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) throws IOException { + final Slice slice = sliceBuilder.buildSlice(batch); + try { + loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException { + List<Put> data = Lists.newArrayList(); + for (IIRow row : codec.encodeKeyValue(slice)) { + final byte[] key = row.getKey().get(); + final byte[] value = row.getValue().get(); + Put put = new Put(key); + put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value); + final ImmutableBytesWritable dictionary = row.getDictionary(); + final byte[] dictBytes = dictionary.get(); + if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) { + put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes); + } else { + throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength()); + } + data.add(put); + } + hTable.put(data); + //omit hTable.flushCommits(), because htable is auto flush + } + + private StreamingMessage parse(String[] row) { + return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object>emptyMap()); + } + + private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException { + List<String[]> unsorted = Lists.newArrayList(); + while (reader.next()) { + unsorted.add(reader.getRow()); + } + Collections.sort(unsorted, new Comparator<String[]>() { + @Override + public int compare(String[] o1, String[] o2) { + long t1 = DateFormat.stringToMillis(o1[tsCol]); + long t2 = DateFormat.stringToMillis(o2[tsCol]); + return Long.compare(t1, t2); + } + }); + return unsorted; + } + + private static int cleanupOldStorage() throws Exception { + String[] args = { "--delete", "true" }; + + int exitCode = ToolRunner.run(new StorageCleanupJob(), args); + return exitCode; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/827a995d/dev-support/test_all_against_hdp_2_2_4_2_2.sh ---------------------------------------------------------------------- diff --git a/dev-support/test_all_against_hdp_2_2_4_2_2.sh b/dev-support/test_all_against_hdp_2_2_4_2_2.sh index b2f5ce0..e07facb 100644 --- a/dev-support/test_all_against_hdp_2_2_4_2_2.sh +++ b/dev-support/test_all_against_hdp_2_2_4_2_2.sh @@ -22,4 +22,7 @@ cd ${dir} cd .. mvn clean install -DskipTests | tee mci.log +mvn test -Dhdp.version=2.2.4.2-2 -Dtest=org.apache.kylin.job.BuildCubeWithEngineTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithEngineTest.log +mvn test -Dhdp.version=2.2.4.2-2 -Dtest=org.apache.kylin.job.BuildIIWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildIIWithStreamTest.log +mvn test -Dhdp.version=2.2.4.2-2 -Dtest=org.apache.kylin.job.BuildCubeWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithStreamTest.log mvn verify -Dhdp.version=2.2.4.2-2 -fae -P sandbox | tee mvnverify.log \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/827a995d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 747e7ee..ccd5a92 100644 --- a/pom.xml +++ b/pom.xml @@ -457,11 +457,11 @@ <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - <version>${apache-httpclient.version}</version> - </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>${apache-httpclient.version}</version> + </dependency> </dependencies> </dependencyManagement> @@ -647,6 +647,10 @@ <reportsDirectory>${project.basedir}/../target/surefire-reports</reportsDirectory> <excludes> <exclude>**/IT*.java</exclude> + + <!--Build cube/II need to be run separately--> + <exclude>**/BuildCube*.java</exclude> + <exclude>**/BuildII*.java</exclude> </excludes> <systemProperties> <property> @@ -674,6 +678,7 @@ <excludes> <exclude>**/*$*</exclude> <exclude>**/ITKafka*.java</exclude> + </excludes> <systemProperties> <property> @@ -683,45 +688,6 @@ </systemProperties> </configuration> </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>exec-maven-plugin</artifactId> - <executions> - <execution> - <id>build_cube_with_engine</id> - <goals> - <goal>java</goal> - </goals> - <phase>pre-integration-test</phase> - <configuration> - <classpathScope>test</classpathScope> - <mainClass>org.apache.kylin.job.BuildCubeWithEngine</mainClass> - </configuration> - </execution> - <execution> - <id>build_cube_with_stream</id> - <goals> - <goal>java</goal> - </goals> - <phase>pre-integration-test</phase> - <configuration> - <classpathScope>test</classpathScope> - <mainClass>org.apache.kylin.job.BuildCubeWithStream</mainClass> - </configuration> - </execution> - <execution> - <id>build_ii_with_stream</id> - <goals> - <goal>java</goal> - </goals> - <phase>pre-integration-test</phase> - <configuration> - <classpathScope>test</classpathScope> - <mainClass>org.apache.kylin.job.BuildIIWithStream</mainClass> - </configuration> - </execution> - </executions> - </plugin> </plugins> </build> </profile> @@ -768,9 +734,20 @@ <configuration> <reportsDirectory>${project.basedir}/../target/surefire-reports</reportsDirectory> <excludes> - <excludes> - <exclude>**/IT*.java</exclude> - </excludes> + <exclude>**/BuildCubeWithEngineTest.java</exclude> + <exclude>**/BuildIIWithEngineTest.java</exclude> + <exclude>**/SampleCubeSetupTest.java</exclude> + <exclude>**/KylinQueryTest.java</exclude> + <exclude>**/SnapshotManagerTest.java</exclude> + <exclude>**/HiveTableReaderTest.java</exclude> + <exclude>**/TableControllerTest.java</exclude> + + <!--test case under server folder are problematic when tested against minicluster--> + <exclude>**/ServiceTestBase.java</exclude> + <exclude>**/JDBCDriverTest.java</exclude> + <exclude>%regex[.*ControllerTest.*]</exclude> + <exclude>%regex[.*ServiceTest.*]</exclude> + </excludes> <systemProperties> <property> @@ -864,7 +841,7 @@ <exclude>webapp/bower.json</exclude> <exclude>webapp/grunt.json</exclude> <exclude>webapp/package.json</exclude> - + <!-- jdbc log --> <exclude>jdbc/kylin_jdbc.log*</exclude> <!-- server log -->