http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git 
a/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
 
b/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
deleted file mode 100644
index 57c0be3..0000000
--- 
a/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import java.io.IOException;
-
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * This test case need the hive runtime; Please run it with sandbox;
- * @author shaoshi
- *
- * It is in the exclude list of default profile in pom.xml
- */
-public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
-
-    @Test
-    public void test() throws IOException {
-        HiveTableReader reader = new HiveTableReader("default", 
"test_kylin_fact");
-        int rowNumber = 0;
-        while (reader.next()) {
-            String[] row = reader.getRow();
-            Assert.assertEquals(9, row.length);
-            //System.out.println(ArrayUtils.toString(row));
-            rowNumber++;
-        }
-
-        reader.close();
-        Assert.assertEquals(10000, rowNumber);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
 
b/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
deleted file mode 100644
index 0df632a..0000000
--- 
a/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableReader;
-import org.apache.kylin.source.SourceFactory;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author yangli9
- * 
- */
-public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
-
-    SnapshotManager snapshotMgr;
-
-    @Before
-    public void setup() throws Exception {
-        createTestMetadata();
-        snapshotMgr = SnapshotManager.getInstance(getTestConfig());
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-
-    @Test
-    public void basicTest() throws Exception {
-        String tableName = "EDW.TEST_SITES";
-        TableDesc tableDesc = 
MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
-        ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
-        String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, 
tableDesc).getResourcePath();
-
-        snapshotMgr.wipeoutCache();
-
-        SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath);
-
-        // compare hive & snapshot
-        TableReader hiveReader = hiveTable.getReader();
-        TableReader snapshotReader = snapshot.getReader();
-
-        while (true) {
-            boolean hiveNext = hiveReader.next();
-            boolean snapshotNext = snapshotReader.next();
-            assertEquals(hiveNext, snapshotNext);
-
-            if (hiveNext == false)
-                break;
-
-            String[] hiveRow = hiveReader.getRow();
-            String[] snapshotRow = snapshotReader.getRow();
-            assertArrayEquals(hiveRow, snapshotRow);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java 
b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index ae96e67..b481557 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -79,6 +79,8 @@ public class BasicTest {
 
     @Test
     public void testxx() throws InterruptedException {
+        System.out.println(System.getProperty("skipTests").length());
+        System.out.println(System.getProperty("skipXests"));
         byte[][] data = new byte[10000000][];
         byte[] temp = new byte[100];
         for (int i = 0; i < 100; i++) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
 
b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
new file mode 100644
index 0000000..9d78957
--- /dev/null
+++ 
b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.File;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+
+/**
+ * @author ysong1
+ */
+public class HBaseMetadataTestCase extends AbstractKylinTestCase {
+
+    static {
+        try {
+            ClassUtil.addClasspath(new 
File("../examples/test_case_data/sandbox/").getAbsolutePath());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void createTestMetadata() throws Exception {
+        staticCreateTestMetadata();
+    }
+
+    @Override
+    public void cleanupTestMetadata() {
+        staticCleanupTestMetadata();
+    }
+
+    public static void staticCreateTestMetadata() throws Exception {
+        staticCreateTestMetadata(SANDBOX_TEST_DATA);
+    }
+
+    public static void staticCreateTestMetadata(String kylinConfigFolder) {
+
+        KylinConfig.destoryInstance();
+
+        if (System.getProperty(KylinConfig.KYLIN_CONF) == null && 
System.getenv(KylinConfig.KYLIN_CONF) == null)
+            System.setProperty(KylinConfig.KYLIN_CONF, kylinConfigFolder);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
new file mode 100644
index 0000000..cc3e9b2
--- /dev/null
+++ b/kylin-it/pom.xml
@@ -0,0 +1,367 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>2.0-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-it</artifactId>
+    <name>Kylin:IT</name>
+
+
+    <!-- Dependencies. -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>atopcalcite</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-storage</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-source-hive</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-jdbc</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-storage-hbase</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-query</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-invertedindex</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-linq4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+        </dependency>
+
+        <!-- Env & Test -->
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-storage</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-storage-hbase</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-storage-hbase</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-assembly</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>xerces</groupId>
+            <artifactId>xercesImpl</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>xalan</groupId>
+            <artifactId>xalan</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.dbunit</groupId>
+            <artifactId>dbunit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>provided</scope>
+            <!-- protobuf version conflict with hbase -->
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <scope>provided</scope>
+            <!-- version conflict with hadoop2.2 -->
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-testing-util</artifactId>
+            <version>${hbase-hadoop2.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>jsp-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+
+    <profiles>
+        <profile>
+            <id>sandbox</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <version>2.16</version>
+                        <executions>
+                            <execution>
+                                <id>integration-tests</id>
+                                <phase>integration-test</phase>
+                                <goals>
+                                    <goal>integration-test</goal>
+                                    <goal>verify</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <excludes>
+                                <exclude>**/*$*</exclude>
+                            </excludes>
+                            <systemProperties>
+                                <property>
+                                    <name>useSandbox</name>
+                                    <value>true</value>
+                                </property>
+                            </systemProperties>
+                        </configuration>
+                    </plugin>
+
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>exec-maven-plugin</artifactId>
+                        <version>1.4.0</version>
+                        <executions>
+                            <execution>
+                                <id>build_cube_with_engine</id>
+                                <goals>
+                                    <goal>java</goal>
+                                </goals>
+                                <phase>pre-integration-test</phase>
+                                <configuration>
+                                    <skip>${skipTests}</skip>
+                                    <classpathScope>test</classpathScope>
+                                    
<mainClass>org.apache.kylin.provision.BuildCubeWithEngine</mainClass>
+                                </configuration>
+                            </execution>
+                            <execution>
+                                <id>build_cube_with_stream</id>
+                                <goals>
+                                    <goal>java</goal>
+                                </goals>
+                                <phase>pre-integration-test</phase>
+                                <configuration>
+                                    <skip>${skipTests}</skip>
+                                    <classpathScope>test</classpathScope>
+                                    
<mainClass>org.apache.kylin.provision.BuildCubeWithStream</mainClass>
+                                </configuration>
+                            </execution>
+                            <execution>
+                                <id>build_ii_with_stream</id>
+                                <goals>
+                                    <goal>java</goal>
+                                </goals>
+                                <phase>pre-integration-test</phase>
+                                <configuration>
+                                    <skip>${skipTests}</skip>
+                                    <classpathScope>test</classpathScope>
+                                    
<mainClass>org.apache.kylin.provision.BuildIIWithStream</mainClass>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <!-- This profile adds/overrides few features of the 
'apache-release'
+                 profile in the parent pom. -->
+            <id>apache-release</id>
+            <build>
+                <plugins>
+                    <!-- Apache-RAT checks for files without headers.
+                         If run on a messy developer's sandbox, it will fail.
+                         This serves as a reminder to only build a release in 
a clean
+                         sandbox! -->
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <configuration>
+                            <numUnapprovedLicenses>0</numUnapprovedLicenses>
+                            <excludes>
+                                <!-- test data -->
+                                <exclude>src/test/**/*.dat</exclude>
+                                <exclude>src/test/**/*.expected</exclude>
+                            </excludes>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <phase>verify</phase>
+                                <goals>
+                                    <goal>check</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <dependencies>
+                            <dependency>
+                                <groupId>org.apache.maven.doxia</groupId>
+                                <artifactId>doxia-core</artifactId>
+                                <version>1.6</version>
+                                <exclusions>
+                                    <exclusion>
+                                        <groupId>xerces</groupId>
+                                        <artifactId>xercesImpl</artifactId>
+                                    </exclusion>
+                                </exclusions>
+                            </dependency>
+                        </dependencies>
+                    </plugin>
+                    <plugin>
+                        <groupId>net.ju-n.maven.plugins</groupId>
+                        <artifactId>checksum-maven-plugin</artifactId>
+                        <version>1.2</version>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>artifacts</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <algorithms>
+                                <algorithm>MD5</algorithm>
+                                <algorithm>SHA-1</algorithm>
+                            </algorithms>
+                            <failOnError>false</failOnError>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java 
b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
new file mode 100644
index 0000000..9b67248
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.jdbc;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.webapp.WebAppContext;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class ITJDBCDriverTest extends HBaseMetadataTestCase {
+
+    private static Server server = null;
+    private static SystemPropertiesOverride sysPropsOverride = new 
SystemPropertiesOverride();
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        sysPropsOverride.override("spring.profiles.active", "testing");
+        sysPropsOverride.override("catalina.home", "."); // 
resources/log4j.properties ref ${catalina.home}
+        staticCreateTestMetadata();
+        startJetty();
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        stopJetty();
+        staticCleanupTestMetadata();
+        sysPropsOverride.restore();
+    }
+
+    protected static void stopJetty() throws Exception {
+        if (server != null)
+            server.stop();
+
+        File workFolder = new File("work");
+        if (workFolder.isDirectory() && workFolder.exists()) {
+            FileUtils.deleteDirectory(workFolder);
+        }
+    }
+
+    protected static void startJetty() throws Exception {
+
+        server = new Server(7070);
+
+        WebAppContext context = new WebAppContext();
+        context.setDescriptor("./src/main/webapp/WEB-INF/web.xml");
+        context.setResourceBase("./src/main/webapp");
+        context.setContextPath("/kylin");
+        context.setParentLoaderPriority(true);
+
+        server.setHandler(context);
+
+        server.start();
+
+    }
+
+    protected Connection getConnection() throws Exception {
+
+        Driver driver = (Driver) 
Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
+        Properties info = new Properties();
+        info.put("user", "ADMIN");
+        info.put("password", "KYLIN");
+        Connection conn = 
driver.connect("jdbc:kylin://localhost:7070/default", info);
+
+        return conn;
+    }
+
+    @Test
+    public void testMetadata1() throws Exception {
+
+        // check the JDBC API here: 
http://docs.oracle.com/javase/7/docs/api/java/sql/DatabaseMetaData.html
+        Connection conn = getConnection();
+
+        // test getSchemas();
+        List<String> schemaList = Lists.newArrayList();
+        DatabaseMetaData dbMetadata = conn.getMetaData();
+        ResultSet resultSet = dbMetadata.getSchemas();
+        while (resultSet.next()) {
+            String schema = resultSet.getString("TABLE_SCHEM");
+            String catalog = resultSet.getString("TABLE_CATALOG");
+
+            System.out.println("Get schema: schema=" + schema + ", catalog=" + 
catalog);
+            schemaList.add(schema);
+
+        }
+
+        resultSet.close();
+        Assert.assertTrue(schemaList.contains("DEFAULT"));
+        Assert.assertTrue(schemaList.contains("EDW"));
+
+        // test getCatalogs();
+        resultSet = dbMetadata.getCatalogs();
+
+        List<String> catalogList = Lists.newArrayList();
+        while (resultSet.next()) {
+            String catalog = resultSet.getString("TABLE_CAT");
+
+            System.out.println("Get catalog: catalog=" + catalog);
+            catalogList.add(catalog);
+
+        }
+        Assert.assertTrue(catalogList.size() > 0 && 
catalogList.contains("defaultCatalog"));
+
+        /** //Disable the test on getTableTypes() as it is not ready
+         resultSet = dbMetadata.getTableTypes();
+
+         List<String> tableTypes = Lists.newArrayList();
+         while (resultSet.next()) {
+         String type = resultSet.getString("TABLE_TYPE");
+
+         System.out.println("Get table type: type=" + type);
+         tableTypes.add(type);
+
+         }
+
+         Assert.assertTrue(tableTypes.size() > 0 && 
tableTypes.contains("TABLE"));
+         resultSet.close();
+
+         **/
+        conn.close();
+    }
+
+    @Test
+    public void testMetadata2() throws Exception {
+        Connection conn = getConnection();
+
+        List<String> tableList = Lists.newArrayList();
+        DatabaseMetaData dbMetadata = conn.getMetaData();
+        ResultSet resultSet = dbMetadata.getTables(null, "%", "%", new 
String[] { "TABLE" });
+        while (resultSet.next()) {
+            String schema = resultSet.getString("TABLE_SCHEM");
+            String name = resultSet.getString("TABLE_NAME");
+
+            System.out.println("Get table: schema=" + schema + ", name=" + 
name);
+            tableList.add(schema + "." + name);
+
+        }
+
+        resultSet.close();
+        Assert.assertTrue(tableList.contains("DEFAULT.TEST_KYLIN_FACT"));
+
+        resultSet = dbMetadata.getColumns(null, "%", "TEST_KYLIN_FACT", "%");
+
+        List<String> columns = Lists.newArrayList();
+        while (resultSet.next()) {
+            String name = resultSet.getString("COLUMN_NAME");
+            String type = resultSet.getString("TYPE_NAME");
+
+            System.out.println("Get column: name=" + name + ", data_type=" + 
type);
+            columns.add(name);
+
+        }
+
+        Assert.assertTrue(columns.size() > 0 && columns.contains("CAL_DT"));
+        resultSet.close();
+        conn.close();
+    }
+
+    @Test
+    public void testSimpleStatement() throws Exception {
+        Connection conn = getConnection();
+        Statement statement = conn.createStatement();
+
+        statement.execute("select count(*) from test_kylin_fact");
+
+        ResultSet rs = statement.getResultSet();
+
+        Assert.assertTrue(rs.next());
+        int result = rs.getInt(1);
+
+        Assert.assertTrue(result > 0);
+
+        rs.close();
+        statement.close();
+        conn.close();
+
+    }
+
+    @Test
+    public void testPreparedStatement() throws Exception {
+        Connection conn = getConnection();
+
+        PreparedStatement statement = conn.prepareStatement("select 
LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact 
" + "where LSTG_FORMAT_NAME = ? group by LSTG_FORMAT_NAME");
+
+        statement.setString(1, "FP-GTC");
+
+        ResultSet rs = statement.executeQuery();
+
+        Assert.assertTrue(rs.next());
+
+        String format_name = rs.getString(1);
+
+        Assert.assertTrue("FP-GTC".equals(format_name));
+
+        rs.close();
+        statement.close();
+        conn.close();
+
+    }
+
+    @Test
+    public void testResultSet() throws Exception {
+        String sql = "select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as 
TRANS_CNT from test_kylin_fact \n" + " group by LSTG_FORMAT_NAME ";
+
+        Connection conn = getConnection();
+        Statement statement = conn.createStatement();
+
+        statement.execute(sql);
+
+        ResultSet rs = statement.getResultSet();
+
+        int count = 0;
+        while (rs.next()) {
+            count++;
+            String lstg = rs.getString(1);
+            double gmv = rs.getDouble(2);
+            int trans_count = rs.getInt(3);
+
+            System.out.println("Get a line: LSTG_FORMAT_NAME=" + lstg + ", 
GMV=" + gmv + ", TRANS_CNT=" + trans_count);
+        }
+
+        Assert.assertTrue(count > 0);
+        statement.close();
+        rs.close();
+        conn.close();
+
+    }
+
+    private static class SystemPropertiesOverride {
+        HashMap<String, String> backup = new HashMap<String, String>();
+
+        public void override(String key, String value) {
+            backup.put(key, System.getProperty(key));
+            System.setProperty(key, value);
+        }
+
+        public void restore() {
+            for (String key : backup.keySet()) {
+                String value = backup.get(key);
+                if (value == null)
+                    System.clearProperty(key);
+                else
+                    System.setProperty(key, value);
+            }
+            backup.clear();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
new file mode 100644
index 0000000..c40b470
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.provision;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
+import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+
+import com.google.common.collect.Lists;
+
+public class BuildCubeWithEngine {
+
+    private CubeManager cubeManager;
+    private DefaultScheduler scheduler;
+    protected ExecutableManager jobService;
+    private static boolean fastBuildMode = false;
+
+    private static final Log logger = 
LogFactory.getLog(BuildCubeWithEngine.class);
+
+    public static void main(String[] args) throws Exception {
+        beforeClass();
+        BuildCubeWithEngine buildCubeWithEngine = new BuildCubeWithEngine();
+        buildCubeWithEngine.before();
+        buildCubeWithEngine.build();
+        afterClass();
+    }
+
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+
+        String fastModeStr = System.getProperty("fastBuildMode");
+        if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) {
+            fastBuildMode = true;
+            logger.info("Will use fast build mode");
+        } else {
+            logger.info("Will not use fast build mode");
+        }
+
+        System.setProperty(KylinConfig.KYLIN_CONF, 
"../examples/test_case_data/sandbox");
+        if (System.getProperty("hdp.version") == null) {
+            throw new RuntimeException("No hdp.version set; Please set 
hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+        }
+
+        
HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+    }
+
+    public void before() throws Exception {
+
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        jobService = ExecutableManager.getInstance(kylinConfig);
+        scheduler = DefaultScheduler.getInstance();
+        scheduler.init(new JobEngineConfig(kylinConfig), new 
ZookeeperJobLock());
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        cubeManager = CubeManager.getInstance(kylinConfig);
+        for (String jobId : jobService.getAllJobIds()) {
+            if (jobService.getJob(jobId) instanceof CubingJob) {
+                jobService.deleteJob(jobId);
+            }
+        }
+
+    }
+
+    public static void afterClass() {
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    public void build() throws Exception {
+        
DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty");
+        testInner();
+        testLeft();
+    }
+
+    protected void waitForJob(String jobId) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() 
== ExecutableState.ERROR) {
+                break;
+            } else {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void testInner() throws Exception {
+        String[] testCase = new String[] { "testInnerJoinCubeWithoutSlr", 
"testInnerJoinCubeWithSlr" };
+        runTestAndAssertSucceed(testCase);
+    }
+
+    private void testLeft() throws Exception {
+        String[] testCase = new String[] { "testLeftJoinCubeWithSlr", 
"testLeftJoinCubeWithoutSlr" };
+        runTestAndAssertSucceed(testCase);
+    }
+
+    private void runTestAndAssertSucceed(String[] testCase) throws Exception {
+        ExecutorService executorService = 
Executors.newFixedThreadPool(testCase.length);
+        final CountDownLatch countDownLatch = new 
CountDownLatch(testCase.length);
+        List<Future<List<String>>> tasks = 
Lists.newArrayListWithExpectedSize(testCase.length);
+        for (int i = 0; i < testCase.length; i++) {
+            tasks.add(executorService.submit(new TestCallable(testCase[i], 
countDownLatch)));
+        }
+        countDownLatch.await();
+        try {
+            for (int i = 0; i < tasks.size(); ++i) {
+                Future<List<String>> task = tasks.get(i);
+                final List<String> jobIds = task.get();
+                for (String jobId : jobIds) {
+                    assertJobSucceed(jobId);
+                }
+            }
+        } catch (Exception ex) {
+            logger.error(ex);
+            throw ex;
+        }
+    }
+
+    private void assertJobSucceed(String jobId) {
+        assertEquals("The job '" + jobId + "' is failed.", 
ExecutableState.SUCCEED, jobService.getOutput(jobId).getState());
+    }
+
+    private class TestCallable implements Callable<List<String>> {
+
+        private final String methodName;
+        private final CountDownLatch countDownLatch;
+
+        public TestCallable(String methodName, CountDownLatch countDownLatch) {
+            this.methodName = methodName;
+            this.countDownLatch = countDownLatch;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public List<String> call() throws Exception {
+            try {
+                final Method method = 
BuildCubeWithEngine.class.getDeclaredMethod(methodName);
+                method.setAccessible(true);
+                return (List<String>) method.invoke(BuildCubeWithEngine.this);
+            } catch (Exception e) {
+                logger.error(e.getMessage());
+                throw e;
+            } finally {
+                countDownLatch.countDown();
+            }
+        }
+    }
+
+    @SuppressWarnings("unused")
+    // called by reflection
+    private List<String> testInnerJoinCubeWithSlr() throws Exception {
+        clearSegment("test_kylin_cube_with_slr_empty");
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        long date1 = 0;
+        long date2 = f.parse("2015-01-01").getTime();
+        long date3 = f.parse("2022-01-01").getTime();
+        List<String> result = Lists.newArrayList();
+
+        if (fastBuildMode) {
+            result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, 
date3));
+        } else {
+            result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, 
date2));
+            result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, 
date3));//empty segment
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unused")
+    // called by reflection
+    private List<String> testInnerJoinCubeWithoutSlr() throws Exception {
+
+        clearSegment("test_kylin_cube_without_slr_empty");
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        long date1 = 0;
+        long date2 = f.parse("2013-01-01").getTime();
+        long date3 = f.parse("2013-07-01").getTime();
+        long date4 = f.parse("2022-01-01").getTime();
+        List<String> result = Lists.newArrayList();
+
+        if (fastBuildMode) {
+            result.add(buildSegment("test_kylin_cube_without_slr_empty", 
date1, date4));
+        } else {
+            result.add(buildSegment("test_kylin_cube_without_slr_empty", 
date1, date2));
+            result.add(buildSegment("test_kylin_cube_without_slr_empty", 
date2, date3));
+            result.add(buildSegment("test_kylin_cube_without_slr_empty", 
date3, date4));
+            result.add(mergeSegment("test_kylin_cube_without_slr_empty", 
date1, date3));//don't merge all segments
+        }
+        return result;
+
+    }
+
+    @SuppressWarnings("unused")
+    // called by reflection
+    private List<String> testLeftJoinCubeWithoutSlr() throws Exception {
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        List<String> result = Lists.newArrayList();
+        final String cubeName = "test_kylin_cube_without_slr_left_join_empty";
+        clearSegment(cubeName);
+
+        long date1 = 
cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart();
+        long date2 = f.parse("2012-06-01").getTime();
+        long date3 = f.parse("2022-01-01").getTime();
+        long date4 = f.parse("2023-01-01").getTime();
+
+        if (fastBuildMode) {
+            
result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date1, 
date4));
+        } else {
+            
result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date1, 
date2));
+            
result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date2, 
date3));
+            
result.add(buildSegment("test_kylin_cube_without_slr_left_join_empty", date3, 
date4));//empty segment
+            
result.add(mergeSegment("test_kylin_cube_without_slr_left_join_empty", date1, 
date3));//don't merge all segments
+        }
+
+        return result;
+
+    }
+
+    @SuppressWarnings("unused")
+    // called by reflection
+    private List<String> testLeftJoinCubeWithSlr() throws Exception {
+        String cubeName = "test_kylin_cube_with_slr_left_join_empty";
+        clearSegment(cubeName);
+
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        long date1 = 
cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart();
+        long date2 = f.parse("2013-01-01").getTime();
+        long date3 = f.parse("2013-07-01").getTime();
+        long date4 = f.parse("2022-01-01").getTime();
+
+        List<String> result = Lists.newArrayList();
+        if (fastBuildMode) {
+            result.add(buildSegment(cubeName, date1, date4));
+        } else {
+            result.add(buildSegment(cubeName, date1, date2));
+            result.add(buildSegment(cubeName, date2, date3));
+            result.add(buildSegment(cubeName, date3, date4));
+            result.add(mergeSegment(cubeName, date1, date3));//don't merge all 
segments
+        }
+        return result;
+
+    }
+
+    private void clearSegment(String cubeName) throws Exception {
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        // remove all existing segments
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new 
CubeSegment[cube.getSegments().size()]));
+        cubeManager.updateCube(cubeBuilder);
+    }
+
+    private String mergeSegment(String cubeName, long startDate, long endDate) 
throws Exception {
+        CubeSegment segment = 
cubeManager.mergeSegments(cubeManager.getCube(cubeName), startDate, endDate, 
true);
+        DefaultChainedExecutable job = 
EngineFactory.createBatchMergeJob(segment, "TEST");
+        jobService.addJob(job);
+        waitForJob(job.getId());
+        return job.getId();
+    }
+
+    private String buildSegment(String cubeName, long startDate, long endDate) 
throws Exception {
+        CubeSegment segment = 
cubeManager.appendSegments(cubeManager.getCube(cubeName), endDate);
+        DefaultChainedExecutable job = 
EngineFactory.createBatchCubingJob(segment, "TEST");
+        jobService.addJob(job);
+        waitForJob(job.getId());
+        return job.getId();
+    }
+
+    private int cleanupOldStorage() throws Exception {
+        String[] args = { "--delete", "true" };
+
+        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+        return exitCode;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
new file mode 100644
index 0000000..2a142fc
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.provision;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.spark.SparkBatchCubingEngine;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.MockJobLock;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+//TODO: convert it to a normal class rather than a test case, like in 
BuildCubeWithEngine 
+@Ignore
+public class BuildCubeWithSpark {
+
+    private CubeManager cubeManager;
+    private DefaultScheduler scheduler;
+    protected ExecutableManager jobService;
+
+    private static final Log logger = 
LogFactory.getLog(BuildCubeWithSpark.class);
+
+    protected void waitForJob(String jobId) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() 
== ExecutableState.ERROR) {
+                break;
+            } else {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty(KylinConfig.KYLIN_CONF, 
"../examples/test_case_data/sandbox");
+        if (System.getProperty("hdp.version") == null) {
+            throw new RuntimeException("No hdp.version set; Please set 
hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+        }
+
+    }
+
+    @Before
+    public void before() throws Exception {
+        
HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        DeployUtil.initCliWorkDir();
+        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        jobService = ExecutableManager.getInstance(kylinConfig);
+        for (String jobId : jobService.getAllJobIds()) {
+            jobService.deleteJob(jobId);
+        }
+        scheduler = DefaultScheduler.getInstance();
+        scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        cubeManager = CubeManager.getInstance(kylinConfig);
+
+    }
+
+    @After
+    public void after() {
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws Exception {
+        final CubeSegment segment = createSegment();
+        String confPath = new 
File(AbstractKylinTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
+        KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
+        String coprocessor = 
KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
+        logger.info("confPath location:" + confPath);
+        logger.info("coprocessor location:" + coprocessor);
+        final DefaultChainedExecutable cubingJob = new 
SparkBatchCubingEngine(confPath, coprocessor).createBatchCubingJob(segment, 
"BuildCubeWithSpark");
+        jobService.addJob(cubingJob);
+        waitForJob(cubingJob.getId());
+        assertEquals(ExecutableState.SUCCEED, 
jobService.getOutput(cubingJob.getId()).getState());
+    }
+
+    private void clearSegment(String cubeName) throws Exception {
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        // remove all existing segments
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new 
CubeSegment[cube.getSegments().size()]));
+        cubeManager.updateCube(cubeBuilder);
+    }
+
+    private CubeSegment createSegment() throws Exception {
+        String cubeName = "test_kylin_cube_with_slr_left_join_empty";
+        clearSegment(cubeName);
+
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+        long dateStart = 
cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart();
+        long dateEnd = f.parse("2050-11-12").getTime();
+
+        // this cube's start date is 0, end date is 20501112000000
+        List<String> result = Lists.newArrayList();
+        return cubeManager.appendSegments(cubeManager.getCube(cubeName), 
dateEnd);
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
new file mode 100644
index 0000000..290b869
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.provision;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  for streaming cubing case "test_streaming_table"
+ */
+public class BuildCubeWithStream {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BuildCubeWithStream.class);
+    private static final String streamingName = "test_streaming_table_cube";
+    private static final long startTime = 
DateFormat.stringToMillis("2015-01-01 00:00:00");
+    private static final long endTime = DateFormat.stringToMillis("2015-01-03 
00:00:00");
+    private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours
+
+    private KylinConfig kylinConfig;
+
+    public static void main(String[] args) throws Exception {
+        beforeClass();
+        BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
+        buildCubeWithStream.before();
+        buildCubeWithStream.build();
+        afterClass();
+    }
+
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty(KylinConfig.KYLIN_CONF, 
"../examples/test_case_data/sandbox");
+        if (System.getProperty("hdp.version") == null) {
+            throw new RuntimeException("No hdp.version set; Please set 
hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+        }
+        
HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+    }
+
+    public void before() throws Exception {
+        DeployUtil.overrideJobJarLocations();
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+
+        final StreamingConfig config = 
StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName);
+
+        //Use a random topic for kafka data stream
+        KafkaConfig streamingConfig = 
KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingName);
+        streamingConfig.setTopic(UUID.randomUUID().toString());
+        
KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig);
+
+        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, 
config.getCubeName(), streamingConfig);
+    }
+
+    public static void afterClass() throws Exception {
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    public void build() throws Exception {
+        logger.info("start time:" + startTime + " end time:" + endTime + " 
batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / 
batchInterval));
+        for (long start = startTime; start < endTime; start += batchInterval) {
+            logger.info(String.format("build batch:{%d, %d}", start, start + 
batchInterval));
+            new OneOffStreamingBuilder(streamingName, start, start + 
batchInterval).build().run();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
new file mode 100644
index 0000000..b9c242d
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.provision;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.engine.mr.invertedindex.BatchIIJobBuilder;
+import org.apache.kylin.engine.mr.invertedindex.IIJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
+import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+//TODO: convert it to a normal class rather than a test case, like in 
BuildCubeWithEngine 
+@Ignore
+public class BuildIIWithEngine {
+
+    private JobEngineConfig jobEngineConfig;
+    private IIManager iiManager;
+
+    private DefaultScheduler scheduler;
+    protected ExecutableManager jobService;
+
+    protected static final String[] TEST_II_INSTANCES = new String[] { 
"test_kylin_ii_inner_join", "test_kylin_ii_left_join" };
+
+    private static final Log logger = 
LogFactory.getLog(BuildIIWithEngine.class);
+
+    protected void waitForJob(String jobId) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() 
== ExecutableState.ERROR) {
+                break;
+            } else {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        System.setProperty(KylinConfig.KYLIN_CONF, 
"../examples/test_case_data/sandbox");
+        if (System.getProperty("hdp.version") == null) {
+            throw new RuntimeException("No hdp.version set; Please set 
hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+        }
+    }
+
+    @Before
+    public void before() throws Exception {
+        
HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+
+        //DeployUtil.initCliWorkDir();
+        //        DeployUtil.deployMetadata();
+        DeployUtil.overrideJobJarLocations();
+
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        jobService = ExecutableManager.getInstance(kylinConfig);
+        scheduler = DefaultScheduler.getInstance();
+        scheduler.init(new JobEngineConfig(kylinConfig), new 
ZookeeperJobLock());
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        jobEngineConfig = new JobEngineConfig(kylinConfig);
+        for (String jobId : jobService.getAllJobIds()) {
+            if (jobService.getJob(jobId) instanceof IIJob) {
+                jobService.deleteJob(jobId);
+            }
+        }
+
+        iiManager = IIManager.getInstance(kylinConfig);
+        for (String iiInstance : TEST_II_INSTANCES) {
+
+            IIInstance ii = iiManager.getII(iiInstance);
+            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
+                ii.setStatus(RealizationStatusEnum.DISABLED);
+                iiManager.updateII(ii);
+            }
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+
+        for (String iiInstance : TEST_II_INSTANCES) {
+            IIInstance ii = iiManager.getII(iiInstance);
+            if (ii.getStatus() != RealizationStatusEnum.READY) {
+                ii.setStatus(RealizationStatusEnum.READY);
+                iiManager.updateII(ii);
+            }
+        }
+    }
+
+    @Test
+    public void testBuildII() throws Exception {
+
+        String[] testCase = new String[] { "buildIIInnerJoin", 
"buildIILeftJoin" };
+        ExecutorService executorService = 
Executors.newFixedThreadPool(testCase.length);
+        final CountDownLatch countDownLatch = new 
CountDownLatch(testCase.length);
+        List<Future<List<String>>> tasks = 
Lists.newArrayListWithExpectedSize(testCase.length);
+        for (int i = 0; i < testCase.length; i++) {
+            tasks.add(executorService.submit(new TestCallable(testCase[i], 
countDownLatch)));
+        }
+        countDownLatch.await();
+        for (int i = 0; i < tasks.size(); ++i) {
+            Future<List<String>> task = tasks.get(i);
+            final List<String> jobIds = task.get();
+            for (String jobId : jobIds) {
+                assertJobSucceed(jobId);
+            }
+        }
+
+    }
+
+    private void assertJobSucceed(String jobId) {
+        assertEquals(ExecutableState.SUCCEED, 
jobService.getOutput(jobId).getState());
+    }
+
+    private class TestCallable implements Callable<List<String>> {
+
+        private final String methodName;
+        private final CountDownLatch countDownLatch;
+
+        public TestCallable(String methodName, CountDownLatch countDownLatch) {
+            this.methodName = methodName;
+            this.countDownLatch = countDownLatch;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public List<String> call() throws Exception {
+            try {
+                final Method method = 
BuildIIWithEngine.class.getDeclaredMethod(methodName);
+                method.setAccessible(true);
+                return (List<String>) method.invoke(BuildIIWithEngine.this);
+            } finally {
+                countDownLatch.countDown();
+            }
+        }
+    }
+
+    protected List<String> buildIIInnerJoin() throws Exception {
+        return buildII(TEST_II_INSTANCES[0]);
+    }
+
+    protected List<String> buildIILeftJoin() throws Exception {
+        return buildII(TEST_II_INSTANCES[1]);
+    }
+
+    protected List<String> buildII(String iiName) throws Exception {
+        clearSegment(iiName);
+
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        long date1 = 0;
+        long date2 = f.parse("2015-01-01").getTime();
+
+        List<String> result = Lists.newArrayList();
+        result.add(buildSegment(iiName, date1, date2));
+        return result;
+    }
+
+    private void clearSegment(String iiName) throws Exception {
+        IIInstance ii = iiManager.getII(iiName);
+        ii.getSegments().clear();
+        iiManager.updateII(ii);
+    }
+
+    private String buildSegment(String iiName, long startDate, long endDate) 
throws Exception {
+        IIInstance iiInstance = iiManager.getII(iiName);
+        IISegment segment = iiManager.buildSegment(iiInstance, startDate, 
endDate);
+        iiInstance.getSegments().add(segment);
+        iiManager.updateII(iiInstance);
+
+        BatchIIJobBuilder batchIIJobBuilder = new BatchIIJobBuilder(segment, 
"SYSTEM");
+        IIJob job = batchIIJobBuilder.build();
+        jobService.addJob(job);
+        waitForJob(job.getId());
+        return job.getId();
+    }
+
+    private int cleanupOldStorage() throws Exception {
+        String[] args = { "--delete", "true" };
+
+        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+        return exitCode;
+    }
+
+    public static void main(String[] args) throws Exception {
+        BuildIIWithEngine instance = new BuildIIWithEngine();
+
+        BuildIIWithEngine.beforeClass();
+        instance.before();
+        instance.testBuildII();
+        instance.after();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
new file mode 100644
index 0000000..4552cf0
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
@@ -0,0 +1,294 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.provision;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.SliceBuilder;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.source.hive.HiveTableReader;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
+import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class BuildIIWithStream {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BuildIIWithStream.class);
+
+    private static final String[] II_NAME = new String[] { 
"test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
+    private IIManager iiManager;
+    private KylinConfig kylinConfig;
+
+    public static void main(String[] args) throws Exception {
+        beforeClass();
+        BuildIIWithStream buildCubeWithEngine = new BuildIIWithStream();
+        buildCubeWithEngine.before();
+        buildCubeWithEngine.build();
+        afterClass();
+    }
+
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        if (System.getProperty("hdp.version") == null) {
+            throw new RuntimeException("No hdp.version set; Please set 
hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+        }
+        
HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+    }
+
+    public void before() throws Exception {
+        DeployUtil.overrideJobJarLocations();
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        iiManager = IIManager.getInstance(kylinConfig);
+        for (String iiInstance : II_NAME) {
+
+            IIInstance ii = iiManager.getII(iiInstance);
+            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
+                ii.setStatus(RealizationStatusEnum.DISABLED);
+                iiManager.updateII(ii);
+            }
+        }
+    }
+
+    public static void afterClass() throws Exception {
+        cleanupOldStorage();
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    private String createIntermediateTable(IIDesc desc, KylinConfig 
kylinConfig) throws IOException {
+        IIJoinedFlatTableDesc intermediateTableDesc = new 
IIJoinedFlatTableDesc(desc);
+        JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
+        final String uuid = UUID.randomUUID().toString();
+        final String useDatabaseHql = "USE " + 
kylinConfig.getHiveDatabaseForIntermediateTable() + ";";
+        final String dropTableHql = 
JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
+        final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, 
JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid));
+        String insertDataHqls;
+        try {
+            insertDataHqls = 
JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, 
jobEngineConfig);
+        } catch (IOException e1) {
+            e1.printStackTrace();
+            throw new RuntimeException("Failed to generate insert data SQL for 
intermediate table.");
+        }
+
+        ShellExecutable step = new ShellExecutable();
+        StringBuffer buf = new StringBuffer();
+        buf.append("hive -e \"");
+        buf.append(useDatabaseHql + "\n");
+        buf.append(dropTableHql + "\n");
+        buf.append(createTableHql + "\n");
+        buf.append(insertDataHqls + "\n");
+        buf.append("\"");
+
+        step.setCmd(buf.toString());
+        logger.info(step.getCmd());
+        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+        kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
+        return intermediateTableDesc.getTableName();
+    }
+
+    private void clearSegment(String iiName) throws Exception {
+        IIInstance ii = iiManager.getII(iiName);
+        ii.getSegments().clear();
+        iiManager.updateII(ii);
+    }
+
+    private IISegment createSegment(String iiName) throws Exception {
+        clearSegment(iiName);
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        long date1 = 0;
+        long date2 = f.parse("2015-01-01").getTime();
+        return buildSegment(iiName, date1, date2);
+    }
+
+    private IISegment buildSegment(String iiName, long startDate, long 
endDate) throws Exception {
+        IIInstance iiInstance = iiManager.getII(iiName);
+        IISegment segment = iiManager.buildSegment(iiInstance, startDate, 
endDate);
+        iiInstance.getSegments().add(segment);
+        iiManager.updateII(iiInstance);
+        return segment;
+    }
+
+    private void buildII(String iiName) throws Exception {
+        final IIDesc desc = iiManager.getII(iiName).getDescriptor();
+        final String tableName = createIntermediateTable(desc, kylinConfig);
+        logger.info("intermediate table name:" + tableName);
+
+        HiveTableReader reader = new HiveTableReader("default", tableName);
+        final List<TblColRef> tblColRefs = desc.listAllColumns();
+        for (TblColRef tblColRef : tblColRefs) {
+            if (desc.isMetricsCol(tblColRef)) {
+                logger.info("matrix:" + tblColRef.getName());
+            } else {
+                logger.info("measure:" + tblColRef.getName());
+            }
+        }
+        final IISegment segment = createSegment(iiName);
+        final HTableInterface htable = 
HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier());
+        String[] args = new String[] { "-iiname", iiName, "-htablename", 
segment.getStorageLocationIdentifier() };
+        ToolRunner.run(new IICreateHTableJob(), args);
+
+        final IIDesc iiDesc = segment.getIIDesc();
+        final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0);
+
+        List<String[]> sorted = getSortedRows(reader, 
desc.getTimestampColumn());
+        int count = sorted.size();
+        ArrayList<StreamingMessage> messages = Lists.newArrayList();
+        for (String[] row : sorted) {
+            messages.add((parse(row)));
+            if (messages.size() >= iiDesc.getSliceSize()) {
+                build(sliceBuilder, new StreamingBatch(messages, 
Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
+                messages.clear();
+            }
+        }
+
+        if (!messages.isEmpty()) {
+            build(sliceBuilder, new StreamingBatch(messages, 
Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
+        }
+
+        reader.close();
+        logger.info("total record count:" + count + " htable:" + 
segment.getStorageLocationIdentifier());
+        logger.info("stream build finished, htable name:" + 
segment.getStorageLocationIdentifier());
+    }
+
+    public void build() throws Exception {
+        for (String iiName : II_NAME) {
+            buildII(iiName);
+            IIInstance ii = iiManager.getII(iiName);
+            if (ii.getStatus() != RealizationStatusEnum.READY) {
+                ii.setStatus(RealizationStatusEnum.READY);
+                iiManager.updateII(ii);
+            }
+        }
+    }
+
+    private void build(SliceBuilder sliceBuilder, StreamingBatch batch, 
HTableInterface htable) throws IOException {
+        final Slice slice = sliceBuilder.buildSlice(batch);
+        try {
+            loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo()));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void loadToHBase(HTableInterface hTable, Slice slice, 
IIKeyValueCodec codec) throws IOException {
+        List<Put> data = Lists.newArrayList();
+        for (IIRow row : codec.encodeKeyValue(slice)) {
+            final byte[] key = row.getKey().get();
+            final byte[] value = row.getValue().get();
+            Put put = new Put(key);
+            put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, 
value);
+            final ImmutableBytesWritable dictionary = row.getDictionary();
+            final byte[] dictBytes = dictionary.get();
+            if (dictionary.getOffset() == 0 && dictionary.getLength() == 
dictBytes.length) {
+                put.add(IIDesc.HBASE_FAMILY_BYTES, 
IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
+            } else {
+                throw new RuntimeException("dict offset should be 0, and dict 
length should be " + dictBytes.length + " but they are" + 
dictionary.getOffset() + " " + dictionary.getLength());
+            }
+            data.add(put);
+        }
+        hTable.put(data);
+        //omit hTable.flushCommits(), because htable is auto flush
+    }
+
+    private StreamingMessage parse(String[] row) {
+        return new StreamingMessage(Lists.newArrayList(row), 
System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, 
Object> emptyMap());
+    }
+
+    private List<String[]> getSortedRows(HiveTableReader reader, final int 
tsCol) throws IOException {
+        List<String[]> unsorted = Lists.newArrayList();
+        while (reader.next()) {
+            unsorted.add(reader.getRow());
+        }
+        Collections.sort(unsorted, new Comparator<String[]>() {
+            @Override
+            public int compare(String[] o1, String[] o2) {
+                long t1 = DateFormat.stringToMillis(o1[tsCol]);
+                long t2 = DateFormat.stringToMillis(o2[tsCol]);
+                return Long.compare(t1, t2);
+            }
+        });
+        return unsorted;
+    }
+
+    private static int cleanupOldStorage() throws Exception {
+        String[] args = { "--delete", "true" };
+
+        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+        return exitCode;
+    }
+
+}

Reply via email to