minor, remove build ii step

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/db5f89ba
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/db5f89ba
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/db5f89ba

Branch: refs/heads/master
Commit: db5f89bae53c9fa84c96c5c4693a142a136ef1fe
Parents: edfb37d
Author: Hongbin Ma <mahong...@apache.org>
Authored: Thu Jul 7 17:06:40 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Thu Jul 7 17:06:40 2016 +0800

----------------------------------------------------------------------
 kylin-it/pom.xml                                |  38 +--
 .../kylin/provision/BuildCubeWithStream.java    |  14 +-
 .../kylin/provision/BuildIIWithEngine.java      | 253 ----------------
 .../kylin/provision/BuildIIWithStream.java      | 300 -------------------
 4 files changed, 23 insertions(+), 582 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index eeb74fe..271bae8 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -18,7 +18,8 @@
 -->
 
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>kylin</artifactId>
         <groupId>org.apache.kylin</groupId>
@@ -30,8 +31,8 @@
     <name>Kylin:IT</name>
 
     <properties>
-        <hdp.version />
-        <fastBuildMode />
+        <hdp.version/>
+        <fastBuildMode/>
     </properties>
 
     <!-- Dependencies. -->
@@ -283,9 +284,13 @@
                         <executions>
                             <execution>
                                 <id>integration-tests</id>
-                                <phase>integration-test</phase>
                                 <goals>
                                     <goal>integration-test</goal>
+                                </goals>
+                            </execution>
+                            <execution>
+                                <id>verify</id>
+                                <goals>
                                     <goal>verify</goal>
                                 </goals>
                             </execution>
@@ -323,7 +328,7 @@
                                         
<argument>-Dhdp.version=${hdp.version}</argument>
                                         
<argument>-DfastBuildMode=${fastBuildMode}</argument>
                                         <argument>-classpath</argument>
-                                        <classpath />
+                                        <classpath/>
                                         
<argument>org.apache.kylin.provision.BuildCubeWithEngine</argument>
                                     </arguments>
                                     
<workingDirectory>${project.basedir}</workingDirectory>
@@ -344,33 +349,12 @@
                                         
<argument>-Dhdp.version=${hdp.version}</argument>
                                         
<argument>-DfastBuildMode=${fastBuildMode}</argument>
                                         <argument>-classpath</argument>
-                                        <classpath />
+                                        <classpath/>
                                         
<argument>org.apache.kylin.provision.BuildCubeWithStream</argument>
                                     </arguments>
                                     
<workingDirectory>${project.basedir}</workingDirectory>
                                 </configuration>
                             </execution>
-                            <execution>
-                                <id>build_ii_with_stream</id>
-                                <goals>
-                                    <goal>exec</goal>
-                                </goals>
-                                <phase>pre-integration-test</phase>
-                                <configuration>
-                                    <skip>${skipTests}</skip>
-                                    <classpathScope>test</classpathScope>
-                                    <executable>java</executable>
-                                    <arguments>
-                                        <argument>-DuseSandbox=true</argument>
-                                        
<argument>-Dhdp.version=${hdp.version}</argument>
-                                        
<argument>-DfastBuildMode=${fastBuildMode}</argument>
-                                        <argument>-classpath</argument>
-                                        <classpath />
-                                        
<argument>org.apache.kylin.provision.BuildIIWithStream</argument>
-                                    </arguments>
-                                    
<workingDirectory>${project.basedir}</workingDirectory>
-                                </configuration>
-                            </execution>
                         </executions>
                     </plugin>
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index c426ea4..1655a17 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.UUID;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
@@ -36,6 +37,7 @@ import org.apache.kylin.job.DeployUtil;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +63,7 @@ public class BuildCubeWithStream {
             buildCubeWithStream.before();
             buildCubeWithStream.build();
             logger.info("Build is done");
-            afterClass();
+            buildCubeWithStream.cleanup();
             logger.info("Going to exit");
             System.exit(0);
         } catch (Exception e) {
@@ -101,10 +103,18 @@ public class BuildCubeWithStream {
         DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, 
cubeName, streamingConfig);
     }
 
-    public static void afterClass() throws Exception {
+    public void cleanup() throws Exception {
+        cleanupOldStorage();
         HBaseMetadataTestCase.staticCleanupTestMetadata();
     }
 
+    private static int cleanupOldStorage() throws Exception {
+        String[] args = { "--delete", "true" };
+
+        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+        return exitCode;
+    }
+
     public void build() throws Exception {
         logger.info("start time:" + startTime + " end time:" + endTime + " 
batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / 
batchInterval));
         for (long start = startTime; start < endTime; start += batchInterval) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
deleted file mode 100644
index c947d9d..0000000
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.provision;
-
-import java.io.File;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.engine.mr.invertedindex.BatchIIJobBuilder;
-import org.apache.kylin.engine.mr.invertedindex.IIJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-//TODO: convert it to a normal class rather than a test case, like in 
BuildCubeWithEngine 
-@Ignore
-public class BuildIIWithEngine {
-
-    private JobEngineConfig jobEngineConfig;
-    private IIManager iiManager;
-
-    private DefaultScheduler scheduler;
-    protected ExecutableManager jobService;
-
-    protected static final String[] TEST_II_INSTANCES = new String[] { 
"test_kylin_ii_inner_join", "test_kylin_ii_left_join" };
-
-    private static final Log logger = 
LogFactory.getLog(BuildIIWithEngine.class);
-
-    protected void waitForJob(String jobId) {
-        while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() 
== ExecutableState.ERROR) {
-                break;
-            } else {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, 
HBaseMetadataTestCase.SANDBOX_TEST_DATA);
-        if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
-            throw new RuntimeException("No hdp.version set; Please set 
hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
-        }
-    }
-
-    @Before
-    public void before() throws Exception {
-        
HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
-
-        //DeployUtil.initCliWorkDir();
-        //        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.createInstance();
-        scheduler.init(new JobEngineConfig(kylinConfig), new 
ZookeeperJobLock());
-        if (!scheduler.hasStarted()) {
-            throw new RuntimeException("scheduler has not been started");
-        }
-        jobEngineConfig = new JobEngineConfig(kylinConfig);
-        for (String jobId : jobService.getAllJobIds()) {
-            if (jobService.getJob(jobId) instanceof IIJob) {
-                jobService.deleteJob(jobId);
-            }
-        }
-
-        iiManager = IIManager.getInstance(kylinConfig);
-        for (String iiInstance : TEST_II_INSTANCES) {
-
-            IIInstance ii = iiManager.getII(iiInstance);
-            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
-                ii.setStatus(RealizationStatusEnum.DISABLED);
-                iiManager.updateII(ii);
-            }
-        }
-    }
-
-    @After
-    public void after() throws Exception {
-
-        for (String iiInstance : TEST_II_INSTANCES) {
-            IIInstance ii = iiManager.getII(iiInstance);
-            if (ii.getStatus() != RealizationStatusEnum.READY) {
-                ii.setStatus(RealizationStatusEnum.READY);
-                iiManager.updateII(ii);
-            }
-        }
-    }
-
-    @Test
-    public void testBuildII() throws Exception {
-
-        String[] testCase = new String[] { "buildIIInnerJoin", 
"buildIILeftJoin" };
-        ExecutorService executorService = 
Executors.newFixedThreadPool(testCase.length);
-        final CountDownLatch countDownLatch = new 
CountDownLatch(testCase.length);
-        List<Future<List<String>>> tasks = 
Lists.newArrayListWithExpectedSize(testCase.length);
-        for (int i = 0; i < testCase.length; i++) {
-            tasks.add(executorService.submit(new TestCallable(testCase[i], 
countDownLatch)));
-        }
-        countDownLatch.await();
-        for (int i = 0; i < tasks.size(); ++i) {
-            Future<List<String>> task = tasks.get(i);
-            final List<String> jobIds = task.get();
-            for (String jobId : jobIds) {
-                assertJobSucceed(jobId);
-            }
-        }
-
-    }
-
-    private void assertJobSucceed(String jobId) {
-        if (jobService.getOutput(jobId).getState() != ExecutableState.SUCCEED) 
{
-            throw new RuntimeException("The job '" + jobId + "' is failed.");
-        }
-    }
-
-    private class TestCallable implements Callable<List<String>> {
-
-        private final String methodName;
-        private final CountDownLatch countDownLatch;
-
-        public TestCallable(String methodName, CountDownLatch countDownLatch) {
-            this.methodName = methodName;
-            this.countDownLatch = countDownLatch;
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public List<String> call() throws Exception {
-            try {
-                final Method method = 
BuildIIWithEngine.class.getDeclaredMethod(methodName);
-                method.setAccessible(true);
-                return (List<String>) method.invoke(BuildIIWithEngine.this);
-            } finally {
-                countDownLatch.countDown();
-            }
-        }
-    }
-
-    protected List<String> buildIIInnerJoin() throws Exception {
-        return buildII(TEST_II_INSTANCES[0]);
-    }
-
-    protected List<String> buildIILeftJoin() throws Exception {
-        return buildII(TEST_II_INSTANCES[1]);
-    }
-
-    protected List<String> buildII(String iiName) throws Exception {
-        clearSegment(iiName);
-
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-        long date1 = 0;
-        long date2 = f.parse("2015-01-01").getTime();
-
-        List<String> result = Lists.newArrayList();
-        result.add(buildSegment(iiName, date1, date2));
-        return result;
-    }
-
-    private void clearSegment(String iiName) throws Exception {
-        IIInstance ii = iiManager.getII(iiName);
-        ii.getSegments().clear();
-        iiManager.updateII(ii);
-    }
-
-    private String buildSegment(String iiName, long startDate, long endDate) 
throws Exception {
-        IIInstance iiInstance = iiManager.getII(iiName);
-        IISegment segment = iiManager.buildSegment(iiInstance, startDate, 
endDate);
-        iiInstance.getSegments().add(segment);
-        iiManager.updateII(iiInstance);
-
-        BatchIIJobBuilder batchIIJobBuilder = new BatchIIJobBuilder(segment, 
"SYSTEM");
-        IIJob job = batchIIJobBuilder.build();
-        jobService.addJob(job);
-        waitForJob(job.getId());
-        return job.getId();
-    }
-
-    private int cleanupOldStorage() throws Exception {
-        String[] args = { "--delete", "true" };
-
-        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
-        return exitCode;
-    }
-
-    public static void main(String[] args) throws Exception {
-        BuildIIWithEngine instance = new BuildIIWithEngine();
-
-        BuildIIWithEngine.beforeClass();
-        instance.before();
-        instance.testBuildII();
-        instance.after();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/db5f89ba/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
deleted file mode 100644
index 1a335c6..0000000
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.provision;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.UUID;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.SliceBuilder;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.common.ShellExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.source.hive.HiveCmdBuilder;
-import org.apache.kylin.source.hive.HiveTableReader;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
-import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-public class BuildIIWithStream {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(BuildIIWithStream.class);
-
-    private static final String[] II_NAME = new String[] { 
"test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
-    private IIManager iiManager;
-    private KylinConfig kylinConfig;
-
-    public static void main(String[] args) throws Exception {
-        try {
-            beforeClass();
-            BuildIIWithStream buildCubeWithEngine = new BuildIIWithStream();
-            buildCubeWithEngine.before();
-            buildCubeWithEngine.build();
-            logger.info("Build is done");
-            afterClass();
-            logger.info("Going to exit");
-            System.exit(0);
-        } catch (Exception e) {
-            logger.error("error", e);
-            System.exit(1);
-        }
-    }
-
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
-            throw new RuntimeException("No hdp.version set; Please set 
hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
-        }
-        
HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
-    }
-
-    protected void deployEnv() throws Exception {
-        DeployUtil.overrideJobJarLocations();
-    }
-
-    public void before() throws Exception {
-        deployEnv();
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        iiManager = IIManager.getInstance(kylinConfig);
-        for (String iiInstance : II_NAME) {
-
-            IIInstance ii = iiManager.getII(iiInstance);
-            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
-                ii.setStatus(RealizationStatusEnum.DISABLED);
-                iiManager.updateII(ii);
-            }
-        }
-    }
-
-    public static void afterClass() throws Exception {
-        cleanupOldStorage();
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
-    }
-
-    private String createIntermediateTable(IIDesc desc, KylinConfig 
kylinConfig) throws IOException {
-        IIJoinedFlatTableDesc intermediateTableDesc = new 
IIJoinedFlatTableDesc(desc);
-        JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
-        final String uuid = UUID.randomUUID().toString();
-        final String useDatabaseHql = "USE " + 
kylinConfig.getHiveDatabaseForIntermediateTable() + ";";
-        final String dropTableHql = 
JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
-        final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, 
JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid));
-        String insertDataHqls;
-        insertDataHqls = 
JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, 
jobEngineConfig);
-
-        ShellExecutable step = new ShellExecutable();
-        HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-        hiveCmdBuilder.addStatement(useDatabaseHql);
-        hiveCmdBuilder.addStatement(dropTableHql);
-        hiveCmdBuilder.addStatement(createTableHql);
-        hiveCmdBuilder.addStatement(insertDataHqls);
-
-        step.setCmd(hiveCmdBuilder.build());
-        logger.info(step.getCmd());
-        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-        kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
-        return intermediateTableDesc.getTableName();
-    }
-
-    private void clearSegment(String iiName) throws Exception {
-        IIInstance ii = iiManager.getII(iiName);
-        ii.getSegments().clear();
-        iiManager.updateII(ii);
-    }
-
-    private IISegment createSegment(String iiName) throws Exception {
-        clearSegment(iiName);
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-        long date1 = 0;
-        long date2 = f.parse("2015-01-01").getTime();
-        return buildSegment(iiName, date1, date2);
-    }
-
-    private IISegment buildSegment(String iiName, long startDate, long 
endDate) throws Exception {
-        IIInstance iiInstance = iiManager.getII(iiName);
-        IISegment segment = iiManager.buildSegment(iiInstance, startDate, 
endDate);
-        iiInstance.getSegments().add(segment);
-        iiManager.updateII(iiInstance);
-        return segment;
-    }
-
-    private void buildII(String iiName) throws Exception {
-        final IIDesc desc = iiManager.getII(iiName).getDescriptor();
-        final String tableName = createIntermediateTable(desc, kylinConfig);
-        logger.info("intermediate table name:" + tableName);
-
-        HiveTableReader reader = new HiveTableReader("default", tableName);
-        final List<TblColRef> tblColRefs = desc.listAllColumns();
-        for (TblColRef tblColRef : tblColRefs) {
-            if (desc.isMetricsCol(tblColRef)) {
-                logger.info("matrix:" + tblColRef.getName());
-            } else {
-                logger.info("measure:" + tblColRef.getName());
-            }
-        }
-        final IISegment segment = createSegment(iiName);
-        final HTableInterface htable = 
HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier());
-        String[] args = new String[] { "-iiname", iiName, "-htablename", 
segment.getStorageLocationIdentifier() };
-        ToolRunner.run(new IICreateHTableJob(), args);
-
-        final IIDesc iiDesc = segment.getIIDesc();
-        final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0);
-
-        List<String[]> sorted = getSortedRows(reader, 
desc.getTimestampColumn());
-        int count = sorted.size();
-        ArrayList<StreamingMessage> messages = Lists.newArrayList();
-        for (String[] row : sorted) {
-            messages.add((parse(row)));
-            if (messages.size() >= iiDesc.getSliceSize()) {
-                build(sliceBuilder, new StreamingBatch(messages, 
Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
-                messages.clear();
-            }
-        }
-
-        if (!messages.isEmpty()) {
-            build(sliceBuilder, new StreamingBatch(messages, 
Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
-        }
-
-        reader.close();
-        logger.info("total record count:" + count + " htable:" + 
segment.getStorageLocationIdentifier());
-        logger.info("stream build finished, htable name:" + 
segment.getStorageLocationIdentifier());
-    }
-
-    public void build() throws Exception {
-        for (String iiName : II_NAME) {
-            buildII(iiName);
-            IIInstance ii = iiManager.getII(iiName);
-            if (ii.getStatus() != RealizationStatusEnum.READY) {
-                ii.setStatus(RealizationStatusEnum.READY);
-                iiManager.updateII(ii);
-            }
-        }
-    }
-
-    private void build(SliceBuilder sliceBuilder, StreamingBatch batch, 
HTableInterface htable) throws IOException {
-        final Slice slice = sliceBuilder.buildSlice(batch);
-        try {
-            loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo()));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void loadToHBase(HTableInterface hTable, Slice slice, 
IIKeyValueCodec codec) throws IOException {
-        List<Put> data = Lists.newArrayList();
-        for (IIRow row : codec.encodeKeyValue(slice)) {
-            final byte[] key = row.getKey().get();
-            final byte[] value = row.getValue().get();
-            Put put = new Put(key);
-            put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, 
value);
-            final ImmutableBytesWritable dictionary = row.getDictionary();
-            final byte[] dictBytes = dictionary.get();
-            if (dictionary.getOffset() == 0 && dictionary.getLength() == 
dictBytes.length) {
-                put.add(IIDesc.HBASE_FAMILY_BYTES, 
IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
-            } else {
-                throw new RuntimeException("dict offset should be 0, and dict 
length should be " + dictBytes.length + " but they are" + 
dictionary.getOffset() + " " + dictionary.getLength());
-            }
-            data.add(put);
-        }
-        hTable.put(data);
-        //omit hTable.flushCommits(), because htable is auto flush
-    }
-
-    private StreamingMessage parse(String[] row) {
-        return new StreamingMessage(Lists.newArrayList(row), 
System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, 
Object> emptyMap());
-    }
-
-    private List<String[]> getSortedRows(HiveTableReader reader, final int 
tsCol) throws IOException {
-        List<String[]> unsorted = Lists.newArrayList();
-        while (reader.next()) {
-            unsorted.add(reader.getRow());
-        }
-        Collections.sort(unsorted, new Comparator<String[]>() {
-            @Override
-            public int compare(String[] o1, String[] o2) {
-                long t1 = DateFormat.stringToMillis(o1[tsCol]);
-                long t2 = DateFormat.stringToMillis(o2[tsCol]);
-                return Long.compare(t1, t2);
-            }
-        });
-        return unsorted;
-    }
-
-    private static int cleanupOldStorage() throws Exception {
-        String[] args = { "--delete", "true" };
-
-        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
-        return exitCode;
-    }
-
-}

Reply via email to