http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java deleted file mode 100644 index 57c0be3..0000000 --- a/assembly/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.source.hive; - -import java.io.IOException; - -import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; -import org.junit.Assert; -import org.junit.Test; - -/** - * This test case need the hive runtime; Please run it with sandbox; - * @author shaoshi - * - * It is in the exclude list of default profile in pom.xml - */ -public class ITHiveTableReaderTest extends HBaseMetadataTestCase { - - @Test - public void test() throws IOException { - HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact"); - int rowNumber = 0; - while (reader.next()) { - String[] row = reader.getRow(); - Assert.assertEquals(9, row.length); - //System.out.println(ArrayUtils.toString(row)); - rowNumber++; - } - - reader.close(); - Assert.assertEquals(10000, rowNumber); - } -}
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java deleted file mode 100644 index 0df632a..0000000 --- a/assembly/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.source.hive; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import org.apache.kylin.dict.lookup.SnapshotManager; -import org.apache.kylin.dict.lookup.SnapshotTable; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ReadableTable; -import org.apache.kylin.source.ReadableTable.TableReader; -import org.apache.kylin.source.SourceFactory; -import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -/** - * @author yangli9 - * - */ -public class ITSnapshotManagerTest extends HBaseMetadataTestCase { - - SnapshotManager snapshotMgr; - - @Before - public void setup() throws Exception { - createTestMetadata(); - snapshotMgr = SnapshotManager.getInstance(getTestConfig()); - } - - @After - public void after() throws Exception { - cleanupTestMetadata(); - } - - @Test - public void basicTest() throws Exception { - String tableName = "EDW.TEST_SITES"; - TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName); - ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); - String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath(); - - snapshotMgr.wipeoutCache(); - - SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath); - - // compare hive & snapshot - TableReader hiveReader = hiveTable.getReader(); - TableReader snapshotReader = snapshot.getReader(); - - while (true) { - boolean hiveNext = hiveReader.next(); - boolean snapshotNext = snapshotReader.next(); - assertEquals(hiveNext, snapshotNext); - - if (hiveNext == false) - break; - - String[] hiveRow = hiveReader.getRow(); - String[] snapshotRow = snapshotReader.getRow(); - assertArrayEquals(hiveRow, snapshotRow); - } - } -} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java index ae96e67..b481557 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java @@ -79,6 +79,8 @@ public class BasicTest { @Test public void testxx() throws InterruptedException { + System.out.println(System.getProperty("skipTests").length()); + System.out.println(System.getProperty("skipXests")); byte[][] data = new byte[10000000][]; byte[] temp = new byte[100]; for (int i = 0; i < 100; i++) { http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java new file mode 100644 index 0000000..9d78957 --- /dev/null +++ b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common.util; + +import java.io.File; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractKylinTestCase; +import org.apache.kylin.common.util.ClassUtil; + +/** + * @author ysong1 + */ +public class HBaseMetadataTestCase extends AbstractKylinTestCase { + + static { + try { + ClassUtil.addClasspath(new File("../examples/test_case_data/sandbox/").getAbsolutePath()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void createTestMetadata() throws Exception { + staticCreateTestMetadata(); + } + + @Override + public void cleanupTestMetadata() { + staticCleanupTestMetadata(); + } + + public static void staticCreateTestMetadata() throws Exception { + staticCreateTestMetadata(SANDBOX_TEST_DATA); + } + + public static void staticCreateTestMetadata(String kylinConfigFolder) { + + KylinConfig.destoryInstance(); + + if (System.getProperty(KylinConfig.KYLIN_CONF) == null && System.getenv(KylinConfig.KYLIN_CONF) == null) + System.setProperty(KylinConfig.KYLIN_CONF, kylinConfigFolder); + + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/pom.xml ---------------------------------------------------------------------- diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml new file mode 100644 index 0000000..cc3e9b2 --- /dev/null +++ b/kylin-it/pom.xml @@ -0,0 +1,367 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. 
See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>kylin</artifactId> + <groupId>org.apache.kylin</groupId> + <version>2.0-incubating-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>kylin-it</artifactId> + <name>Kylin:IT</name> + + + <!-- Dependencies. 
--> + <dependencies> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>atopcalcite</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-storage</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-source-hive</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-jdbc</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-storage-hbase</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-query</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-invertedindex</artifactId> + <version>${project.parent.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-linq4j</artifactId> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + + <!-- Env & Test --> + + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-storage</artifactId> + 
<type>test-jar</type> + <scope>test</scope> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-storage-hbase</artifactId> + <type>test-jar</type> + <scope>test</scope> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-storage-hbase</artifactId> + <type>test-jar</type> + <scope>test</scope> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-assembly</artifactId> + <type>test-jar</type> + <scope>test</scope> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>xerces</groupId> + <artifactId>xercesImpl</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>xalan</groupId> + <artifactId>xalan</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.dbunit</groupId> + <artifactId>dbunit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>provided</scope> + <!-- protobuf version conflict with hbase --> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + 
<groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <scope>provided</scope> + <!-- version conflict with hadoop2.2 --> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-jdbc</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-testing-util</artifactId> + <version>${hbase-hadoop2.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet.jsp</groupId> + <artifactId>jsp-api</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + + <profiles> + <profile> + <id>sandbox</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>2.16</version> + <executions> + <execution> + <id>integration-tests</id> + <phase>integration-test</phase> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + <configuration> + <excludes> + <exclude>**/*$*</exclude> + </excludes> + <systemProperties> + <property> + <name>useSandbox</name> + <value>true</value> + </property> + </systemProperties> + </configuration> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.4.0</version> + <executions> + <execution> + <id>build_cube_with_engine</id> + <goals> + <goal>java</goal> + </goals> + 
<phase>pre-integration-test</phase> + <configuration> + <skip>${skipTests}</skip> + <classpathScope>test</classpathScope> + <mainClass>org.apache.kylin.provision.BuildCubeWithEngine</mainClass> + </configuration> + </execution> + <execution> + <id>build_cube_with_stream</id> + <goals> + <goal>java</goal> + </goals> + <phase>pre-integration-test</phase> + <configuration> + <skip>${skipTests}</skip> + <classpathScope>test</classpathScope> + <mainClass>org.apache.kylin.provision.BuildCubeWithStream</mainClass> + </configuration> + </execution> + <execution> + <id>build_ii_with_stream</id> + <goals> + <goal>java</goal> + </goals> + <phase>pre-integration-test</phase> + <configuration> + <skip>${skipTests}</skip> + <classpathScope>test</classpathScope> + <mainClass>org.apache.kylin.provision.BuildIIWithStream</mainClass> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> + </profile> + <profile> + <!-- This profile adds/overrides few features of the 'apache-release' + profile in the parent pom. --> + <id>apache-release</id> + <build> + <plugins> + <!-- Apache-RAT checks for files without headers. + If run on a messy developer's sandbox, it will fail. + This serves as a reminder to only build a release in a clean + sandbox! 
--> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <numUnapprovedLicenses>0</numUnapprovedLicenses> + <excludes> + <!-- test data --> + <exclude>src/test/**/*.dat</exclude> + <exclude>src/test/**/*.expected</exclude> + </excludes> + </configuration> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <dependencies> + <dependency> + <groupId>org.apache.maven.doxia</groupId> + <artifactId>doxia-core</artifactId> + <version>1.6</version> + <exclusions> + <exclusion> + <groupId>xerces</groupId> + <artifactId>xercesImpl</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </plugin> + <plugin> + <groupId>net.ju-n.maven.plugins</groupId> + <artifactId>checksum-maven-plugin</artifactId> + <version>1.2</version> + <executions> + <execution> + <goals> + <goal>artifacts</goal> + </goals> + </execution> + </executions> + <configuration> + <algorithms> + <algorithm>MD5</algorithm> + <algorithm>SHA-1</algorithm> + </algorithms> + <failOnError>false</failOnError> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java new file mode 100644 index 0000000..9b67248 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriverTest.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.jdbc; + +import java.io.File; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.io.FileUtils; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.webapp.WebAppContext; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; + +/** + */ +public class ITJDBCDriverTest extends HBaseMetadataTestCase { + + private static Server server = null; + private static SystemPropertiesOverride sysPropsOverride = new SystemPropertiesOverride(); + + @BeforeClass + public static void beforeClass() throws Exception { + sysPropsOverride.override("spring.profiles.active", "testing"); + sysPropsOverride.override("catalina.home", "."); // resources/log4j.properties ref ${catalina.home} + staticCreateTestMetadata(); + startJetty(); + } + + @AfterClass + public static void afterClass() throws Exception { + stopJetty(); + staticCleanupTestMetadata(); + sysPropsOverride.restore(); + } + + protected static void stopJetty() throws Exception { + if (server != null) + server.stop(); + + File workFolder = 
new File("work"); + if (workFolder.isDirectory() && workFolder.exists()) { + FileUtils.deleteDirectory(workFolder); + } + } + + protected static void startJetty() throws Exception { + + server = new Server(7070); + + WebAppContext context = new WebAppContext(); + context.setDescriptor("./src/main/webapp/WEB-INF/web.xml"); + context.setResourceBase("./src/main/webapp"); + context.setContextPath("/kylin"); + context.setParentLoaderPriority(true); + + server.setHandler(context); + + server.start(); + + } + + protected Connection getConnection() throws Exception { + + Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance(); + Properties info = new Properties(); + info.put("user", "ADMIN"); + info.put("password", "KYLIN"); + Connection conn = driver.connect("jdbc:kylin://localhost:7070/default", info); + + return conn; + } + + @Test + public void testMetadata1() throws Exception { + + // check the JDBC API here: http://docs.oracle.com/javase/7/docs/api/java/sql/DatabaseMetaData.html + Connection conn = getConnection(); + + // test getSchemas(); + List<String> schemaList = Lists.newArrayList(); + DatabaseMetaData dbMetadata = conn.getMetaData(); + ResultSet resultSet = dbMetadata.getSchemas(); + while (resultSet.next()) { + String schema = resultSet.getString("TABLE_SCHEM"); + String catalog = resultSet.getString("TABLE_CATALOG"); + + System.out.println("Get schema: schema=" + schema + ", catalog=" + catalog); + schemaList.add(schema); + + } + + resultSet.close(); + Assert.assertTrue(schemaList.contains("DEFAULT")); + Assert.assertTrue(schemaList.contains("EDW")); + + // test getCatalogs(); + resultSet = dbMetadata.getCatalogs(); + + List<String> catalogList = Lists.newArrayList(); + while (resultSet.next()) { + String catalog = resultSet.getString("TABLE_CAT"); + + System.out.println("Get catalog: catalog=" + catalog); + catalogList.add(catalog); + + } + Assert.assertTrue(catalogList.size() > 0 && catalogList.contains("defaultCatalog")); + + 
/** //Disable the test on getTableTypes() as it is not ready + resultSet = dbMetadata.getTableTypes(); + + List<String> tableTypes = Lists.newArrayList(); + while (resultSet.next()) { + String type = resultSet.getString("TABLE_TYPE"); + + System.out.println("Get table type: type=" + type); + tableTypes.add(type); + + } + + Assert.assertTrue(tableTypes.size() > 0 && tableTypes.contains("TABLE")); + resultSet.close(); + + **/ + conn.close(); + } + + @Test + public void testMetadata2() throws Exception { + Connection conn = getConnection(); + + List<String> tableList = Lists.newArrayList(); + DatabaseMetaData dbMetadata = conn.getMetaData(); + ResultSet resultSet = dbMetadata.getTables(null, "%", "%", new String[] { "TABLE" }); + while (resultSet.next()) { + String schema = resultSet.getString("TABLE_SCHEM"); + String name = resultSet.getString("TABLE_NAME"); + + System.out.println("Get table: schema=" + schema + ", name=" + name); + tableList.add(schema + "." + name); + + } + + resultSet.close(); + Assert.assertTrue(tableList.contains("DEFAULT.TEST_KYLIN_FACT")); + + resultSet = dbMetadata.getColumns(null, "%", "TEST_KYLIN_FACT", "%"); + + List<String> columns = Lists.newArrayList(); + while (resultSet.next()) { + String name = resultSet.getString("COLUMN_NAME"); + String type = resultSet.getString("TYPE_NAME"); + + System.out.println("Get column: name=" + name + ", data_type=" + type); + columns.add(name); + + } + + Assert.assertTrue(columns.size() > 0 && columns.contains("CAL_DT")); + resultSet.close(); + conn.close(); + } + + @Test + public void testSimpleStatement() throws Exception { + Connection conn = getConnection(); + Statement statement = conn.createStatement(); + + statement.execute("select count(*) from test_kylin_fact"); + + ResultSet rs = statement.getResultSet(); + + Assert.assertTrue(rs.next()); + int result = rs.getInt(1); + + Assert.assertTrue(result > 0); + + rs.close(); + statement.close(); + conn.close(); + + } + + @Test + public void 
testPreparedStatement() throws Exception { + Connection conn = getConnection(); + + PreparedStatement statement = conn.prepareStatement("select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact " + "where LSTG_FORMAT_NAME = ? group by LSTG_FORMAT_NAME"); + + statement.setString(1, "FP-GTC"); + + ResultSet rs = statement.executeQuery(); + + Assert.assertTrue(rs.next()); + + String format_name = rs.getString(1); + + Assert.assertTrue("FP-GTC".equals(format_name)); + + rs.close(); + statement.close(); + conn.close(); + + } + + @Test + public void testResultSet() throws Exception { + String sql = "select LSTG_FORMAT_NAME, sum(price) as GMV, count(1) as TRANS_CNT from test_kylin_fact \n" + " group by LSTG_FORMAT_NAME "; + + Connection conn = getConnection(); + Statement statement = conn.createStatement(); + + statement.execute(sql); + + ResultSet rs = statement.getResultSet(); + + int count = 0; + while (rs.next()) { + count++; + String lstg = rs.getString(1); + double gmv = rs.getDouble(2); + int trans_count = rs.getInt(3); + + System.out.println("Get a line: LSTG_FORMAT_NAME=" + lstg + ", GMV=" + gmv + ", TRANS_CNT=" + trans_count); + } + + Assert.assertTrue(count > 0); + statement.close(); + rs.close(); + conn.close(); + + } + + private static class SystemPropertiesOverride { + HashMap<String, String> backup = new HashMap<String, String>(); + + public void override(String key, String value) { + backup.put(key, System.getProperty(key)); + System.setProperty(key, value); + } + + public void restore() { + for (String key : backup.keySet()) { + String value = backup.get(key); + if (value == null) + System.clearProperty(key); + else + System.setProperty(key, value); + } + backup.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java new file mode 100644 index 0000000..c40b470 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.provision; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.lang.reflect.Method; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.TimeZone; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractKylinTestCase; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.job.DeployUtil; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.storage.hbase.util.StorageCleanupJob; +import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; + +import com.google.common.collect.Lists; + +public class BuildCubeWithEngine { + + private CubeManager cubeManager; + private DefaultScheduler scheduler; + protected ExecutableManager jobService; + private static boolean fastBuildMode = false; + + private static final Log logger = LogFactory.getLog(BuildCubeWithEngine.class); + + public static void main(String[] args) throws Exception { + beforeClass(); + 
BuildCubeWithEngine buildCubeWithEngine = new BuildCubeWithEngine(); + buildCubeWithEngine.before(); + buildCubeWithEngine.build(); + afterClass(); + } + + /** + * Adds the sandbox test data to the classpath, reads the "fastBuildMode" system property, + * points KYLIN_CONF at the sandbox configuration and creates the test metadata. + * + * @throws RuntimeException when the hdp.version system property is not set + */ + public static void beforeClass() throws Exception { + logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + + String fastModeStr = System.getProperty("fastBuildMode"); + if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) { + fastBuildMode = true; + logger.info("Will use fast build mode"); + } else { + logger.info("Will not use fast build mode"); + } + + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + if (System.getProperty("hdp.version") == null) { + throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); + } + + HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + } + + /** + * Deploys metadata and job jars, initialises the default scheduler and deletes every existing CubingJob. + * + * @throws RuntimeException when the scheduler reports that it has not started + */ + public void before() throws Exception { + + DeployUtil.initCliWorkDir(); + DeployUtil.deployMetadata(); + DeployUtil.overrideJobJarLocations(); + + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + jobService = ExecutableManager.getInstance(kylinConfig); + scheduler = DefaultScheduler.getInstance(); + scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); + if (!scheduler.hasStarted()) { + throw new RuntimeException("scheduler has not been started"); + } + cubeManager = CubeManager.getInstance(kylinConfig); + for (String jobId : jobService.getAllJobIds()) { + if (jobService.getJob(jobId) instanceof CubingJob) { + jobService.deleteJob(jobId); + } + } + + } + + public static void afterClass() { + HBaseMetadataTestCase.staticCleanupTestMetadata(); + } + + public void build() throws Exception { + DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty"); + testInner(); + 
testLeft(); + } + + /** + * Polls the job every five seconds until it reaches SUCCEED or ERROR. + * + * @param jobId id of the job to wait for + * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is restored first + */ + protected void waitForJob(String jobId) { + while (true) { + final ExecutableState status = jobService.getJob(jobId).getStatus(); + if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR) { + return; + } + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + /* stop polling instead of swallowing the interrupt and looping on */ + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for job " + jobId, e); + } + } + } + + private void testInner() throws Exception { + String[] testCase = new String[] { "testInnerJoinCubeWithoutSlr", "testInnerJoinCubeWithSlr" }; + runTestAndAssertSucceed(testCase); + } + + private void testLeft() throws Exception { + String[] testCase = new String[] { "testLeftJoinCubeWithSlr", "testLeftJoinCubeWithoutSlr" }; + runTestAndAssertSucceed(testCase); + } + + /** + * Runs the named test-case methods concurrently, one pool thread each, and asserts that every job they return succeeded. + * + * @param testCase names of no-argument methods of this class, invoked reflectively by TestCallable + * @throws Exception whatever a test case or Future.get() throws; it is logged with its stack trace first + */ + private void runTestAndAssertSucceed(String[] testCase) throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(testCase.length); + try { + final CountDownLatch countDownLatch = new CountDownLatch(testCase.length); + List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length); + for (String name : testCase) { + tasks.add(executorService.submit(new TestCallable(name, countDownLatch))); + } + countDownLatch.await(); + for (Future<List<String>> task : tasks) { + for (String jobId : task.get()) { + assertJobSucceed(jobId); + } + } + } catch (Exception ex) { + logger.error("Cube build test case failed", ex); + throw ex; + } finally { + /* pool threads are non-daemon; without shutdown they keep the JVM alive after main() returns */ + executorService.shutdown(); + } + } + + private void assertJobSucceed(String jobId) { + assertEquals("The job '" + jobId + "' is failed.", ExecutableState.SUCCEED, jobService.getOutput(jobId).getState()); + } + + private class TestCallable implements Callable<List<String>> { + + private final String methodName; + private final CountDownLatch countDownLatch; + + public TestCallable(String methodName, CountDownLatch countDownLatch) { + this.methodName = methodName; + this.countDownLatch = countDownLatch; + } + + @SuppressWarnings("unchecked") + 
@Override + public List<String> call() throws Exception { + try { + final Method method = BuildCubeWithEngine.class.getDeclaredMethod(methodName); + method.setAccessible(true); + return (List<String>) method.invoke(BuildCubeWithEngine.this); + } catch (Exception e) { + /* log the throwable itself: the reflective wrapper's getMessage() is usually null */ + logger.error("Failed to run test case '" + methodName + "'", e); + throw e; + } finally { + countDownLatch.countDown(); + } + } + } + + @SuppressWarnings("unused") + // called by reflection + private List<String> testInnerJoinCubeWithSlr() throws Exception { + clearSegment("test_kylin_cube_with_slr_empty"); + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + long date1 = 0; + long date2 = f.parse("2015-01-01").getTime(); + long date3 = f.parse("2022-01-01").getTime(); + List<String> result = Lists.newArrayList(); + + if (fastBuildMode) { + result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date3)); + } else { + result.add(buildSegment("test_kylin_cube_with_slr_empty", date1, date2)); + result.add(buildSegment("test_kylin_cube_with_slr_empty", date2, date3));//empty segment + } + return result; + } + + @SuppressWarnings("unused") + // called by reflection + private List<String> testInnerJoinCubeWithoutSlr() throws Exception { + + clearSegment("test_kylin_cube_without_slr_empty"); + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + long date1 = 0; + long date2 = f.parse("2013-01-01").getTime(); + long date3 = f.parse("2013-07-01").getTime(); + long date4 = f.parse("2022-01-01").getTime(); + List<String> result = Lists.newArrayList(); + + if (fastBuildMode) { + result.add(buildSegment("test_kylin_cube_without_slr_empty", date1, date4)); + } else { + result.add(buildSegment("test_kylin_cube_without_slr_empty", date1, date2)); + result.add(buildSegment("test_kylin_cube_without_slr_empty", date2, date3)); + result.add(buildSegment("test_kylin_cube_without_slr_empty", date3, date4)); + 
result.add(mergeSegment("test_kylin_cube_without_slr_empty", date1, date3));//don't merge all segments + } + return result; + + } + + @SuppressWarnings("unused") + // called by reflection + private List<String> testLeftJoinCubeWithoutSlr() throws Exception { + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + List<String> result = Lists.newArrayList(); + final String cubeName = "test_kylin_cube_without_slr_left_join_empty"; + clearSegment(cubeName); + + long date1 = cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart(); + long date2 = f.parse("2012-06-01").getTime(); + long date3 = f.parse("2022-01-01").getTime(); + long date4 = f.parse("2023-01-01").getTime(); + + if (fastBuildMode) { + result.add(buildSegment(cubeName, date1, date4)); + } else { + result.add(buildSegment(cubeName, date1, date2)); + result.add(buildSegment(cubeName, date2, date3)); + result.add(buildSegment(cubeName, date3, date4));//empty segment + result.add(mergeSegment(cubeName, date1, date3));//don't merge all segments + } + + return result; + + } + + @SuppressWarnings("unused") + // called by reflection + private List<String> testLeftJoinCubeWithSlr() throws Exception { + String cubeName = "test_kylin_cube_with_slr_left_join_empty"; + clearSegment(cubeName); + + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + long date1 = cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart(); + long date2 = f.parse("2013-01-01").getTime(); + long date3 = f.parse("2013-07-01").getTime(); + long date4 = f.parse("2022-01-01").getTime(); + + List<String> result = Lists.newArrayList(); + if (fastBuildMode) { + result.add(buildSegment(cubeName, date1, date4)); + } else { + 
result.add(buildSegment(cubeName, date1, date2)); + result.add(buildSegment(cubeName, date2, date3)); + result.add(buildSegment(cubeName, date3, date4)); + result.add(mergeSegment(cubeName, date1, date3));//don't merge all segments + } + return result; + + } + + private void clearSegment(String cubeName) throws Exception { + CubeInstance cube = cubeManager.getCube(cubeName); + // remove all existing segments + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); + cubeManager.updateCube(cubeBuilder); + } + + private String mergeSegment(String cubeName, long startDate, long endDate) throws Exception { + CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), startDate, endDate, true); + DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); + jobService.addJob(job); + waitForJob(job.getId()); + return job.getId(); + } + + /** + * Appends a segment ending at endDate, submits a batch cubing job for it and waits until the job reaches SUCCEED or ERROR. + * NOTE(review): startDate is accepted but never passed to appendSegments - confirm that this is intended. + * + * @return the id of the submitted job + */ + private String buildSegment(String cubeName, long startDate, long endDate) throws Exception { + CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube(cubeName), endDate); + DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); + jobService.addJob(job); + waitForJob(job.getId()); + return job.getId(); + } + + private int cleanupOldStorage() throws Exception { + String[] args = { "--delete", "true" }; + + int exitCode = ToolRunner.run(new StorageCleanupJob(), args); + return exitCode; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java new file mode 100644 index 0000000..2a142fc --- /dev/null +++ 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.provision; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.TimeZone; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractKylinTestCase; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.spark.SparkBatchCubingEngine; +import org.apache.kylin.job.DeployUtil; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import 
org.apache.kylin.job.lock.MockJobLock; +import org.apache.kylin.job.manager.ExecutableManager; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import com.google.common.collect.Lists; + +//TODO: convert it to a normal class rather than a test case, like in BuildCubeWithEngine +@Ignore +public class BuildCubeWithSpark { + + private CubeManager cubeManager; + private DefaultScheduler scheduler; + protected ExecutableManager jobService; + + private static final Log logger = LogFactory.getLog(BuildCubeWithSpark.class); + + protected void waitForJob(String jobId) { + while (true) { + AbstractExecutable job = jobService.getJob(jobId); + if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { + break; + } else { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + @BeforeClass + public static void beforeClass() throws Exception { + logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + if (System.getProperty("hdp.version") == null) { + throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); + } + + } + + @Before + public void before() throws Exception { + HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + + DeployUtil.initCliWorkDir(); + DeployUtil.deployMetadata(); + DeployUtil.overrideJobJarLocations(); + + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + jobService = ExecutableManager.getInstance(kylinConfig); + for (String jobId : jobService.getAllJobIds()) { + jobService.deleteJob(jobId); + } + scheduler = 
DefaultScheduler.getInstance(); + scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock()); + if (!scheduler.hasStarted()) { + throw new RuntimeException("scheduler has not been started"); + } + cubeManager = CubeManager.getInstance(kylinConfig); + + } + + @After + public void after() { + HBaseMetadataTestCase.staticCleanupTestMetadata(); + } + + @Test + public void test() throws Exception { + final CubeSegment segment = createSegment(); + String confPath = new File(AbstractKylinTestCase.SANDBOX_TEST_DATA).getAbsolutePath(); + KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar(); + String coprocessor = KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar(); + logger.info("confPath location:" + confPath); + logger.info("coprocessor location:" + coprocessor); + final DefaultChainedExecutable cubingJob = new SparkBatchCubingEngine(confPath, coprocessor).createBatchCubingJob(segment, "BuildCubeWithSpark"); + jobService.addJob(cubingJob); + waitForJob(cubingJob.getId()); + assertEquals(ExecutableState.SUCCEED, jobService.getOutput(cubingJob.getId()).getState()); + } + + private void clearSegment(String cubeName) throws Exception { + CubeInstance cube = cubeManager.getCube(cubeName); + // remove all existing segments + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); + cubeManager.updateCube(cubeBuilder); + } + + private CubeSegment createSegment() throws Exception { + String cubeName = "test_kylin_cube_with_slr_left_join_empty"; + clearSegment(cubeName); + + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + long dateStart = cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart(); + long dateEnd = f.parse("2050-11-12").getTime(); + + // this cube's start date is 0, end date is 20501112000000 + List<String> result = Lists.newArrayList(); + return 
cubeManager.appendSegments(cubeManager.getCube(cubeName), dateEnd); + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java new file mode 100644 index 0000000..290b869 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.provision; + +import java.io.File; +import java.util.UUID; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractKylinTestCase; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; +import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.job.DeployUtil; +import org.apache.kylin.source.kafka.KafkaConfigManager; +import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * for streaming cubing case "test_streaming_table" + */ +public class BuildCubeWithStream { + + private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class); + private static final String streamingName = "test_streaming_table_cube"; + private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00"); + private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00"); + private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours + + private KylinConfig kylinConfig; + + /** + * Runs beforeClass(), before(), build() and afterClass() in that order; afterClass() is not reached when an earlier step throws. + */ + public static void main(String[] args) throws Exception { + beforeClass(); + BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream(); + buildCubeWithStream.before(); + buildCubeWithStream.build(); + afterClass(); + } + + /** + * Adds the sandbox test data to the classpath, sets KYLIN_CONF to the sandbox directory and creates the test metadata. + * + * @throws RuntimeException if the hdp.version system property is not set + */ + public static void beforeClass() throws Exception { + logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + if (System.getProperty("hdp.version") == null) { + throw new RuntimeException("No hdp.version set; 
Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); + } + HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + } + + /** + * Overrides the job jar locations, saves a random Kafka topic into the Kafka config of the streaming cube + * and prepares test data for the [startTime, endTime) window. + */ + public void before() throws Exception { + DeployUtil.overrideJobJarLocations(); + + kylinConfig = KylinConfig.getInstanceFromEnv(); + + final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName); + + //Use a random topic for kafka data stream + KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingName); + streamingConfig.setTopic(UUID.randomUUID().toString()); + KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig); + + DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, config.getCubeName(), streamingConfig); + } + + /** + * Cleans up the test metadata created by beforeClass(). + */ + public static void afterClass() throws Exception { + HBaseMetadataTestCase.staticCleanupTestMetadata(); + } + + /** + * Runs one OneOffStreamingBuilder per batch: batches start at startTime, are batchInterval milliseconds long + * and are built one after another while the batch start is before endTime. + */ + public void build() throws Exception { + logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval)); + for (long start = startTime; start < endTime; start += batchInterval) { + logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval)); + new OneOffStreamingBuilder(streamingName, start, start + batchInterval).build().run(); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java new file mode 100644 index 0000000..b9c242d --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithEngine.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.provision; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.lang.reflect.Method; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.TimeZone; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractKylinTestCase; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.engine.mr.invertedindex.BatchIIJobBuilder; +import org.apache.kylin.engine.mr.invertedindex.IIJob; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; +import org.apache.kylin.job.DeployUtil; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import 
org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.storage.hbase.util.StorageCleanupJob; +import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import com.google.common.collect.Lists; + +//TODO: convert it to a normal class rather than a test case, like in BuildCubeWithEngine +@Ignore +public class BuildIIWithEngine { + + private JobEngineConfig jobEngineConfig; + private IIManager iiManager; + + private DefaultScheduler scheduler; + protected ExecutableManager jobService; + + protected static final String[] TEST_II_INSTANCES = new String[] { "test_kylin_ii_inner_join", "test_kylin_ii_left_join" }; + + private static final Log logger = LogFactory.getLog(BuildIIWithEngine.class); + + protected void waitForJob(String jobId) { + while (true) { + AbstractExecutable job = jobService.getJob(jobId); + if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { + break; + } else { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + @BeforeClass + public static void beforeClass() throws Exception { + logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + if (System.getProperty("hdp.version") == null) { + throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); + } + } + + @Before + public void before() throws Exception { + 
HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + + //DeployUtil.initCliWorkDir(); + // DeployUtil.deployMetadata(); + DeployUtil.overrideJobJarLocations(); + + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + jobService = ExecutableManager.getInstance(kylinConfig); + scheduler = DefaultScheduler.getInstance(); + scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); + if (!scheduler.hasStarted()) { + throw new RuntimeException("scheduler has not been started"); + } + jobEngineConfig = new JobEngineConfig(kylinConfig); + for (String jobId : jobService.getAllJobIds()) { + if (jobService.getJob(jobId) instanceof IIJob) { + jobService.deleteJob(jobId); + } + } + + iiManager = IIManager.getInstance(kylinConfig); + for (String iiInstance : TEST_II_INSTANCES) { + + IIInstance ii = iiManager.getII(iiInstance); + if (ii.getStatus() != RealizationStatusEnum.DISABLED) { + ii.setStatus(RealizationStatusEnum.DISABLED); + iiManager.updateII(ii); + } + } + } + + @After + public void after() throws Exception { + + for (String iiInstance : TEST_II_INSTANCES) { + IIInstance ii = iiManager.getII(iiInstance); + if (ii.getStatus() != RealizationStatusEnum.READY) { + ii.setStatus(RealizationStatusEnum.READY); + iiManager.updateII(ii); + } + } + } + + @Test + public void testBuildII() throws Exception { + + String[] testCase = new String[] { "buildIIInnerJoin", "buildIILeftJoin" }; + ExecutorService executorService = Executors.newFixedThreadPool(testCase.length); + final CountDownLatch countDownLatch = new CountDownLatch(testCase.length); + List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length); + for (int i = 0; i < testCase.length; i++) { + tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch))); + } + countDownLatch.await(); + for (int i = 0; i < tasks.size(); ++i) { + Future<List<String>> task = tasks.get(i); + final List<String> jobIds = 
task.get(); + for (String jobId : jobIds) { + assertJobSucceed(jobId); + } + } + + } + + private void assertJobSucceed(String jobId) { + assertEquals(ExecutableState.SUCCEED, jobService.getOutput(jobId).getState()); + } + + private class TestCallable implements Callable<List<String>> { + + private final String methodName; + private final CountDownLatch countDownLatch; + + public TestCallable(String methodName, CountDownLatch countDownLatch) { + this.methodName = methodName; + this.countDownLatch = countDownLatch; + } + + @SuppressWarnings("unchecked") + @Override + public List<String> call() throws Exception { + try { + final Method method = BuildIIWithEngine.class.getDeclaredMethod(methodName); + method.setAccessible(true); + return (List<String>) method.invoke(BuildIIWithEngine.this); + } finally { + countDownLatch.countDown(); + } + } + } + + protected List<String> buildIIInnerJoin() throws Exception { + return buildII(TEST_II_INSTANCES[0]); + } + + protected List<String> buildIILeftJoin() throws Exception { + return buildII(TEST_II_INSTANCES[1]); + } + + protected List<String> buildII(String iiName) throws Exception { + clearSegment(iiName); + + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + + long date1 = 0; + long date2 = f.parse("2015-01-01").getTime(); + + List<String> result = Lists.newArrayList(); + result.add(buildSegment(iiName, date1, date2)); + return result; + } + + private void clearSegment(String iiName) throws Exception { + IIInstance ii = iiManager.getII(iiName); + ii.getSegments().clear(); + iiManager.updateII(ii); + } + + private String buildSegment(String iiName, long startDate, long endDate) throws Exception { + IIInstance iiInstance = iiManager.getII(iiName); + IISegment segment = iiManager.buildSegment(iiInstance, startDate, endDate); + iiInstance.getSegments().add(segment); + iiManager.updateII(iiInstance); + + BatchIIJobBuilder batchIIJobBuilder = new BatchIIJobBuilder(segment, 
"SYSTEM"); + IIJob job = batchIIJobBuilder.build(); + jobService.addJob(job); + waitForJob(job.getId()); + return job.getId(); + } + + private int cleanupOldStorage() throws Exception { + String[] args = { "--delete", "true" }; + + int exitCode = ToolRunner.run(new StorageCleanupJob(), args); + return exitCode; + } + + public static void main(String[] args) throws Exception { + BuildIIWithEngine instance = new BuildIIWithEngine(); + + BuildIIWithEngine.beforeClass(); + instance.before(); + instance.testBuildII(); + instance.after(); + + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7f08dab5/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java new file mode 100644 index 0000000..4552cf0 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java @@ -0,0 +1,294 @@ +/* + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. 
+ * + * / + */ + +package org.apache.kylin.provision; + +import java.io.File; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.TimeZone; +import java.util.UUID; + +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractKylinTestCase; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.StreamingBatch; +import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; +import org.apache.kylin.invertedindex.index.Slice; +import org.apache.kylin.invertedindex.index.SliceBuilder; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; +import org.apache.kylin.invertedindex.model.IIKeyValueCodec; +import org.apache.kylin.invertedindex.model.IIRow; +import org.apache.kylin.job.DeployUtil; +import org.apache.kylin.job.JoinedFlatTable; +import org.apache.kylin.job.common.ShellExecutable; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.source.hive.HiveTableReader; +import org.apache.kylin.storage.hbase.HBaseConnection; +import 
org.apache.kylin.storage.hbase.ii.IICreateHTableJob; +import org.apache.kylin.storage.hbase.util.StorageCleanupJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class BuildIIWithStream { + + private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStream.class); + + private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" }; + private IIManager iiManager; + private KylinConfig kylinConfig; + + public static void main(String[] args) throws Exception { + beforeClass(); + BuildIIWithStream buildCubeWithEngine = new BuildIIWithStream(); + buildCubeWithEngine.before(); + buildCubeWithEngine.build(); + afterClass(); + } + + public static void beforeClass() throws Exception { + logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + if (System.getProperty("hdp.version") == null) { + throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); + } + HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + } + + public void before() throws Exception { + DeployUtil.overrideJobJarLocations(); + + kylinConfig = KylinConfig.getInstanceFromEnv(); + iiManager = IIManager.getInstance(kylinConfig); + for (String iiInstance : II_NAME) { + + IIInstance ii = iiManager.getII(iiInstance); + if (ii.getStatus() != RealizationStatusEnum.DISABLED) { + ii.setStatus(RealizationStatusEnum.DISABLED); + iiManager.updateII(ii); + } + } + } + + public static void afterClass() throws Exception { + cleanupOldStorage(); + HBaseMetadataTestCase.staticCleanupTestMetadata(); + } + + /** + * Generates the drop/create/insert Hive statements for the II's joined flat table, runs them through + * "hive -e" with the CLI command executor and returns the name of the flat table. + * + * @throws RuntimeException (with the IOException as its cause) when the insert statement cannot be generated + */ + private String createIntermediateTable(IIDesc desc, KylinConfig kylinConfig) throws IOException { + IIJoinedFlatTableDesc intermediateTableDesc 
= new IIJoinedFlatTableDesc(desc); + JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig); + final String uuid = UUID.randomUUID().toString(); + final String useDatabaseHql = "USE " + kylinConfig.getHiveDatabaseForIntermediateTable() + ";"; + final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, JobBuilderSupport.getJobWorkingDir(jobEngineConfig, uuid)); + String insertDataHqls; + try { + insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobEngineConfig); + } catch (IOException e1) { + /* keep the cause so the real failure is visible to the caller */ + throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e1); + } + + ShellExecutable step = new ShellExecutable(); + StringBuilder buf = new StringBuilder(); + buf.append("hive -e \""); + buf.append(useDatabaseHql + "\n"); + buf.append(dropTableHql + "\n"); + buf.append(createTableHql + "\n"); + buf.append(insertDataHqls + "\n"); + buf.append("\""); + + step.setCmd(buf.toString()); + logger.info(step.getCmd()); + step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); + kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null); + return intermediateTableDesc.getTableName(); + } + + private void clearSegment(String iiName) throws Exception { + IIInstance ii = iiManager.getII(iiName); + ii.getSegments().clear(); + iiManager.updateII(ii); + } + + private IISegment createSegment(String iiName) throws Exception { + clearSegment(iiName); + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + + long date1 = 0; + long date2 = f.parse("2015-01-01").getTime(); + return buildSegment(iiName, date1, date2); + } + + private IISegment buildSegment(String iiName, long startDate, long endDate) throws Exception { + IIInstance iiInstance = iiManager.getII(iiName); + IISegment segment = 
iiManager.buildSegment(iiInstance, startDate, endDate); + iiInstance.getSegments().add(segment); + iiManager.updateII(iiInstance); + return segment; + } + + /** + * Builds one II from its Hive flat table: creates the flat table and the HTable, reads the rows sorted by the + * timestamp column, and loads them into HBase in slices of at most getSliceSize() messages. + * The Hive reader is closed even when the build fails. + */ + private void buildII(String iiName) throws Exception { + final IIDesc desc = iiManager.getII(iiName).getDescriptor(); + final String tableName = createIntermediateTable(desc, kylinConfig); + logger.info("intermediate table name:" + tableName); + + HiveTableReader reader = new HiveTableReader("default", tableName); + try { + final List<TblColRef> tblColRefs = desc.listAllColumns(); + for (TblColRef tblColRef : tblColRefs) { + if (desc.isMetricsCol(tblColRef)) { + logger.info("metrics:" + tblColRef.getName()); + } else { + logger.info("dimension:" + tblColRef.getName()); + } + } + final IISegment segment = createSegment(iiName); + final HTableInterface htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier()); + String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() }; + ToolRunner.run(new IICreateHTableJob(), args); + + final IIDesc iiDesc = segment.getIIDesc(); + final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0); + + List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn()); + int count = sorted.size(); + ArrayList<StreamingMessage> messages = Lists.newArrayList(); + for (String[] row : sorted) { + messages.add((parse(row))); + if (messages.size() >= iiDesc.getSliceSize()) { + build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable); + messages.clear(); + } + } + + if (!messages.isEmpty()) { + build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable); + } + + logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier()); + logger.info("stream build finished, htable name:" + 
segment.getStorageLocationIdentifier()); + } finally { + reader.close(); + } + } + + public void build() throws Exception { + for (String iiName : II_NAME) { + buildII(iiName); + IIInstance ii = iiManager.getII(iiName); + if (ii.getStatus() != RealizationStatusEnum.READY) { + ii.setStatus(RealizationStatusEnum.READY); + iiManager.updateII(ii); + } + } + } + + private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) throws IOException { + final Slice slice = sliceBuilder.buildSlice(batch); + try { + loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Encodes the slice into one Put per row (value plus dictionary) and writes all of them to the HTable in one call. + * + * @throws RuntimeException when a row's dictionary does not span its whole backing array + */ + private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException { + List<Put> data = Lists.newArrayList(); + for (IIRow row : codec.encodeKeyValue(slice)) { + final byte[] key = row.getKey().get(); + final byte[] value = row.getValue().get(); + Put put = new Put(key); + put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value); + final ImmutableBytesWritable dictionary = row.getDictionary(); + final byte[] dictBytes = dictionary.get(); + if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) { + put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes); + } else { + throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are " + dictionary.getOffset() + " " + dictionary.getLength()); + } + data.add(put); + } + hTable.put(data); + //omit hTable.flushCommits(), because htable is auto flush + } + + private StreamingMessage parse(String[] row) { + return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object> emptyMap()); + } + + private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException { + List<String[]> unsorted = Lists.newArrayList(); + while (reader.next()) { + 
unsorted.add(reader.getRow()); + } + Collections.sort(unsorted, new Comparator<String[]>() { + @Override + public int compare(String[] o1, String[] o2) { + long t1 = DateFormat.stringToMillis(o1[tsCol]); + long t2 = DateFormat.stringToMillis(o2[tsCol]); + return Long.compare(t1, t2); + } + }); + return unsorted; + } + + private static int cleanupOldStorage() throws Exception { + String[] args = { "--delete", "true" }; + + int exitCode = ToolRunner.run(new StorageCleanupJob(), args); + return exitCode; + } + +}