http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
new file mode 100644
index 0000000..8436687
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java
@@ -0,0 +1,298 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.provision;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.SliceBuilder;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.source.hive.HiveTableReader;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
+import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class BuildIIWithStream {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BuildIIWithStream.class);
+
+    private static final String[] II_NAME = new String[] { 
"test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
+    private IIManager iiManager;
+    private KylinConfig kylinConfig;
+
+    public static void main(String[] args) throws Exception {
+        beforeClass();
+        BuildIIWithStream buildCubeWithEngine = new BuildIIWithStream();
+        buildCubeWithEngine.before();
+        buildCubeWithEngine.build();
+        logger.info("Build is done");
+        afterClass();
+        logger.info("Going to exit");
+        System.exit(0);
+    }
+
+    public static void beforeClass() throws Exception {
+        logger.info("Adding to classpath: " + new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        ClassUtil.addClasspath(new 
File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
+        if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
+            throw new RuntimeException("No hdp.version set; Please set 
hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+        }
+        
HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
+    }
+
+    public void before() throws Exception {
+        DeployUtil.overrideJobJarLocations();
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        iiManager = IIManager.getInstance(kylinConfig);
+        for (String iiInstance : II_NAME) {
+
+            IIInstance ii = iiManager.getII(iiInstance);
+            if (ii.getStatus() != RealizationStatusEnum.DISABLED) {
+                ii.setStatus(RealizationStatusEnum.DISABLED);
+                iiManager.updateII(ii);
+            }
+        }
+    }
+
+    public static void afterClass() throws Exception {
+        cleanupOldStorage();
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    private String createIntermediateTable(IIDesc desc, KylinConfig 
kylinConfig) throws IOException {
+        IIJoinedFlatTableDesc intermediateTableDesc = new 
IIJoinedFlatTableDesc(desc);
+        JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
+        final String uuid = UUID.randomUUID().toString();
+        final String useDatabaseHql = "USE " + 
kylinConfig.getHiveDatabaseForIntermediateTable() + ";";
+        final String dropTableHql = 
JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
+        final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, 
JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid));
+        String insertDataHqls;
+        try {
+            insertDataHqls = 
JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, 
jobEngineConfig);
+        } catch (IOException e1) {
+            e1.printStackTrace();
+            throw new RuntimeException("Failed to generate insert data SQL for 
intermediate table.");
+        }
+
+        ShellExecutable step = new ShellExecutable();
+        StringBuffer buf = new StringBuffer();
+        buf.append("hive -e \"");
+        buf.append(useDatabaseHql + "\n");
+        buf.append(dropTableHql + "\n");
+        buf.append(createTableHql + "\n");
+        buf.append(insertDataHqls + "\n");
+        buf.append("\"");
+
+        step.setCmd(buf.toString());
+        logger.info(step.getCmd());
+        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+        kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
+        return intermediateTableDesc.getTableName();
+    }
+
+    private void clearSegment(String iiName) throws Exception {
+        IIInstance ii = iiManager.getII(iiName);
+        ii.getSegments().clear();
+        iiManager.updateII(ii);
+    }
+
+    private IISegment createSegment(String iiName) throws Exception {
+        clearSegment(iiName);
+        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
+        f.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        long date1 = 0;
+        long date2 = f.parse("2015-01-01").getTime();
+        return buildSegment(iiName, date1, date2);
+    }
+
+    private IISegment buildSegment(String iiName, long startDate, long 
endDate) throws Exception {
+        IIInstance iiInstance = iiManager.getII(iiName);
+        IISegment segment = iiManager.buildSegment(iiInstance, startDate, 
endDate);
+        iiInstance.getSegments().add(segment);
+        iiManager.updateII(iiInstance);
+        return segment;
+    }
+
+    private void buildII(String iiName) throws Exception {
+        final IIDesc desc = iiManager.getII(iiName).getDescriptor();
+        final String tableName = createIntermediateTable(desc, kylinConfig);
+        logger.info("intermediate table name:" + tableName);
+
+        HiveTableReader reader = new HiveTableReader("default", tableName);
+        final List<TblColRef> tblColRefs = desc.listAllColumns();
+        for (TblColRef tblColRef : tblColRefs) {
+            if (desc.isMetricsCol(tblColRef)) {
+                logger.info("matrix:" + tblColRef.getName());
+            } else {
+                logger.info("measure:" + tblColRef.getName());
+            }
+        }
+        final IISegment segment = createSegment(iiName);
+        final HTableInterface htable = 
HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier());
+        String[] args = new String[] { "-iiname", iiName, "-htablename", 
segment.getStorageLocationIdentifier() };
+        ToolRunner.run(new IICreateHTableJob(), args);
+
+        final IIDesc iiDesc = segment.getIIDesc();
+        final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0);
+
+        List<String[]> sorted = getSortedRows(reader, 
desc.getTimestampColumn());
+        int count = sorted.size();
+        ArrayList<StreamingMessage> messages = Lists.newArrayList();
+        for (String[] row : sorted) {
+            messages.add((parse(row)));
+            if (messages.size() >= iiDesc.getSliceSize()) {
+                build(sliceBuilder, new StreamingBatch(messages, 
Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
+                messages.clear();
+            }
+        }
+
+        if (!messages.isEmpty()) {
+            build(sliceBuilder, new StreamingBatch(messages, 
Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
+        }
+
+        reader.close();
+        logger.info("total record count:" + count + " htable:" + 
segment.getStorageLocationIdentifier());
+        logger.info("stream build finished, htable name:" + 
segment.getStorageLocationIdentifier());
+    }
+
+    public void build() throws Exception {
+        for (String iiName : II_NAME) {
+            buildII(iiName);
+            IIInstance ii = iiManager.getII(iiName);
+            if (ii.getStatus() != RealizationStatusEnum.READY) {
+                ii.setStatus(RealizationStatusEnum.READY);
+                iiManager.updateII(ii);
+            }
+        }
+    }
+
+    private void build(SliceBuilder sliceBuilder, StreamingBatch batch, 
HTableInterface htable) throws IOException {
+        final Slice slice = sliceBuilder.buildSlice(batch);
+        try {
+            loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo()));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void loadToHBase(HTableInterface hTable, Slice slice, 
IIKeyValueCodec codec) throws IOException {
+        List<Put> data = Lists.newArrayList();
+        for (IIRow row : codec.encodeKeyValue(slice)) {
+            final byte[] key = row.getKey().get();
+            final byte[] value = row.getValue().get();
+            Put put = new Put(key);
+            put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, 
value);
+            final ImmutableBytesWritable dictionary = row.getDictionary();
+            final byte[] dictBytes = dictionary.get();
+            if (dictionary.getOffset() == 0 && dictionary.getLength() == 
dictBytes.length) {
+                put.add(IIDesc.HBASE_FAMILY_BYTES, 
IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
+            } else {
+                throw new RuntimeException("dict offset should be 0, and dict 
length should be " + dictBytes.length + " but they are" + 
dictionary.getOffset() + " " + dictionary.getLength());
+            }
+            data.add(put);
+        }
+        hTable.put(data);
+        //omit hTable.flushCommits(), because htable is auto flush
+    }
+
+    private StreamingMessage parse(String[] row) {
+        return new StreamingMessage(Lists.newArrayList(row), 
System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, 
Object> emptyMap());
+    }
+
+    private List<String[]> getSortedRows(HiveTableReader reader, final int 
tsCol) throws IOException {
+        List<String[]> unsorted = Lists.newArrayList();
+        while (reader.next()) {
+            unsorted.add(reader.getRow());
+        }
+        Collections.sort(unsorted, new Comparator<String[]>() {
+            @Override
+            public int compare(String[] o1, String[] o2) {
+                long t1 = DateFormat.stringToMillis(o1[tsCol]);
+                long t2 = DateFormat.stringToMillis(o2[tsCol]);
+                return Long.compare(t1, t2);
+            }
+        });
+        return unsorted;
+    }
+
+    private static int cleanupOldStorage() throws Exception {
+        String[] args = { "--delete", "true" };
+
+        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+        return exitCode;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/query/H2Database.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/H2Database.java 
b/kylin-it/src/test/java/org/apache/kylin/query/H2Database.java
new file mode 100644
index 0000000..7cf072f
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/H2Database.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class H2Database {
+    @SuppressWarnings("unused")
+    private static final Logger logger = 
LoggerFactory.getLogger(H2Database.class);
+
+    private static final String[] ALL_TABLES = new String[] { 
"edw.test_cal_dt", "default.test_category_groupings", 
"default.test_kylin_fact", "edw.test_seller_type_dim", "edw.test_sites", 
"default.streaming_table" };
+    private static final Map<String, String> javaToH2DataTypeMapping = new 
HashMap<String, String>();
+
+    static {
+        javaToH2DataTypeMapping.put("short", "smallint");
+        javaToH2DataTypeMapping.put("long", "bigint");
+        javaToH2DataTypeMapping.put("byte", "tinyint");
+        javaToH2DataTypeMapping.put("string", "varchar");
+    }
+
+    private final Connection h2Connection;
+
+    private final KylinConfig config;
+
+    public H2Database(Connection h2Connection, KylinConfig config) {
+        this.h2Connection = h2Connection;
+        this.config = config;
+    }
+
+    public void loadAllTables() throws SQLException {
+        for (String tableName : ALL_TABLES) {
+            loadH2Table(tableName);
+        }
+    }
+
+    private void loadH2Table(String tableName) throws SQLException {
+        MetadataManager metaMgr = MetadataManager.getInstance(config);
+        TableDesc tableDesc = metaMgr.getTableDesc(tableName.toUpperCase());
+        File tempFile = null;
+
+        try {
+            tempFile = File.createTempFile("tmp_h2", ".csv");
+            FileOutputStream tempFileStream = new FileOutputStream(tempFile);
+            String normalPath = "/data/" + tableDesc.getIdentity() + ".csv";
+            InputStream csvStream = 
metaMgr.getStore().getResource(normalPath).inputStream;
+
+            org.apache.commons.io.IOUtils.copy(csvStream, tempFileStream);
+
+            csvStream.close();
+            tempFileStream.close();
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        String cvsFilePath = tempFile.getPath();
+        Statement stmt = h2Connection.createStatement();
+
+        String createDBSql = "CREATE SCHEMA IF NOT EXISTS DEFAULT;\nCREATE 
SCHEMA IF NOT EXISTS EDW;\nSET SCHEMA DEFAULT;\n";
+        stmt.executeUpdate(createDBSql);
+
+        String sql = generateCreateH2TableSql(tableDesc, cvsFilePath);
+        stmt.executeUpdate(sql);
+
+        if (tempFile != null)
+            tempFile.delete();
+    }
+
+    private String generateCreateH2TableSql(TableDesc tableDesc, String 
csvFilePath) {
+        StringBuilder ddl = new StringBuilder();
+        StringBuilder csvColumns = new StringBuilder();
+
+        ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n");
+        ddl.append("(" + "\n");
+
+        for (int i = 0; i < tableDesc.getColumns().length; i++) {
+            ColumnDesc col = tableDesc.getColumns()[i];
+            if (i > 0) {
+                ddl.append(",");
+                csvColumns.append(",");
+            }
+            ddl.append(col.getName() + " " + 
getH2DataType((col.getDatatype())) + "\n");
+            csvColumns.append(col.getName());
+        }
+        ddl.append(")" + "\n");
+        ddl.append("AS SELECT * FROM CSVREAD('" + csvFilePath + "', '" + 
csvColumns + "', 'charset=UTF-8 fieldSeparator=,');");
+
+        return ddl.toString();
+    }
+
+    private static String getH2DataType(String javaDataType) {
+        String hiveDataType = 
javaToH2DataTypeMapping.get(javaDataType.toLowerCase());
+        if (hiveDataType == null) {
+            hiveDataType = javaDataType;
+        }
+        return hiveDataType.toLowerCase();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java 
b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
new file mode 100644
index 0000000..f4bbe5f
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.kylin.storage.hbase.HBaseStorage;
+import 
org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ */
+@RunWith(Parameterized.class)
+public class ITCombinationTest extends ITKylinQueryTest {
+
+    @BeforeClass
+    public static void setUp() throws SQLException {
+        System.out.println("setUp in ITCombinationTest");
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        clean();
+        HBaseStorage.overwriteStorageQuery = null;
+    }
+
+    /**
+     * return all config combinations, where first setting specifies join type
+     * (inner or left), and the second setting specifies whether to force using
+     * coprocessors(on, off or unset).
+     */
+    @Parameterized.Parameters
+    public static Collection<Object[]> configs() {
+        //       return Arrays.asList(new Object[][] { { "inner", "unset" }, { 
"left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { 
"left", "on" }, });
+        return Arrays.asList(new Object[][] { { "inner", "on", "v2" }, { 
"left", "on", "v1" }, { "left", "on", "v2" } });
+    }
+
+    public ITCombinationTest(String joinType, String coprocessorToggle, String 
queryEngine) throws Exception {
+
+        ITKylinQueryTest.clean();
+
+        ITKylinQueryTest.joinType = joinType;
+        ITKylinQueryTest.setupAll();
+
+        if (coprocessorToggle.equals("on")) {
+            ObserverEnabler.forceCoprocessorOn();
+        } else if (coprocessorToggle.equals("off")) {
+            ObserverEnabler.forceCoprocessorOff();
+        } else if (coprocessorToggle.equals("unset")) {
+            // unset
+        }
+
+        if ("v1".equalsIgnoreCase(queryEngine)) {
+            HBaseStorage.overwriteStorageQuery = 
HBaseStorage.v1CubeStorageQuery;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/query/ITIIQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITIIQueryTest.java 
b/kylin-it/src/test/java/org/apache/kylin/query/ITIIQueryTest.java
new file mode 100644
index 0000000..b914dd0
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITIIQueryTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.routing.Candidate;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.Maps;
+
+/**
+ */
+@RunWith(Parameterized.class)
+public class ITIIQueryTest extends ITKylinQueryTest {
+    @BeforeClass
+    public static void setUp() throws Exception {
+
+        // give II higher priority than other realizations
+        Map<RealizationType, Integer> priorities = Maps.newHashMap();
+        priorities.put(RealizationType.INVERTED_INDEX, 0);
+        priorities.put(RealizationType.CUBE, 1);
+        priorities.put(RealizationType.HYBRID, 1);
+        Candidate.setPriorities(priorities);
+
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        ITKylinQueryTest.tearDown();//invoke super class
+
+        Map<RealizationType, Integer> priorities = Maps.newHashMap();
+        priorities.put(RealizationType.INVERTED_INDEX, 1);
+        priorities.put(RealizationType.CUBE, 0);
+        priorities.put(RealizationType.HYBRID, 0);
+        Candidate.setPriorities(priorities);
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { { "inner" }, { "left" } });
+    }
+
+    public ITIIQueryTest(String joinType) throws Exception {
+
+        ITKylinQueryTest.clean();
+
+        ITKylinQueryTest.joinType = joinType;
+        ITKylinQueryTest.setupAll();
+
+    }
+
+    @Test
+    public void testSingleRunQuery() throws Exception {
+        super.testSingleRunQuery();
+    }
+
+    @Test
+    public void testDetailedQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_ii", null, true);
+    }
+
+    @Override
+    @Test
+    @Ignore("Skip Precisely Distinct Count Queries for II")
+    public void testPreciselyDistinctCountQuery() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java 
b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
new file mode 100644
index 0000000..ace861a
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.sql.DriverManager;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.query.enumerator.OLAPQuery;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import 
org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.dbunit.database.DatabaseConnection;
+import org.dbunit.database.IDatabaseConnection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("KylinQueryTest is contained by ITCombinationTest")
+public class ITKylinQueryTest extends KylinTestBase {
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        printInfo("setUp in KylinQueryTest");
+        joinType = "left";
+
+        setupAll();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        printInfo("tearDown");
+        printInfo("Closing connection...");
+        clean();
+    }
+
+    protected static void setupAll() throws Exception {
+        //setup env
+        HBaseMetadataTestCase.staticCreateTestMetadata();
+        config = KylinConfig.getInstanceFromEnv();
+
+        //setup cube conn
+        File olapTmp = 
OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, 
config);
+        Properties props = new Properties();
+        props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "10000");
+        cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + 
olapTmp.getAbsolutePath(), props);
+
+        //setup h2
+        h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + 
(h2InstanceCount++), "sa", "");
+        // Load H2 Tables (inner join)
+        H2Database h2DB = new H2Database(h2Connection, config);
+        h2DB.loadAllTables();
+    }
+
+    protected static void clean() {
+        if (cubeConnection != null)
+            closeConnection(cubeConnection);
+        if (h2Connection != null)
+            closeConnection(h2Connection);
+
+        ObserverEnabler.forceCoprocessorUnset();
+        HBaseMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    @Ignore("this is only for debug")
+    @Test
+    public void testTempQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/temp", null, true);
+    }
+
+    @Test
+    public void testSingleRunQuery() throws Exception {
+
+        String queryFileName = 
"src/test/resources/query/sql_tableau/query20.sql";
+
+        File sqlFile = new File(queryFileName);
+        if (sqlFile.exists()) {
+            runSQL(sqlFile, true, true);
+            runSQL(sqlFile, true, false);
+        }
+    }
+
+    @Test
+    public void testSingleExecuteQuery() throws Exception {
+
+        String queryFileName = 
"src/test/resources/query/sql_tableau/query20.sql";
+
+        File sqlFile = new File(queryFileName);
+        String sql = getTextFromFile(sqlFile);
+        IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
+
+        executeQuery(kylinConn, queryFileName, sql, true);
+    }
+
+    @Ignore
+    @Test
+    public void testTableauProbing() throws Exception {
+        batchExecuteQuery("src/test/resources/query/tableau_probing");
+    }
+
+    @Test
+    public void testCommonQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql", null, true);
+    }
+
+    @Test
+    public void testVerifyQuery() throws Exception {
+        verifyResultRowCount("src/test/resources/query/sql_verifyCount");
+    }
+
+    @Test
+    public void testOrderByQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_orderby", null, true);
+        // FIXME
+        // as of optiq 0.8, we lost metadata type with "order by" clause, e.g. 
sql_orderby/query01.sql
+        // thus, temporarily the "order by" clause was cross out, and the 
needSort is set to true
+        // execAndCompQuery("src/test/resources/query/sql_orderby", null, 
false);
+    }
+
+    @Test
+    public void testLookupQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_lookup", null, true);
+    }
+
+    @Test
+    public void testCachedQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_cache", null, true);
+    }
+
+    @Test
+    public void testDerivedColumnQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_derived", null, true);
+    }
+
+    @Test
+    public void testDistinctCountQuery() throws Exception {
+        if ("left".equalsIgnoreCase(joinType)) {
+            batchExecuteQuery("src/test/resources/query/sql_distinct");
+        }
+    }
+
+    @Test
+    public void testPreciselyDistinctCountQuery() throws Exception {
+        if ("left".equalsIgnoreCase(joinType)) {
+            
execAndCompQuery("src/test/resources/query/sql_distinct_precisely", null, true);
+        }
+    }
+
+    @Test
+    public void testStreamingTableQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_streaming", null, true);
+    }
+
+    @Test
+    public void testTableauQuery() throws Exception {
+        execAndCompResultSize("src/test/resources/query/sql_tableau", null, 
true);
+    }
+
+    @Test
+    public void testSubQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_subquery", null, true);
+    }
+
+    @Test
+    public void testCaseWhen() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_casewhen", null, true);
+    }
+
+    @Ignore
+    @Test
+    public void testHiveQuery() throws Exception {
+        execAndCompQuery("src/test/resources/query/sql_hive", null, true);
+    }
+
+    @Test
+    public void testH2Query() throws Exception {
+        this.execQueryUsingH2("src/test/resources/query/sql_orderby", false);
+    }
+
+    @Test
+    public void testInvalidQuery() throws Exception {
+
+        printInfo("-------------------- Test Invalid Query 
--------------------");
+        String queryFolder = "src/test/resources/query/sql_invalid";
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), 
".sql");
+        for (File sqlFile : sqlFiles) {
+            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+            printInfo("Testing Query " + queryName);
+            String sql = getTextFromFile(sqlFile);
+            IDatabaseConnection cubeConn = new 
DatabaseConnection(cubeConnection);
+            try {
+                cubeConn.createQueryTable(queryName, sql);
+            } catch (Throwable t) {
+                continue;
+            } finally {
+                cubeConn.close();
+            }
+            throw new IllegalStateException(queryName + " should be error!");
+        }
+    }
+
+    @Test
+    public void testDynamicQuery() throws Exception {
+        execAndCompDynamicQuery("src/test/resources/query/sql_dynamic", null, 
true);
+    }
+
+    @Ignore("simple query will be supported by ii")
+    @Test
+    public void testLimitEnabled() throws Exception {
+        runSqlFile("src/test/resources/query/sql_optimize/enable-limit01.sql");
+        assertLimitWasEnabled();
+    }
+
+    @Test
+    public void testTopNQuery() throws Exception {
+            if ("left".equalsIgnoreCase(joinType)) {
+            this.execAndCompQuery("src/test/resources/query/sql_topn", null, 
true);
+        }
+    }
+
+    private void assertLimitWasEnabled() {
+        OLAPContext context = getFirstOLAPContext();
+        assertTrue(context.storageContext.isLimitEnabled());
+    }
+
+    private OLAPContext getFirstOLAPContext() {
+        return OLAPContext.getThreadLocalContexts().iterator().next();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java 
b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
new file mode 100644
index 0000000..b5c6d10
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.LogManager;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.dbunit.Assertion;
+import org.dbunit.database.DatabaseConfig;
+import org.dbunit.database.DatabaseConnection;
+import org.dbunit.database.IDatabaseConnection;
+import org.dbunit.dataset.DataSetException;
+import org.dbunit.dataset.ITable;
+import org.dbunit.dataset.SortedTable;
+import org.dbunit.dataset.datatype.DataType;
+import org.dbunit.dataset.datatype.DataTypeException;
+import org.dbunit.ext.h2.H2Connection;
+import org.dbunit.ext.h2.H2DataTypeFactory;
+import org.junit.Assert;
+
+import com.google.common.io.Files;
+
+/**
+ */
+public class KylinTestBase {
+
+    // Hack for the different constant integer type between optiq (INTEGER) and
+    // h2 (BIGINT)
+    public static class TestH2DataTypeFactory extends H2DataTypeFactory {
+        @Override
+        public DataType createDataType(int sqlType, String sqlTypeName, String 
tableName, String columnName) throws DataTypeException {
+
+            if ((columnName.startsWith("COL") || columnName.startsWith("col")) 
&& sqlType == Types.BIGINT) {
+                return DataType.INTEGER;
+            }
+            return super.createDataType(sqlType, sqlTypeName);
+        }
+    }
+
+    protected static final String resultTableName = "query result of ";
+    protected static KylinConfig config = null;
+    protected static Connection cubeConnection = null;
+    protected static Connection h2Connection = null;
+    protected static String joinType = "default";
+    protected static int h2InstanceCount = 0;
+
+    protected static int compQueryCount = 0;
+    protected static ArrayList<String> zeroResultQueries = new 
ArrayList<String>();
+
+    protected static void closeConnection(Connection connection) {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * @param folder
+     * @param fileType
+     *            specify the interested file type by file extension
+     * @return
+     */
+    protected static List<File> getFilesFromFolder(final File folder, final 
String fileType) {
+        List<File> files = new ArrayList<File>();
+        for (final File fileEntry : folder.listFiles()) {
+            if 
(fileEntry.getName().toLowerCase().endsWith(fileType.toLowerCase())) {
+                files.add(fileEntry);
+            }
+        }
+        return files;
+    }
+
+    protected static void getFilesFromFolderR(final String directoryStr, 
List<File> files, final String fileType) {
+        File folder = new File(directoryStr);
+        for (final File fileEntry : folder.listFiles()) {
+            if (fileEntry.isDirectory()) {
+                getFilesFromFolderR(fileEntry.getAbsolutePath(), files, 
fileType);
+            } else if (fileEntry.isFile()) {
+                if 
(fileEntry.getName().toLowerCase().endsWith(fileType.toLowerCase())) {
+                    files.add(fileEntry);
+                }
+            }
+        }
+    }
+
+    protected static void putTextTofile(File file, String sql) throws 
IOException {
+        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+        writer.write(sql, 0, sql.length());
+        writer.close();
+    }
+
+    protected static String getTextFromFile(File file) throws IOException {
+        BufferedReader reader = new BufferedReader(new FileReader(file));
+        String line = null;
+        StringBuilder stringBuilder = new StringBuilder();
+        String ls = System.getProperty("line.separator");
+        while ((line = reader.readLine()) != null) {
+            stringBuilder.append(line);
+            stringBuilder.append(ls);
+        }
+        reader.close();
+        return stringBuilder.toString();
+    }
+
+    protected static List<String> getParameterFromFile(File sqlFile) throws 
IOException {
+        String sqlFileName = sqlFile.getAbsolutePath();
+        int prefixIndex = sqlFileName.lastIndexOf(".sql");
+        String dataFielName = sqlFileName.substring(0, prefixIndex) + ".dat";
+        File dataFile = new File(dataFielName);
+        List<String> parameters = Files.readLines(dataFile, 
Charset.defaultCharset());
+        return parameters;
+    }
+
+    protected static void printInfo(String info) {
+        System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + 
info);
+    }
+
+    protected static void printResult(ITable resultTable) throws 
DataSetException {
+        StringBuilder sb = new StringBuilder();
+
+        int columnCount = resultTable.getTableMetaData().getColumns().length;
+        String[] columns = new String[columnCount];
+
+        for (int i = 0; i < columnCount; i++) {
+            
sb.append(resultTable.getTableMetaData().getColumns()[i].getColumnName());
+            sb.append("-");
+            
sb.append(resultTable.getTableMetaData().getColumns()[i].getDataType());
+            sb.append("\t");
+            columns[i] = 
resultTable.getTableMetaData().getColumns()[i].getColumnName();
+        }
+        sb.append("\n");
+
+        for (int i = 0; i < resultTable.getRowCount(); i++) {
+            for (int j = 0; j < columns.length; j++) {
+                sb.append(resultTable.getValue(i, columns[j]));
+                sb.append("\t");
+            }
+            sb.append("\n");
+        }
+        System.out.println(sb.toString());
+    }
+
+    protected Set<String> buildExclusiveSet(String[] exclusiveQuerys) {
+        Set<String> exclusiveSet = new HashSet<String>();
+        if (exclusiveQuerys != null) {
+            for (String query : exclusiveQuerys) {
+                exclusiveSet.add(query);
+            }
+        }
+        return exclusiveSet;
+    }
+
+    // 
////////////////////////////////////////////////////////////////////////////////////////
+    // execute
+
+    protected ITable executeQuery(IDatabaseConnection dbConn, String 
queryName, String sql, boolean needSort) throws Exception {
+
+        // change join type to match current setting
+        sql = changeJoinType(sql, joinType);
+
+        ITable queryTable = dbConn.createQueryTable(resultTableName + 
queryName, sql);
+        String[] columnNames = new 
String[queryTable.getTableMetaData().getColumns().length];
+        for (int i = 0; i < columnNames.length; i++) {
+            columnNames[i] = 
queryTable.getTableMetaData().getColumns()[i].getColumnName();
+        }
+        if (needSort) {
+            queryTable = new SortedTable(queryTable, columnNames);
+        }
+        //printResult(queryTable);
+
+        return queryTable;
+    }
+
+    protected int executeQuery(String sql, boolean needDisplay) throws 
SQLException {
+
+        // change join type to match current setting
+        sql = changeJoinType(sql, joinType);
+
+        Statement statement = null;
+        ResultSet resultSet = null;
+        try {
+            printInfo("start running...");
+            statement = cubeConnection.createStatement();
+            resultSet = statement.executeQuery(sql);
+            printInfo("stop running...");
+
+            return output(resultSet, needDisplay);
+        } finally {
+            if (resultSet != null) {
+                try {
+                    resultSet.close();
+                } catch (SQLException e) {
+                    // ignore
+                }
+            }
+            if (statement != null) {
+                try {
+                    statement.close();
+                } catch (SQLException e) {
+                    // ignore
+                }
+            }
+        }
+
+    }
+
+    protected ITable executeDynamicQuery(IDatabaseConnection dbConn, String 
queryName, String sql, List<String> parameters, boolean needSort) throws 
Exception {
+
+        // change join type to match current setting
+        sql = changeJoinType(sql, joinType);
+
+        PreparedStatement prepStat = 
dbConn.getConnection().prepareStatement(sql);
+        for (int j = 1; j <= parameters.size(); ++j) {
+            prepStat.setString(j, parameters.get(j - 1).trim());
+        }
+
+        ITable queryTable = dbConn.createTable(resultTableName + queryName, 
prepStat);
+        String[] columnNames = new 
String[queryTable.getTableMetaData().getColumns().length];
+        for (int i = 0; i < columnNames.length; i++) {
+            columnNames[i] = 
queryTable.getTableMetaData().getColumns()[i].getColumnName();
+        }
+        if (needSort) {
+            queryTable = new SortedTable(queryTable, columnNames);
+        }
+        //printResult(queryTable);
+        return queryTable;
+    }
+
+    // end of execute
+    // 
////////////////////////////////////////////////////////////////////////////////////////
+
+    protected static String changeJoinType(String sql, String targetType) {
+
+        if (targetType.equalsIgnoreCase("default"))
+            return sql;
+
+        String specialStr = "changeJoinType_DELIMITERS";
+        sql = sql.replaceAll(System.getProperty("line.separator"), " " + 
specialStr + " ");
+
+        String[] tokens = StringUtils.split(sql, null);// split white spaces
+        for (int i = 0; i < tokens.length - 1; ++i) {
+            if ((tokens[i].equalsIgnoreCase("inner") || 
tokens[i].equalsIgnoreCase("left")) && tokens[i + 1].equalsIgnoreCase("join")) {
+                tokens[i] = targetType.toLowerCase();
+            }
+        }
+
+        String ret = StringUtils.join(tokens, " ");
+        ret = ret.replaceAll(specialStr, System.getProperty("line.separator"));
+        System.out.println("The actual sql executed is: " + ret);
+
+        return ret;
+    }
+
+    protected static void batchChangeJoinType(String targetType) throws 
IOException {
+        List<File> files = new LinkedList<File>();
+        getFilesFromFolderR("src/test/resources/query", files, ".sql");
+        for (File file : files) {
+            String x = changeJoinType(getTextFromFile(file), targetType);
+            putTextTofile(file, x);
+        }
+    }
+
+    protected void execQueryUsingH2(String queryFolder, boolean needSort) 
throws Exception {
+        printInfo("---------- Running H2 queries: " + queryFolder);
+
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), 
".sql");
+        for (File sqlFile : sqlFiles) {
+            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+            String sql = getTextFromFile(sqlFile);
+
+            // execute H2
+            printInfo("Query Result from H2 - " + queryName);
+            H2Connection h2Conn = new H2Connection(h2Connection, null);
+            
h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new 
TestH2DataTypeFactory());
+            executeQuery(h2Conn, queryName, sql, needSort);
+        }
+    }
+
+    protected void verifyResultRowCount(String queryFolder) throws Exception {
+        printInfo("---------- verify result count in folder: " + queryFolder);
+
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), 
".sql");
+        for (File sqlFile : sqlFiles) {
+            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+            String sql = getTextFromFile(sqlFile);
+
+            File expectResultFile = new File(sqlFile.getParent(), 
sqlFile.getName() + ".expected");
+            int expectRowCount = 
Integer.parseInt(Files.readFirstLine(expectResultFile, 
Charset.defaultCharset()));
+
+            // execute Kylin
+            printInfo("Query Result from Kylin - " + queryName + "  (" + 
queryFolder + ")");
+            IDatabaseConnection kylinConn = new 
DatabaseConnection(cubeConnection);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+
+            // compare the result
+            Assert.assertEquals(expectRowCount, kylinTable.getRowCount());
+            // Assertion.assertEquals(expectRowCount, 
kylinTable.getRowCount());
+        }
+    }
+
+    protected void execAndCompResultSize(String queryFolder, String[] 
exclusiveQuerys, boolean needSort) throws Exception {
+        printInfo("---------- test folder: " + queryFolder);
+        Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys);
+
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), 
".sql");
+        for (File sqlFile : sqlFiles) {
+            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+            if (exclusiveSet.contains(queryName)) {
+                continue;
+            }
+            String sql = getTextFromFile(sqlFile);
+
+            // execute Kylin
+            printInfo("Query Result from Kylin - " + queryName + "  (" + 
queryFolder + ")");
+            IDatabaseConnection kylinConn = new 
DatabaseConnection(cubeConnection);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sql, 
needSort);
+
+            // execute H2
+            printInfo("Query Result from H2 - " + queryName);
+            H2Connection h2Conn = new H2Connection(h2Connection, null);
+            
h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new 
TestH2DataTypeFactory());
+            ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort);
+
+            // compare the result
+            Assert.assertEquals(h2Table.getRowCount(), 
kylinTable.getRowCount());
+
+            compQueryCount++;
+            if (kylinTable.getRowCount() == 0) {
+                zeroResultQueries.add(sql);
+            }
+        }
+    }
+
+    protected void execAndCompQuery(String queryFolder, String[] 
exclusiveQuerys, boolean needSort) throws Exception {
+        printInfo("---------- test folder: " + queryFolder);
+        Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys);
+
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), 
".sql");
+        for (File sqlFile : sqlFiles) {
+            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+            if (exclusiveSet.contains(queryName)) {
+                continue;
+            }
+            String sql = getTextFromFile(sqlFile);
+
+            // execute Kylin
+            printInfo("Query Result from Kylin - " + queryName + "  (" + 
queryFolder + ")");
+            IDatabaseConnection kylinConn = new 
DatabaseConnection(cubeConnection);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sql, 
needSort);
+
+            // execute H2
+            printInfo("Query Result from H2 - " + queryName);
+            H2Connection h2Conn = new H2Connection(h2Connection, null);
+            
h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new 
TestH2DataTypeFactory());
+            ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort);
+
+            // compare the result
+            Assertion.assertEquals(h2Table, kylinTable);
+
+            compQueryCount++;
+            if (kylinTable.getRowCount() == 0) {
+                zeroResultQueries.add(sql);
+            }
+        }
+    }
+
+    protected void execAndCompDynamicQuery(String queryFolder, String[] 
exclusiveQuerys, boolean needSort) throws Exception {
+        printInfo("---------- test folder: " + queryFolder);
+        Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys);
+
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), 
".sql");
+        for (File sqlFile : sqlFiles) {
+            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+            if (exclusiveSet.contains(queryName)) {
+                continue;
+            }
+            String sql = getTextFromFile(sqlFile);
+            List<String> parameters = getParameterFromFile(sqlFile);
+
+            // execute Kylin
+            printInfo("Query Result from Kylin - " + queryName + "  (" + 
queryFolder + ")");
+            IDatabaseConnection kylinConn = new 
DatabaseConnection(cubeConnection);
+            ITable kylinTable = executeDynamicQuery(kylinConn, queryName, sql, 
parameters, needSort);
+
+            // execute H2
+            printInfo("Query Result from H2 - " + queryName);
+            IDatabaseConnection h2Conn = new DatabaseConnection(h2Connection);
+            
h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new 
TestH2DataTypeFactory());
+            ITable h2Table = executeDynamicQuery(h2Conn, queryName, sql, 
parameters, needSort);
+
+            // compare the result
+            Assertion.assertEquals(h2Table, kylinTable);
+        }
+    }
+
+    protected int runSqlFile(String file) throws Exception {
+        return runSQL(new File(file), true, false);
+    }
+
+    protected int runSQL(File sqlFile, boolean debug, boolean explain) throws 
Exception {
+        if (debug) {
+            System.setProperty("calcite.debug", "true");
+            InputStream inputStream = new 
FileInputStream("src/test/resources/logging.properties");
+            LogManager.getLogManager().readConfiguration(inputStream);
+        }
+
+        String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+        printInfo("Testing Query " + queryName);
+        String sql = getTextFromFile(sqlFile);
+        if (explain) {
+            sql = "explain plan for " + sql;
+        }
+        int count = executeQuery(sql, true);
+
+        if (debug) {
+            System.clearProperty("calcite.debug");
+        }
+        return count;
+    }
+
+    protected void batchExecuteQuery(String queryFolder) throws Exception {
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), 
".sql");
+        for (File sqlFile : sqlFiles) {
+            runSQL(sqlFile, false, false);
+        }
+    }
+
+    protected int output(ResultSet resultSet, boolean needDisplay) throws 
SQLException {
+        int count = 0;
+        ResultSetMetaData metaData = resultSet.getMetaData();
+        int columnCount = metaData.getColumnCount();
+        StringBuilder sb = new StringBuilder("\n");
+        if (needDisplay) {
+            for (int i = 1; i <= columnCount; i++) {
+                sb.append(metaData.getColumnName(i));
+                sb.append("-");
+                sb.append(metaData.getTableName(i));
+                sb.append("-");
+                sb.append(metaData.getColumnTypeName(i));
+                if (i < columnCount) {
+                    sb.append("\t");
+                } else {
+                    sb.append("\n");
+                }
+            }
+        }
+
+        while (resultSet.next()) {
+            if (needDisplay) {
+                for (int i = 1; i <= columnCount; i++) {
+                    sb.append(resultSet.getString(i));
+                    if (i < columnCount) {
+                        sb.append("\t");
+                    } else {
+                        sb.append("\n");
+                    }
+                }
+            }
+            count++;
+        }
+        printInfo(sb.toString());
+        return count;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
 
b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
new file mode 100644
index 0000000..c4f0777
--- /dev/null
+++ 
b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
+
+    @Before
+    public void setup() throws Exception {
+        super.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        super.cleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws IOException {
+        KylinConfig config = getTestConfig();
+        String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", 
"EDW.TEST_CAL_DT" };
+        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, 
config);
+
+        assertTrue(loaded.size() == toLoad.length);
+        for (String str : toLoad)
+            assertTrue(loaded.contains(str));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
 
b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
new file mode 100644
index 0000000..757888e
--- /dev/null
+++ 
b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This test case need the hive runtime; Please run it with sandbox;
+ * @author shaoshi
+ *
+ * It is in the exclude list of default profile in pom.xml
+ */
+public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
+
+    @Test
+    public void test() throws IOException {
+        HiveTableReader reader = new HiveTableReader("default", 
"test_kylin_fact");
+        int rowNumber = 0;
+        while (reader.next()) {
+            String[] row = reader.getRow();
+            Assert.assertEquals(9, row.length);
+            //System.out.println(ArrayUtils.toString(row));
+            rowNumber++;
+        }
+
+        reader.close();
+        Assert.assertEquals(10000, rowNumber);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
 
b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
new file mode 100644
index 0000000..4ef299d
--- /dev/null
+++ 
b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
+
+    SnapshotManager snapshotMgr;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        snapshotMgr = SnapshotManager.getInstance(getTestConfig());
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void basicTest() throws Exception {
+        String tableName = "EDW.TEST_SITES";
+        TableDesc tableDesc = 
MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
+        ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+        String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, 
tableDesc).getResourcePath();
+
+        snapshotMgr.wipeoutCache();
+
+        SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath);
+
+        // compare hive & snapshot
+        TableReader hiveReader = hiveTable.getReader();
+        TableReader snapshotReader = snapshot.getReader();
+
+        while (true) {
+            boolean hiveNext = hiveReader.next();
+            boolean snapshotNext = snapshotReader.next();
+            assertEquals(hiveNext, snapshotNext);
+
+            if (hiveNext == false)
+                break;
+
+            String[] hiveRow = hiveReader.getRow();
+            String[] snapshotRow = snapshotReader.getRow();
+            assertArrayEquals(hiveRow, snapshotRow);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
 
b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
new file mode 100644
index 0000000..c560bb8
--- /dev/null
+++ 
b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHBaseResourceStoreTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase {
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testHBaseStore() throws Exception {
+        testAStore(ResourceStore.getStore(KylinConfig.getInstanceFromEnv()));
+    }
+
+    @Test
+    public void testHBaseStoreWithLargeCell() throws Exception {
+        String path = "/cube/_test_large_cell.json";
+        String largeContent = "THIS_IS_A_LARGE_CELL";
+        StringEntity content = new StringEntity(largeContent);
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        int origSize = config.getHBaseKeyValueSize();
+        ResourceStore store = 
ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+
+        try {
+            config.setProperty("kylin.hbase.client.keyvalue.maxsize", 
String.valueOf(largeContent.length() - 1));
+
+            store.deleteResource(path);
+
+            store.putResource(path, content, StringEntity.serializer);
+            assertTrue(store.exists(path));
+            StringEntity t = store.getResource(path, StringEntity.class, 
StringEntity.serializer);
+            assertEquals(content, t);
+
+            Path redirectPath = ((HBaseResourceStore) 
store).bigCellHDFSPath(path);
+            Configuration hconf = 
HBaseConnection.getCurrentHBaseConfiguration();
+            FileSystem fileSystem = FileSystem.get(hconf);
+            assertTrue(fileSystem.exists(redirectPath));
+
+            FSDataInputStream in = fileSystem.open(redirectPath);
+            assertEquals(largeContent, in.readUTF());
+            in.close();
+
+            store.deleteResource(path);
+        } finally {
+            config.setProperty("kylin.hbase.client.keyvalue.maxsize", "" + 
origSize);
+            store.deleteResource(path);
+        }
+    }
+
+    void testAStore(ResourceStore store) throws IOException {
+        String dir1 = "/cube";
+        String path1 = "/cube/_test.json";
+        StringEntity content1 = new StringEntity("anything");
+        String dir2 = "/table";
+        String path2 = "/table/_test.json";
+        StringEntity content2 = new StringEntity("something");
+
+        // cleanup legacy if any
+        store.deleteResource(path1);
+        store.deleteResource(path2);
+
+        StringEntity t;
+
+        // put/get
+        store.putResource(path1, content1, StringEntity.serializer);
+        assertTrue(store.exists(path1));
+        t = store.getResource(path1, StringEntity.class, 
StringEntity.serializer);
+        assertEquals(content1, t);
+
+        store.putResource(path2, content2, StringEntity.serializer);
+        assertTrue(store.exists(path2));
+        t = store.getResource(path2, StringEntity.class, 
StringEntity.serializer);
+        assertEquals(content2, t);
+
+        // overwrite
+        t.str = "new string";
+        store.putResource(path2, t, StringEntity.serializer);
+
+        // write conflict
+        try {
+            t.setLastModified(t.getLastModified() - 1);
+            store.putResource(path2, t, StringEntity.serializer);
+            fail("write conflict should trigger IllegalStateException");
+        } catch (IllegalStateException e) {
+            // expected
+        }
+
+        // list
+        ArrayList<String> list;
+
+        list = store.listResources(dir1);
+        assertTrue(list.contains(path1));
+        assertTrue(list.contains(path2) == false);
+
+        list = store.listResources(dir2);
+        assertTrue(list.contains(path2));
+        assertTrue(list.contains(path1) == false);
+
+        list = store.listResources("/");
+        assertTrue(list.contains(dir1));
+        assertTrue(list.contains(dir2));
+        assertTrue(list.contains(path1) == false);
+        assertTrue(list.contains(path2) == false);
+
+        list = store.listResources(path1);
+        assertNull(list);
+        list = store.listResources(path2);
+        assertNull(list);
+
+        // delete/exist
+        store.deleteResource(path1);
+        assertTrue(store.exists(path1) == false);
+        list = store.listResources(dir1);
+        assertTrue(list == null || list.contains(path1) == false);
+
+        store.deleteResource(path2);
+        assertTrue(store.exists(path2) == false);
+        list = store.listResources(dir2);
+        assertTrue(list == null || list.contains(path2) == false);
+    }
+
+    public static class StringEntity extends RootPersistentEntity {
+
+        static final Serializer<StringEntity> serializer = new 
Serializer<StringEntity>() {
+            @Override
+            public void serialize(StringEntity obj, DataOutputStream out) 
throws IOException {
+                out.writeUTF(obj.str);
+            }
+
+            @Override
+            public StringEntity deserialize(DataInputStream in) throws 
IOException {
+                String str = in.readUTF();
+                return new StringEntity(str);
+            }
+        };
+
+        String str;
+
+        public StringEntity(String str) {
+            this.str = str;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = super.hashCode();
+            result = prime * result + ((str == null) ? 0 : str.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+            if (!(obj instanceof StringEntity))
+                return false;
+            return StringUtils.equals(this.str, ((StringEntity) obj).str);
+        }
+
+        @Override
+        public String toString() {
+            return str;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java 
b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
new file mode 100644
index 0000000..499a456
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ */
+public class ITHdfsOpsTest extends HBaseMetadataTestCase {
+
+    FileSystem fileSystem;
+
+    @Before
+    public void setup() throws Exception {
+
+        this.createTestMetadata();
+
+        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+
+        fileSystem = FileSystem.get(hconf);
+    }
+
+    @Test
+    public void TestPath() throws IOException {
+        String hdfsWorkingDirectory = 
KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
+        Path coprocessorDir = new Path(hdfsWorkingDirectory, "test");
+        fileSystem.mkdirs(coprocessorDir);
+
+        Path newFile = new Path(coprocessorDir, "test_file");
+        newFile = newFile.makeQualified(fileSystem.getUri(), null);
+        FSDataOutputStream stream = fileSystem.create(newFile);
+        stream.write(new byte[] { 0, 1, 2 });
+        stream.close();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java 
b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
new file mode 100644
index 0000000..d6443e7
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.IStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.StorageFactory;
+import org.apache.kylin.storage.cache.StorageMockUtils;
+import org.apache.kylin.storage.exception.ScanOutOfLimitException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class ITStorageTest extends HBaseMetadataTestCase {
+
+    private IStorageQuery storageEngine;
+    private CubeInstance cube;
+    private StorageContext context;
+
+    @BeforeClass
+    public static void setupResource() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownResource() {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+
+        CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
+        cube = cubeMgr.getCube("test_kylin_cube_without_slr_left_join_empty");
+        Assert.assertNotNull(cube);
+        storageEngine = StorageFactory.createQuery(cube);
+        String url = KylinConfig.getInstanceFromEnv().getStorageUrl();
+        context = new StorageContext();
+        context.setConnUrl(url);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test(expected = ScanOutOfLimitException.class)
+    @Ignore
+    public void testScanOutOfLimit() {
+        context.setThreshold(1);
+        List<TblColRef> groups = StorageMockUtils.buildGroups();
+        List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
+
+        search(groups, aggregations, null, context);
+    }
+
+    @Test
+    public void test01() {
+        List<TblColRef> groups = StorageMockUtils.buildGroups();
+        List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
+        TupleFilter filter = StorageMockUtils.buildFilter1(groups.get(0));
+
+        int count = search(groups, aggregations, filter, context);
+        assertTrue(count > 0);
+    }
+
+    /*
+        @Test
+        public void test02() {
+            List<TblColRef> groups = buildGroups();
+            List<FunctionDesc> aggregations = buildAggregations();
+            TupleFilter filter = buildFilter2(groups.get(1));
+
+            int count = search(groups, aggregations, filter, context);
+            assertTrue(count > 0);
+        }
+
+        @Test
+        public void test03() {
+            List<TblColRef> groups = buildGroups();
+            List<FunctionDesc> aggregations = buildAggregations();
+            TupleFilter filter = buildAndFilter(groups);
+
+            int count = search(groups, aggregations, filter, context);
+            assertTrue(count > 0);
+        }
+
+        @Test
+        public void test04() {
+            List<TblColRef> groups = buildGroups();
+            List<FunctionDesc> aggregations = buildAggregations();
+            TupleFilter filter = buildOrFilter(groups);
+
+            int count = search(groups, aggregations, filter, context);
+            assertTrue(count > 0);
+        }
+
+        @Test
+        public void test05() {
+            List<TblColRef> groups = buildGroups();
+            List<FunctionDesc> aggregations = buildAggregations();
+
+            int count = search(groups, aggregations, null, context);
+            assertTrue(count > 0);
+        }
+    */
+    private int search(List<TblColRef> groups, List<FunctionDesc> 
aggregations, TupleFilter filter, StorageContext context) {
+        int count = 0;
+        ITupleIterator iterator = null;
+        try {
+            SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", 
filter, null, Collections.<TblColRef> emptySet(), groups, 
Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), 
aggregations, new ArrayList<MeasureDesc>(), new 
ArrayList<SQLDigest.OrderEnum>());
+            iterator = storageEngine.search(context, sqlDigest, 
StorageMockUtils.newTupleInfo(groups, aggregations));
+            while (iterator.hasNext()) {
+                ITuple tuple = iterator.next();
+                System.out.println("Tuple = " + tuple);
+                count++;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (iterator != null)
+                iterator.close();
+        }
+        return count;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
 
b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
new file mode 100644
index 0000000..b5be703
--- /dev/null
+++ 
b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.ii;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.storage.hbase.cube.v1.HBaseClientKVIterator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author yangli9
+ */
+public class ITInvertedIndexHBaseTest extends HBaseMetadataTestCase {
+
+    IIInstance ii;
+    IISegment seg;
+    HConnection hconn;
+
+    TableRecordInfo info;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+
+        this.ii = 
IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        this.seg = ii.getFirstSegment();
+
+        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+        hconn = HConnectionManager.createConnection(hconf);
+
+        this.info = new TableRecordInfo(seg);
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testLoad() throws Exception {
+
+        String tableName = seg.getStorageLocationIdentifier();
+        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+
+        List<Slice> slices = Lists.newArrayList();
+        HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, 
tableName, IIDesc.HBASE_FAMILY_BYTES);
+        try {
+            for (Slice slice : codec.decodeKeyValue(kvIterator)) {
+                slices.add(slice);
+            }
+        } finally {
+            kvIterator.close();
+        }
+
+        List<TableRecord> records = iterateRecords(slices);
+        //dump(records);
+        System.out.println("table name:" + tableName + " has " + 
records.size() + " records");
+    }
+
+    private List<TableRecord> iterateRecords(List<Slice> slices) {
+        List<TableRecord> records = Lists.newArrayList();
+        for (Slice slice : slices) {
+            for (RawTableRecord rec : slice) {
+                records.add(new TableRecord((RawTableRecord) rec.clone(), 
info));
+            }
+        }
+        return records;
+    }
+
+    @SuppressWarnings("unused")
+    private void dump(Iterable<TableRecord> records) {
+        for (TableRecord rec : records) {
+            byte[] x = rec.getBytes();
+            String y = BytesUtil.toReadableText(x);
+            System.out.println(y);
+            System.out.println();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/resources/logging.properties
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/logging.properties 
b/kylin-it/src/test/resources/logging.properties
new file mode 100644
index 0000000..a925478
--- /dev/null
+++ b/kylin-it/src/test/resources/logging.properties
@@ -0,0 +1,5 @@
+handlers=java.util.logging.ConsoleHandler
+.level=INFO
+#org.eigenbase.relopt.RelOptPlanner.level=FINEST
+java.util.logging.ConsoleHandler.level=ALL
+java.util.logging.ConsoleHandler.formatter=org.apache.kylin.common.util.MyLogFormatter
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/1428bbc4/kylin-it/src/test/resources/query/debug/query78.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/debug/query78.sql 
b/kylin-it/src/test/resources/query/debug/query78.sql
new file mode 100644
index 0000000..299f1a4
--- /dev/null
+++ b/kylin-it/src/test/resources/query/debug/query78.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select count(*) as c,sum(PRICE) as GMV, LSTG_FORMAT_NAME as FORMAT_NAME
+from test_kylin_fact
+where (LSTG_FORMAT_NAME in ('ABIN')) or  (LSTG_FORMAT_NAME>='FP-GTC' and 
LSTG_FORMAT_NAME<='Others')
+group by LSTG_FORMAT_NAME

Reply via email to