minor, remove build ii step
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/db5f89ba Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/db5f89ba Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/db5f89ba Branch: refs/heads/master Commit: db5f89bae53c9fa84c96c5c4693a142a136ef1fe Parents: edfb37d Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Jul 7 17:06:40 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Jul 7 17:06:40 2016 +0800 ---------------------------------------------------------------------- kylin-it/pom.xml | 38 +-- .../kylin/provision/BuildCubeWithStream.java | 14 +- .../kylin/provision/BuildIIWithEngine.java | 253 ---------------- .../kylin/provision/BuildIIWithStream.java | 300 ------------------- 4 files changed, 23 insertions(+), 582 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/pom.xml ---------------------------------------------------------------------- diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml index eeb74fe..271bae8 100644 --- a/kylin-it/pom.xml +++ b/kylin-it/pom.xml @@ -18,7 +18,8 @@ --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>kylin</artifactId> <groupId>org.apache.kylin</groupId> @@ -30,8 +31,8 @@ <name>Kylin:IT</name> <properties> - <hdp.version /> - <fastBuildMode /> + <hdp.version/> + <fastBuildMode/> </properties> <!-- Dependencies. --> @@ -283,9 +284,13 @@ <executions> <execution> <id>integration-tests</id> - <phase>integration-test</phase> <goals> <goal>integration-test</goal> + </goals> + </execution> + <execution> + <id>verify</id> + <goals> <goal>verify</goal> </goals> </execution> @@ -323,7 +328,7 @@ <argument>-Dhdp.version=${hdp.version}</argument> <argument>-DfastBuildMode=${fastBuildMode}</argument> <argument>-classpath</argument> - <classpath /> + <classpath/> <argument>org.apache.kylin.provision.BuildCubeWithEngine</argument> </arguments> <workingDirectory>${project.basedir}</workingDirectory> @@ -344,33 +349,12 @@ <argument>-Dhdp.version=${hdp.version}</argument> <argument>-DfastBuildMode=${fastBuildMode}</argument> <argument>-classpath</argument> - <classpath /> + <classpath/> <argument>org.apache.kylin.provision.BuildCubeWithStream</argument> </arguments> <workingDirectory>${project.basedir}</workingDirectory> </configuration> </execution> - <execution> - <id>build_ii_with_stream</id> - <goals> - <goal>exec</goal> - </goals> - <phase>pre-integration-test</phase> - <configuration> - <skip>${skipTests}</skip> - <classpathScope>test</classpathScope> - <executable>java</executable> - <arguments> - <argument>-DuseSandbox=true</argument> - <argument>-Dhdp.version=${hdp.version}</argument> - <argument>-DfastBuildMode=${fastBuildMode}</argument> - <argument>-classpath</argument> - <classpath /> - <argument>org.apache.kylin.provision.BuildIIWithStream</argument> - </arguments> - <workingDirectory>${project.basedir}</workingDirectory> - </configuration> - </execution> </executions> </plugin> http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index c426ea4..1655a17 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.UUID; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.DateFormat; @@ -36,6 +37,7 @@ import org.apache.kylin.job.DeployUtil; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.storage.hbase.util.StorageCleanupJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +63,7 @@ public class BuildCubeWithStream { buildCubeWithStream.before(); buildCubeWithStream.build(); logger.info("Build is done"); - afterClass(); + buildCubeWithStream.cleanup(); logger.info("Going to exit"); System.exit(0); } catch (Exception e) { @@ -101,10 +103,18 @@ public class BuildCubeWithStream { DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, streamingConfig); } - public static void afterClass() throws Exception { + public void cleanup() throws Exception { + cleanupOldStorage(); HBaseMetadataTestCase.staticCleanupTestMetadata(); } + private static int cleanupOldStorage() throws Exception { + String[] args = { "--delete", "true" }; + + int exitCode = ToolRunner.run(new StorageCleanupJob(), args); + return exitCode; + } + public void build() throws Exception { logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval)); for (long start = startTime; start < endTime; start += batchInterval) { http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java deleted file mode 100644 index c947d9d..0000000 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.provision; - -import java.io.File; -import java.lang.reflect.Method; -import java.text.SimpleDateFormat; -import java.util.List; -import java.util.TimeZone; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.HBaseMetadataTestCase; -import org.apache.kylin.engine.mr.invertedindex.BatchIIJobBuilder; -import org.apache.kylin.engine.mr.invertedindex.IIJob; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.job.DeployUtil; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.impl.threadpool.DefaultScheduler; -import org.apache.kylin.job.manager.ExecutableManager; -import org.apache.kylin.metadata.realization.RealizationStatusEnum; -import org.apache.kylin.storage.hbase.util.StorageCleanupJob; -import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import com.google.common.collect.Lists; - -//TODO: convert it to a normal class rather than a test case, like in BuildCubeWithEngine -@Ignore -public class BuildIIWithEngine { - - private JobEngineConfig jobEngineConfig; - private IIManager iiManager; - - private DefaultScheduler scheduler; - protected ExecutableManager jobService; - - protected static final String[] TEST_II_INSTANCES = new String[] { "test_kylin_ii_inner_join", "test_kylin_ii_left_join" }; - - private static final Log logger = LogFactory.getLog(BuildIIWithEngine.class); - - protected void waitForJob(String jobId) { - while (true) { - AbstractExecutable job = jobService.getJob(jobId); - if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { - break; - } else { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } - - @BeforeClass - public static void beforeClass() throws Exception { - logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); - if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { - throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); - } - } - - @Before - public void before() throws Exception { - HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); - - //DeployUtil.initCliWorkDir(); - // DeployUtil.deployMetadata(); - DeployUtil.overrideJobJarLocations(); - - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.createInstance(); - scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); - if (!scheduler.hasStarted()) { - throw new RuntimeException("scheduler has not been started"); - } - jobEngineConfig = new JobEngineConfig(kylinConfig); - for (String jobId : jobService.getAllJobIds()) { - if (jobService.getJob(jobId) instanceof IIJob) { - jobService.deleteJob(jobId); - } - } - - iiManager = IIManager.getInstance(kylinConfig); - for (String iiInstance : TEST_II_INSTANCES) { - - IIInstance ii = iiManager.getII(iiInstance); - if (ii.getStatus() != RealizationStatusEnum.DISABLED) { - ii.setStatus(RealizationStatusEnum.DISABLED); - iiManager.updateII(ii); - } - } - } - - @After - public void after() throws Exception { - - for (String iiInstance : TEST_II_INSTANCES) { - IIInstance ii = iiManager.getII(iiInstance); - if (ii.getStatus() != RealizationStatusEnum.READY) { - ii.setStatus(RealizationStatusEnum.READY); - iiManager.updateII(ii); - } - } - } - - @Test - public void testBuildII() throws Exception { - - String[] testCase = new String[] { "buildIIInnerJoin", "buildIILeftJoin" }; - ExecutorService executorService = Executors.newFixedThreadPool(testCase.length); - final CountDownLatch countDownLatch = new CountDownLatch(testCase.length); - List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length); - for (int i = 0; i < testCase.length; i++) { - tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch))); - } - countDownLatch.await(); - for (int i = 0; i < tasks.size(); ++i) { - Future<List<String>> task = tasks.get(i); - final List<String> jobIds = task.get(); - for (String jobId : jobIds) { - assertJobSucceed(jobId); - } - } - - } - - private void assertJobSucceed(String jobId) { - if (jobService.getOutput(jobId).getState() != ExecutableState.SUCCEED) { - throw new RuntimeException("The job '" + jobId + "' is failed."); - } - } - - private class TestCallable implements Callable<List<String>> { - - private final String methodName; - private final CountDownLatch countDownLatch; - - public TestCallable(String methodName, CountDownLatch countDownLatch) { - this.methodName = methodName; - this.countDownLatch = countDownLatch; - } - - @SuppressWarnings("unchecked") - @Override - public List<String> call() throws Exception { - try { - final Method method = BuildIIWithEngine.class.getDeclaredMethod(methodName); - method.setAccessible(true); - return (List<String>) method.invoke(BuildIIWithEngine.this); - } finally { - countDownLatch.countDown(); - } - } - } - - protected List<String> buildIIInnerJoin() throws Exception { - return buildII(TEST_II_INSTANCES[0]); - } - - protected List<String> buildIILeftJoin() throws Exception { - return buildII(TEST_II_INSTANCES[1]); - } - - protected List<String> buildII(String iiName) throws Exception { - clearSegment(iiName); - - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - - long date1 = 0; - long date2 = f.parse("2015-01-01").getTime(); - - List<String> result = Lists.newArrayList(); - result.add(buildSegment(iiName, date1, date2)); - return result; - } - - private void clearSegment(String iiName) throws Exception { - IIInstance ii = iiManager.getII(iiName); - ii.getSegments().clear(); - iiManager.updateII(ii); - } - - private String buildSegment(String iiName, long startDate, long endDate) throws Exception { - IIInstance iiInstance = iiManager.getII(iiName); - IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate); - iiInstance.getSegments().add(segment); - iiManager.updateII(iiInstance); - - BatchIIJobBuilder batchIIJobBuilder = new BatchIIJobBuilder(segment, "SYSTEM"); - IIJob job = batchIIJobBuilder.build(); - jobService.addJob(job); - waitForJob(job.getId()); - return job.getId(); - } - - private int cleanupOldStorage() throws Exception { - String[] args = { "--delete", "true" }; - - int exitCode = ToolRunner.run(new StorageCleanupJob(), args); - return exitCode; - } - - public static void main(String[] args) throws Exception { - BuildIIWithEngine instance = new BuildIIWithEngine(); - - BuildIIWithEngine.beforeClass(); - instance.before(); - instance.testBuildII(); - instance.after(); - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java deleted file mode 100644 index 1a335c6..0000000 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * - * contributor license agreements. See the NOTICE file distributed with - * - * this work for additional information regarding copyright ownership. - * - * The ASF licenses this file to You under the Apache License, Version 2.0 - * - * (the "License"); you may not use this file except in compliance with - * - * the License. You may obtain a copy of the License at - * - * - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * - * - * Unless required by applicable law or agreed to in writing, software - * - * distributed under the License is distributed on an "AS IS" BASIS, - * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * - * See the License for the specific language governing permissions and - * - * limitations under the License. - * - * / - */ - -package org.apache.kylin.provision; - -import java.io.File; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.TimeZone; -import java.util.UUID; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.common.util.HBaseMetadataTestCase; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.StreamingBatch; -import org.apache.kylin.common.util.StreamingMessage; -import org.apache.kylin.engine.mr.JobBuilderSupport; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.index.Slice; -import org.apache.kylin.invertedindex.index.SliceBuilder; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; -import org.apache.kylin.invertedindex.model.IIKeyValueCodec; -import org.apache.kylin.invertedindex.model.IIRow; -import org.apache.kylin.job.DeployUtil; -import org.apache.kylin.job.JoinedFlatTable; -import org.apache.kylin.job.common.ShellExecutable; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.realization.RealizationStatusEnum; -import org.apache.kylin.source.hive.HiveCmdBuilder; -import org.apache.kylin.source.hive.HiveTableReader; -import org.apache.kylin.storage.hbase.HBaseConnection; -import org.apache.kylin.storage.hbase.ii.IICreateHTableJob; -import org.apache.kylin.storage.hbase.util.StorageCleanupJob; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -public class BuildIIWithStream { - - private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStream.class); - - private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" }; - private IIManager iiManager; - private KylinConfig kylinConfig; - - public static void main(String[] args) throws Exception { - try { - beforeClass(); - BuildIIWithStream buildCubeWithEngine = new BuildIIWithStream(); - buildCubeWithEngine.before(); - buildCubeWithEngine.build(); - logger.info("Build is done"); - afterClass(); - logger.info("Going to exit"); - System.exit(0); - } catch (Exception e) { - logger.error("error", e); - System.exit(1); - } - } - - public static void beforeClass() throws Exception { - logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { - throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); - } - HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); - } - - protected void deployEnv() throws Exception { - DeployUtil.overrideJobJarLocations(); - } - - public void before() throws Exception { - deployEnv(); - - kylinConfig = KylinConfig.getInstanceFromEnv(); - iiManager = IIManager.getInstance(kylinConfig); - for (String iiInstance : II_NAME) { - - IIInstance ii = iiManager.getII(iiInstance); - if (ii.getStatus() != RealizationStatusEnum.DISABLED) { - ii.setStatus(RealizationStatusEnum.DISABLED); - iiManager.updateII(ii); - } - } - } - - public static void afterClass() throws Exception { - cleanupOldStorage(); - HBaseMetadataTestCase.staticCleanupTestMetadata(); - } - - private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException { - IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc); - JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig); - final String uuid = UUID.randomUUID().toString(); - final String useDatabaseHql = "USE " + kylinConfig.getHiveDatabaseForIntermediateTable() + ";"; - final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc); - final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid)); - String insertDataHqls; - insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobEngineConfig); - - ShellExecutable step = new ShellExecutable(); - HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.addStatement(useDatabaseHql); - hiveCmdBuilder.addStatement(dropTableHql); - hiveCmdBuilder.addStatement(createTableHql); - hiveCmdBuilder.addStatement(insertDataHqls); - - step.setCmd(hiveCmdBuilder.build()); - logger.info(step.getCmd()); - step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); - kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null); - return intermediateTableDesc.getTableName(); - } - - private void clearSegment(String iiName) throws Exception { - IIInstance ii = iiManager.getII(iiName); - ii.getSegments().clear(); - iiManager.updateII(ii); - } - - private IISegment createSegment(String iiName) throws Exception { - clearSegment(iiName); - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - - long date1 = 0; - long date2 = f.parse("2015-01-01").getTime(); - return buildSegment(iiName, date1, date2); - } - - private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception { - IIInstance iiInstance = iiManager.getII(iiName); - IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate); - iiInstance.getSegments().add(segment); - iiManager.updateII(iiInstance); - return segment; - } - - private void buildII(String iiName) throws Exception { - final IIDesc desc = iiManager.getII(iiName).getDescriptor(); - final String tableName = createIntermediateTable(desc, kylinConfig); - logger.info("intermediate table name:" + tableName); - - HiveTableReader reader = new HiveTableReader("default", tableName); - final List<TblColRef> tblColRefs = desc.listAllColumns(); - for (TblColRef tblColRef : tblColRefs) { - if (desc.isMetricsCol(tblColRef)) { - logger.info("matrix:" + tblColRef.getName()); - } else { - logger.info("measure:" + tblColRef.getName()); - } - } - final IISegment segment = createSegment(iiName); - final HTableInterface htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier()); - String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() }; - ToolRunner.run(new IICreateHTableJob(), args); - - final IIDesc iiDesc = segment.getIIDesc(); - final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0); - - List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn()); - int count = sorted.size(); - ArrayList<StreamingMessage> messages = Lists.newArrayList(); - for (String[] row : sorted) { - messages.add((parse(row))); - if (messages.size() >= iiDesc.getSliceSize()) { - build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable); - messages.clear(); - } - } - - if (!messages.isEmpty()) { - build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable); - } - - reader.close(); - logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier()); - logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier()); - } - - public void build() throws Exception { - for (String iiName : II_NAME) { - buildII(iiName); - IIInstance ii = iiManager.getII(iiName); - if (ii.getStatus() != RealizationStatusEnum.READY) { - ii.setStatus(RealizationStatusEnum.READY); - iiManager.updateII(ii); - } - } - } - - private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) throws IOException { - final Slice slice = sliceBuilder.buildSlice(batch); - try { - loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo())); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException { - List<Put> data = Lists.newArrayList(); - for (IIRow row : codec.encodeKeyValue(slice)) { - final byte[] key = row.getKey().get(); - final byte[] value = row.getValue().get(); - Put put = new Put(key); - put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value); - final ImmutableBytesWritable dictionary = row.getDictionary(); - final byte[] dictBytes = dictionary.get(); - if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) { - put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes); - } else { - throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength()); - } - data.add(put); - } - hTable.put(data); - //omit hTable.flushCommits(), because htable is auto flush - } - - private StreamingMessage parse(String[] row) { - return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object> emptyMap()); - } - - private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException { - List<String[]> unsorted = Lists.newArrayList(); - while (reader.next()) { - unsorted.add(reader.getRow()); - } - Collections.sort(unsorted, new Comparator<String[]>() { - @Override - public int compare(String[] o1, String[] o2) { - long t1 = DateFormat.stringToMillis(o1[tsCol]); - long t2 = DateFormat.stringToMillis(o2[tsCol]); - return Long.compare(t1, t2); - } - }); - return unsorted; - } - - private static int cleanupOldStorage() throws Exception { - String[] args = { "--delete", "true" }; - - int exitCode = ToolRunner.run(new StorageCleanupJob(), args); - return exitCode; - } - -}