http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java
deleted file mode 100644
index ba280df..0000000
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-import java.util.Random;
-
-import javax.sql.DataSource;
-
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-
-public class SqlUtil {
-    private static final Logger logger = 
LoggerFactory.getLogger(SqlUtil.class);
-
-    public static void closeResources(Connection con, Statement statement){
-        try{
-            if (statement!=null && !statement.isClosed()){
-                statement.close();
-            }
-        }catch(Exception e){
-            logger.error("", e);
-        }
-        
-        try{
-            if (con!=null && !con.isClosed()){
-                con.close();
-            }
-        }catch(Exception e){
-            logger.error("", e);
-        }
-    }
-    
-    
-    public static void execUpdateSQL(String sql, DataSource ds){
-        Connection con = null;
-        try{
-            con = ds.getConnection();
-            execUpdateSQL(con, sql);
-        }catch(Exception e){
-            logger.error("", e);
-        }finally{
-            closeResources(con, null);
-        }
-    }
-    
-    public static void execUpdateSQL(Connection db, String sql){
-        Statement statement=null;
-        try{
-            statement = db.createStatement();
-            statement.executeUpdate(sql);            
-        }catch(Exception e){
-            logger.error("", e);
-        }finally{
-            closeResources(null, statement);
-        }
-    }
-    
-    public static int tryTimes=10;
-    public static Connection getConnection(DBConnConf dbconf){
-        if (dbconf.getUrl()==null)
-            return null;
-        Connection con = null;
-        try {
-            Class.forName(dbconf.getDriver());
-        }catch(Exception e){
-            logger.error("", e);
-        }
-        boolean got=false;
-        int times=0;
-        Random r = new Random();
-        while(!got && times<tryTimes){
-            times++;
-            try {
-                con = DriverManager.getConnection(dbconf.getUrl(), 
dbconf.getUser(), dbconf.getPass());
-                got = true;
-            }catch(Exception e){
-                logger.warn("while use:" + dbconf, e);
-                try {
-                    int rt = r.nextInt(10);
-                    Thread.sleep(rt*1000);
-                } catch (InterruptedException e1) {
-                }
-            }
-        }
-        return con;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
new file mode 100644
index 0000000..63593c0
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job step that runs the command line stored under the "cmd" parameter through the
+ * CLI command executor of the given {@link KylinConfig} and turns the exit code
+ * into an {@link ExecuteResult}.
+ */
+public class CmdStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(CmdStep.class);
+    private final PatternedLogger stepLogger = new PatternedLogger(logger);
+
+    public CmdStep() {
+    }
+
+    /**
+     * Stores the command line to execute under the "cmd" parameter.
+     *
+     * @param cmd the command line this step will run
+     */
+    public void setCmd(String cmd) {
+        setParam("cmd", cmd);
+    }
+
+    /**
+     * Executes the stored command and records the step logger's info on the job.
+     *
+     * @param config configuration that supplies the CLI command executor
+     * @throws IOException      declared for failures while running the command
+     * @throws RuntimeException if the command exits with a non-zero code
+     */
+    protected void sqoopFlatHiveTable(KylinConfig config) throws IOException {
+        final String command = getParam("cmd");
+        stepLogger.log("exe cmd:" + command);
+
+        final Pair<Integer, String> outcome = config.getCliCommandExecutor().execute(command, stepLogger);
+        getManager().addJobInfo(getId(), stepLogger.getInfo());
+
+        final boolean succeeded = outcome.getFirst() == 0;
+        if (succeeded) {
+            return;
+        }
+        throw new RuntimeException("Failed to create flat hive table, error code " + outcome.getFirst());
+    }
+
+    /**
+     * Runs the command; any exception is logged and reported as an ERROR result
+     * carrying the buffered step log instead of being rethrown.
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        try {
+            sqoopFlatHiveTable(envConfig);
+            final String log = stepLogger.getBufferedLog();
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, log);
+        } catch (Exception failure) {
+            logger.error("job:" + getId() + " execute finished with exception", failure);
+            final String log = stepLogger.getBufferedLog();
+            return new ExecuteResult(ExecuteResult.State.ERROR, log);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
new file mode 100644
index 0000000..8a6c90f
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job step that wraps the statement stored under the "cmd" parameter into a Hive
+ * command line (via {@link HiveCmdBuilder}) and runs it through the CLI command
+ * executor of the given {@link KylinConfig}.
+ */
+public class HiveCmdStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(HiveCmdStep.class);
+    private final PatternedLogger stepLogger = new PatternedLogger(logger);
+
+    /** @return the statement stored under the "cmd" parameter */
+    public String getCmd() {
+        return getParam("cmd");
+    }
+
+    /**
+     * Stores the statement to run under the "cmd" parameter.
+     *
+     * @param sql the Hive statement(s) this step will execute
+     */
+    public void setCmd(String sql) {
+        setParam("cmd", sql);
+    }
+
+    /**
+     * Builds the Hive command (config overrides plus the stored statement), logs it,
+     * executes it and records the step logger's info on the job.
+     *
+     * @param config configuration supplying the Hive overrides and the CLI executor
+     * @throws IOException      declared for failures while running the command
+     * @throws RuntimeException if the command exits with a non-zero code
+     */
+    protected void createFlatHiveTable(KylinConfig config) throws IOException {
+        final HiveCmdBuilder builder = new HiveCmdBuilder();
+        builder.overwriteHiveProps(config.getHiveConfigOverride());
+        builder.addStatement(getCmd());
+        final String hiveCommand = builder.toString();
+
+        stepLogger.log("cmd: ");
+        stepLogger.log(hiveCommand);
+
+        final Pair<Integer, String> outcome = config.getCliCommandExecutor().execute(hiveCommand, stepLogger);
+        getManager().addJobInfo(getId(), stepLogger.getInfo());
+
+        final boolean succeeded = outcome.getFirst() == 0;
+        if (succeeded) {
+            return;
+        }
+        throw new RuntimeException("Failed to create flat hive table, error code " + outcome.getFirst());
+    }
+
+    /**
+     * Runs the Hive command; any exception is logged and reported as an ERROR result
+     * carrying the buffered step log instead of being rethrown.
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        try {
+            createFlatHiveTable(envConfig);
+            final String log = stepLogger.getBufferedLog();
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, log);
+        } catch (Exception failure) {
+            logger.error("job:" + getId() + " execute finished with exception", failure);
+            final String log = stepLogger.getBufferedLog();
+            return new ExecuteResult(ExecuteResult.State.ERROR, log);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
new file mode 100644
index 0000000..25eacfc
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.source.ISampleDataDeployer;
+import org.apache.kylin.source.ISourceMetadataExplorer;
+import org.apache.kylin.source.hive.DBConnConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Metadata explorer and sample-data deployer for a JDBC data source.
+ *
+ * <p>Connection settings (driver, URL, user, password, dialect) are read from the
+ * environment {@link KylinConfig} in the constructor. Every public operation opens
+ * its own connection through {@code SqlUtil} and releases it before returning,
+ * including when the operation fails.
+ */
+public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeployer {
+    private static final Logger logger = LoggerFactory.getLogger(JdbcExplorer.class);
+
+    public static final String DIALECT_VERTICA = "vertica";
+    public static final String DIALECT_ORACLE = "oracle";
+    public static final String DIALECT_MYSQL = "mysql";
+    public static final String DIALECT_HIVE = "hive";
+
+    public static final String TABLE_TYPE_TABLE = "TABLE";
+    public static final String TABLE_TYPE_VIEW = "VIEW";
+
+    private KylinConfig config;
+    private DBConnConf dbconf;
+    private String dialect;
+
+    public JdbcExplorer() {
+        config = KylinConfig.getInstanceFromEnv();
+        String connectionUrl = config.getJdbcConnectionUrl();
+        String driverClass = config.getJdbcDriver();
+        String jdbcUser = config.getJdbcUser();
+        String jdbcPass = config.getJdbcPass();
+        dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
+        this.dialect = config.getJdbcDialect();
+    }
+
+    /**
+     * Maps a Kylin/Java data type name to the lower-cased name used in generated DDL.
+     * For the Vertica dialect "double" is emitted as "float".
+     */
+    private String getSqlDataType(String javaDataType) {
+        if (DIALECT_VERTICA.equals(dialect)) {
+            if (javaDataType.toLowerCase().equals("double")) {
+                return "float";
+            }
+        }
+
+        return javaDataType.toLowerCase();
+    }
+
+    @Override
+    public void createSampleDatabase(String database) throws Exception {
+        executeSQL(generateCreateSchemaSql(database));
+    }
+
+    /**
+     * @throws UnsupportedOperationException for any dialect other than Vertica. The
+     *         previous code returned null here, which then failed with a bare
+     *         NullPointerException while logging the statement.
+     */
+    private String generateCreateSchemaSql(String schemaName) {
+        if (DIALECT_VERTICA.equals(dialect)) {
+            return String.format("CREATE schema IF NOT EXISTS %s", schemaName);
+        }
+        throw new UnsupportedOperationException(String.format("unsupported dialect %s.", dialect));
+    }
+
+    @Override
+    public void loadSampleData(String tableName, String tmpDataDir) throws Exception {
+        executeSQL(generateLoadDataSql(tableName, tmpDataDir));
+    }
+
+    /**
+     * @throws UnsupportedOperationException for any dialect other than Vertica
+     *         (previously a null statement was returned and failed later with an NPE).
+     */
+    private String generateLoadDataSql(String tableName, String tableFileDir) {
+        if (DIALECT_VERTICA.equals(dialect)) {
+            return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, tableName);
+        }
+        throw new UnsupportedOperationException(String.format("unsupported dialect %s.", dialect));
+    }
+
+    @Override
+    public void createSampleTable(TableDesc table) throws Exception {
+        executeSQL(generateCreateTableSql(table));
+    }
+
+    /**
+     * Builds the statements that (re)create a sample table: drop a table of that name,
+     * drop a view of that name, then create the table with one column per ColumnDesc.
+     */
+    private String[] generateCreateTableSql(TableDesc tableDesc) {
+        logger.info(String.format("gen create table sql:%s", tableDesc));
+        String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName()).toUpperCase();
+        String dropsql = "DROP TABLE IF EXISTS " + tableIdentity;
+        String dropsql2 = "DROP VIEW IF EXISTS " + tableIdentity;
+
+        StringBuilder ddl = new StringBuilder();
+        ddl.append("CREATE TABLE " + tableIdentity + "\n");
+        ddl.append("(" + "\n");
+
+        for (int i = 0; i < tableDesc.getColumns().length; i++) {
+            ColumnDesc col = tableDesc.getColumns()[i];
+            if (i > 0) {
+                ddl.append(",");
+            }
+            ddl.append(col.getName() + " " + getSqlDataType((col.getDatatype())) + "\n");
+        }
+
+        ddl.append(")");
+
+        return new String[] { dropsql, dropsql2, ddl.toString() };
+    }
+
+    @Override
+    public void createWrapperView(String origTableName, String viewName) throws Exception {
+        executeSQL(generateCreateViewSql(viewName, origTableName));
+    }
+
+    /** Builds the statements that drop any view/table named viewName and recreate it as a view. */
+    private String[] generateCreateViewSql(String viewName, String tableName) {
+
+        String dropView = "DROP VIEW IF EXISTS " + viewName;
+        String dropTable = "DROP TABLE IF EXISTS " + viewName;
+
+        String createSql = ("CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
+
+        return new String[] { dropView, dropTable, createSql };
+    }
+
+    /** Executes a single update statement on a fresh connection. */
+    private void executeSQL(String sql) throws CommandNeedRetryException, IOException {
+        executeSQL(new String[] { sql });
+    }
+
+    /**
+     * Executes the statements in order on one fresh connection, which is released in
+     * a finally block so it cannot leak if a statement fails.
+     */
+    private void executeSQL(String[] sqls) throws CommandNeedRetryException, IOException {
+        Connection con = SqlUtil.getConnection(dbconf);
+        try {
+            for (String sql : sqls) {
+                // Log the statement verbatim. It must not be used as a format string:
+                // a '%' inside the SQL (e.g. LIKE '%x%') would make String.format throw.
+                logger.info(sql);
+                SqlUtil.execUpdateSQL(con, sql);
+            }
+        } finally {
+            SqlUtil.closeResources(con, null);
+        }
+    }
+
+    /**
+     * Opens a connection for a metadata operation.
+     *
+     * @throws IllegalStateException if SqlUtil.getConnection returned null; this used
+     *         to surface as an anonymous NullPointerException on the first use
+     */
+    private Connection openConnection() {
+        Connection con = SqlUtil.getConnection(dbconf);
+        if (con == null) {
+            throw new IllegalStateException("Cannot open JDBC connection to " + dbconf.getUrl());
+        }
+        return con;
+    }
+
+    /** Closes the result set if non-null; a failure to close is logged, not propagated. */
+    private static void closeQuietly(ResultSet rs) {
+        if (rs == null) {
+            return;
+        }
+        try {
+            rs.close();
+        } catch (Exception e) {
+            logger.warn("failed to close ResultSet", e);
+        }
+    }
+
+    /**
+     * Lists the schema names reported by {@link DatabaseMetaData#getSchemas()}.
+     */
+    @Override
+    public List<String> listDatabases() throws Exception {
+        List<String> ret = new ArrayList<String>();
+        Connection con = openConnection();
+        ResultSet rs = null;
+        try {
+            DatabaseMetaData dbmd = con.getMetaData();
+            rs = dbmd.getSchemas();
+            /*
+            The schema columns are:
+                - TABLE_SCHEM String => schema name
+                - TABLE_CATALOG String => catalog name (may be null)
+            */
+            while (rs.next()) {
+                String schema = rs.getString(1);
+                String catalog = rs.getString(2);
+                logger.info(String.format("%s,%s", schema, catalog));
+                ret.add(schema);
+            }
+        } finally {
+            closeQuietly(rs);
+            SqlUtil.closeResources(con, null);
+        }
+        return ret;
+    }
+
+    /**
+     * Lists the names of all tables (of any type) that
+     * {@link DatabaseMetaData#getTables} reports for the given schema pattern.
+     */
+    @Override
+    public List<String> listTables(String database) throws Exception {
+        List<String> ret = new ArrayList<String>();
+        Connection con = openConnection();
+        ResultSet rs = null;
+        try {
+            DatabaseMetaData dbmd = con.getMetaData();
+            rs = dbmd.getTables(null, database, null, null);
+            /*
+            - TABLE_CAT String => table catalog (may be null)
+            - TABLE_SCHEM String => table schema (may be null)
+            - TABLE_NAME String => table name
+            - TABLE_TYPE String => table type. Typical types are "TABLE", "VIEW", "SYSTEM TABLE",
+              "GLOBAL TEMPORARY", "LOCAL TEMPORARY", "ALIAS", "SYNONYM".
+            - REMARKS String => explanatory comment on the table
+            - TYPE_CAT String => the types catalog (may be null)
+            - TYPE_SCHEM String => the types schema (may be null)
+            - TYPE_NAME String => type name (may be null)
+            - SELF_REFERENCING_COL_NAME String => name of the designated "identifier" column of a
+              typed table (may be null)
+            - REF_GENERATION String => specifies how values in SELF_REFERENCING_COL_NAME are created.
+              Values are "SYSTEM", "USER", "DERIVED". (may be null)
+             */
+            while (rs.next()) {
+                String catalog = rs.getString(1);
+                String schema = rs.getString(2);
+                String name = rs.getString(3);
+                String type = rs.getString(4);
+                logger.info(String.format("%s,%s,%s,%s", schema, catalog, name, type));
+                ret.add(name);
+            }
+        } finally {
+            closeQuietly(rs);
+            SqlUtil.closeResources(con, null);
+        }
+        return ret;
+    }
+
+    /**
+     * Builds a {@link TableDesc} (upper-cased database/table/column names, fresh UUID)
+     * and a {@link TableExtDesc} for the given table from JDBC metadata.
+     *
+     * <p>If the table is not reported by the driver, an error is logged and the table
+     * type is left unset; the column list is then whatever getColumns returns.
+     */
+    @Override
+    public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table) throws Exception {
+
+        TableDesc tableDesc = new TableDesc();
+        tableDesc.setDatabase(database.toUpperCase());
+        tableDesc.setName(table.toUpperCase());
+        tableDesc.setUuid(UUID.randomUUID().toString());
+        tableDesc.setLastModified(0);
+
+        List<ColumnDesc> columns;
+        Connection con = openConnection();
+        try {
+            DatabaseMetaData dbmd = con.getMetaData();
+
+            String tableType = readTableType(dbmd, database, table);
+            if (tableType != null) {
+                tableDesc.setTableType(tableType);
+            } else {
+                logger.error(String.format("table %s not found in schema:%s", table, database));
+            }
+
+            columns = readColumns(dbmd, database, table);
+        } finally {
+            // The connection was previously never released by this method.
+            SqlUtil.closeResources(con, null);
+        }
+
+        tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()]));
+
+        TableExtDesc tableExtDesc = new TableExtDesc();
+        tableExtDesc.setName(table);
+        tableExtDesc.setUuid(UUID.randomUUID().toString());
+        tableExtDesc.setLastModified(0);
+        tableExtDesc.init();
+
+        return Pair.newPair(tableDesc, tableExtDesc);
+    }
+
+    /** Returns TABLE_TYPE of the last row getTables reports for the table, or null if there is none. */
+    private String readTableType(DatabaseMetaData dbmd, String database, String table) throws Exception {
+        ResultSet rs = null;
+        try {
+            rs = dbmd.getTables(null, database, table, null);
+            String tableType = null;
+            while (rs.next()) {
+                tableType = rs.getString(4);
+            }
+            return tableType;
+        } finally {
+            closeQuietly(rs);
+        }
+    }
+
+    /** Reads one ColumnDesc per row of getColumns, in the order the driver returns them. */
+    private List<ColumnDesc> readColumns(DatabaseMetaData dbmd, String database, String table) throws Exception {
+        /*
+        - 1. TABLE_CAT String => table catalog (may be null)
+        - 2. TABLE_SCHEM String => table schema (may be null)
+        - 3. TABLE_NAME String => table name
+        - 4. COLUMN_NAME String => column name
+        - 5. DATA_TYPE int => SQL type from java.sql.Types
+        - 6. TYPE_NAME String => Data source dependent type name, for a UDT the type name is fully qualified
+        - 7. COLUMN_SIZE int => column size.
+        - 8. BUFFER_LENGTH is not used.
+        - 9. DECIMAL_DIGITS int => the number of fractional digits. Null is returned for data types
+             where DECIMAL_DIGITS is not applicable.
+        - 10.NUM_PREC_RADIX int => Radix (typically either 10 or 2)
+        - 11.NULLABLE int => is NULL allowed.
+            - columnNoNulls - might not allow NULL values
+            - columnNullable - definitely allows NULL values
+            - columnNullableUnknown - nullability unknown
+        - 12.REMARKS String => comment describing column (may be null)
+        - 13.COLUMN_DEF String => default value for the column, which should be interpreted as a
+             string when the value is enclosed in single quotes (may be null)
+        - 14.SQL_DATA_TYPE int => unused
+        - 15.SQL_DATETIME_SUB int => unused
+        - 16.CHAR_OCTET_LENGTH int => for char types the maximum number of bytes in the column
+        - 17.ORDINAL_POSITION int => index of column in table (starting at 1)
+        - 18.IS_NULLABLE String => ISO rules are used to determine the nullability for a column.
+            - YES --- if the column can include NULLs
+            - NO --- if the column cannot include NULLs
+            - empty string --- if the nullability for the column is unknown
+         */
+        List<ColumnDesc> columns = new ArrayList<ColumnDesc>();
+        ResultSet rs = null;
+        try {
+            rs = dbmd.getColumns(null, database, table, null);
+            while (rs.next()) {
+                String tname = rs.getString(3);
+                String cname = rs.getString(4);
+                int type = rs.getInt(5);
+                String typeName = rs.getString(6);
+                int csize = rs.getInt(7);
+                int digits = rs.getInt(9);
+                int nullable = rs.getInt(11);
+                String comment = rs.getString(12);
+                int pos = rs.getInt(17);
+                logger.info(String.format("%s,%s,%d,%d,%d,%d,%s,%d", tname, cname, type, csize, digits, nullable, comment, pos));
+
+                ColumnDesc cdesc = new ColumnDesc();
+                cdesc.setName(cname.toUpperCase());
+                // NOTE(review): an earlier comment here said 'use "double" in kylin for "float"',
+                // but no such conversion is applied - the driver's TYPE_NAME is stored as-is.
+                cdesc.setDatatype(typeName);
+                cdesc.setId(String.valueOf(pos));
+                columns.add(cdesc);
+            }
+        } finally {
+            closeQuietly(rs);
+        }
+        return columns;
+    }
+
+    @Override
+    public List<String> getRelatedKylinResources(TableDesc table) {
+        return Collections.emptyList();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
new file mode 100644
index 0000000..ddd38db
--- /dev/null
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.jdbc;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.source.hive.HiveMRInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MR input for a JDBC source: instead of building the flat table inside Hive, the
+ * first build phase extracts the data with a sqoop import into the job working
+ * directory and then creates the flat Hive table over those files.
+ */
+public class JdbcHiveMRInput extends HiveMRInput {
+
+    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
+
+    /** Placeholder written to the log in place of the JDBC password. */
+    private static final String MASKED_PASSWORD = "******";
+
+    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new BatchCubingInputSide(flatDesc);
+    }
+
+    public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide {
+
+        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
+        }
+
+        /**
+         * Adds two tasks to the job flow: the sqoop import, then the Hive step that
+         * (re)creates the flat table on top of the imported files.
+         */
+        @Override
+        protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            // flatTableDatabase / flatDesc are not declared in this class - presumably inherited
+            // from HiveMRInput.BatchCubingInputSide.
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow);
+
+            jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName));
+            jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir));
+        }
+
+        /** Builds the Hive step that drops and recreates the flat table as TEXTFILE under jobWorkingDir. */
+        private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) {
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir, "TEXTFILE");
+
+            HiveCmdStep step = new HiveCmdStep();
+            step.setCmd(hiveInitStatements + dropTableHql + createTableHql);
+            return step;
+        }
+
+        /**
+         * Builds the step that runs "sqoop import" to extract the flat-table query result
+         * from the JDBC source into {@code jobWorkingDir/<flat table name>}, split by the
+         * partition date column.
+         */
+        private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
+            KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig();
+            String partCol = flatDesc.getDataModel().getPartitionDesc().getPartitionDateColumn();//tablename.colname
+            //using sqoop to extract data from jdbc source and dump them to hive
+            String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
+            String hiveTable = flatDesc.getTableName();
+            String connectionUrl = config.getJdbcConnectionUrl();
+            String driverClass = config.getJdbcDriver();
+            String jdbcUser = config.getJdbcUser();
+            String jdbcPass = config.getJdbcPass();
+            String sqoopHome = config.getSqoopHome();
+
+            String cmdTemplate = "%s/sqoop import "
+                    + "--connect %s --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
+                    + "--target-dir %s/%s --split-by %s";
+            // Format exactly once. The result used to be passed through String.format a second
+            // time, so any '%' in the password, URL or SQL was re-interpreted as a format
+            // specifier and either threw or silently altered the command.
+            String cmd = String.format(cmdTemplate, sqoopHome, connectionUrl, driverClass, jdbcUser,
+                    jdbcPass, selectSql, jobWorkingDir, hiveTable, partCol);
+            // Never write the real password to the log.
+            String loggableCmd = String.format(cmdTemplate, sqoopHome, connectionUrl, driverClass, jdbcUser,
+                    MASKED_PASSWORD, selectSql, jobWorkingDir, hiveTable, partCol);
+            logger.info("sqoop cmd:" + loggableCmd);
+            // NOTE(review): the values are placed on the shell command line unquoted, and the
+            // command (with password) is stored as a step parameter - consider quoting and
+            // sqoop's --password-file.
+            CmdStep step = new CmdStep();
+            step.setCmd(cmd);
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+            return step;
+        }
+
+        @Override
+        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+            // skip
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
new file mode 100644
index 0000000..cd66837
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.jdbc;
+
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.ISampleDataDeployer;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.ISourceMetadataExplorer;
+import org.apache.kylin.source.SourcePartition;
+
+/**
+ * {@link ISource} implementation for JDBC data sources. Used by reflection.
+ */
+public class JdbcSource implements ISource {
+
+    /** @return a new {@link JdbcExplorer} */
+    @Override
+    public ISourceMetadataExplorer getSourceMetadataExplorer() {
+        return new JdbcExplorer();
+    }
+
+    /**
+     * Only {@link IMRInput} is supported; it is served by a new {@link JdbcHiveMRInput}.
+     *
+     * @throws RuntimeException for any other engine interface
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface != IMRInput.class) {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+        return (I) new JdbcHiveMRInput();
+    }
+
+    /** @return a new {@link JdbcTable} for the given table descriptor */
+    @Override
+    public IReadableTable createReadableTable(TableDesc tableDesc) {
+        return new JdbcTable(tableDesc);
+    }
+
+    /** Returns a copy of the given partition with start and end offsets both reset to 0. */
+    @Override
+    public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
+        final SourcePartition enriched = SourcePartition.getCopyOf(srcPartition);
+        enriched.setStartOffset(0);
+        enriched.setEndOffset(0);
+        return enriched;
+    }
+
+    /** @return a new {@link JdbcExplorer} */
+    @Override
+    public ISampleDataDeployer getSampleDataDeployer() {
+        return new JdbcExplorer();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTable.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTable.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTable.java
new file mode 100644
index 0000000..4313862
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTable.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.jdbc;
+
+import java.io.IOException;
+
+
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link IReadableTable} view of a table reached through JDBC, identified by
+ * its database and table name.
+ */
+public class JdbcTable implements IReadableTable {
+
+    private static final Logger logger = LoggerFactory.getLogger(JdbcTable.class);
+
+    private final String database;
+    private final String tableName;
+
+    /**
+     * @param tableDesc descriptor supplying the database and table name
+     */
+    public JdbcTable(TableDesc tableDesc) {
+        this.database = tableDesc.getDatabase();
+        this.tableName = tableDesc.getName();
+    }
+
+    /**
+     * @return a new {@link JdbcTableReader} over this table
+     * @throws IOException if the reader cannot be created
+     */
+    @Override
+    public TableReader getReader() throws IOException {
+        return new JdbcTableReader(database, tableName);
+    }
+
+    /**
+     * Builds a signature whose path is {@code database.tableName}, whose size is 0
+     * and whose last-modified time is the current time.
+     */
+    @Override
+    public TableSignature getSignature() throws IOException {
+        final String path = database + "." + tableName;
+        // assume table is ever changing, so "now" is reported as the modification time
+        final long lastModified = System.currentTimeMillis();
+        final int size = 0;
+        return new TableSignature(path, size, lastModified);
+    }
+
+    /**
+     * Always reports the table as existing; no check against the database is made.
+     */
+    @Override
+    public boolean exists() {
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("database=[%s], table=[%s]", database, tableName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
new file mode 100644
index 0000000..b8865d6
--- /dev/null
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.source.IReadableTable.TableReader;
+import org.apache.kylin.source.hive.DBConnConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of TableReader with HCatalog for Hive table.
+ */
+public class JdbcTableReader implements TableReader {
+    private static final Logger logger = 
LoggerFactory.getLogger(JdbcTableReader.class);
+    
+    private String dbName;
+    private String tableName;
+
+    private DBConnConf dbconf;
+    private String dialect;
+    private Connection jdbcCon;
+    private Statement statement;
+    private ResultSet rs;
+    private int colCount;
+
+    /**
+     * Constructor for reading whole hive table
+     * @param dbName
+     * @param tableName
+     * @throws IOException
+     */
+    public JdbcTableReader(String dbName, String tableName) throws IOException 
{
+        this.dbName = dbName;
+        this.tableName = tableName;
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        String connectionUrl = config.getJdbcConnectionUrl();
+        String driverClass = config.getJdbcDriver();
+        String jdbcUser = config.getJdbcUser();
+        String jdbcPass = config.getJdbcPass();
+        dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, 
jdbcPass);
+        this.dialect = config.getJdbcDialect();
+        jdbcCon = SqlUtil.getConnection(dbconf);
+        String sql = String.format("select * from %s.%s", dbName, tableName);
+        try {
+            statement = jdbcCon.createStatement();
+            rs = statement.executeQuery(sql);
+            colCount = rs.getMetaData().getColumnCount();
+        }catch(SQLException e){
+            throw new IOException(String.format("error while exec %s", sql), 
e);
+        }
+        
+    }
+
+    @Override
+    public boolean next() throws IOException {
+        try {
+            return rs.next();
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public String[] getRow() {
+        String[] ret = new String[colCount];
+        for (int i=1; i<=colCount; i++){
+            try {
+                Object o = rs.getObject(i);
+                ret[i-1] = (o == null? null:o.toString());
+            }catch(Exception e){
+                logger.error("", e);
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public void close() throws IOException {
+        SqlUtil.closeResources(jdbcCon, statement);
+    }
+
+    public String toString() {
+        return "jdbc table reader for: " + dbName + "." + tableName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
new file mode 100644
index 0000000..a112d87
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Random;
+
+import javax.sql.DataSource;
+
+import org.slf4j.LoggerFactory;
+import org.apache.kylin.source.hive.DBConnConf;
+import org.slf4j.Logger;
+
+public class SqlUtil {
+    private static final Logger logger = 
LoggerFactory.getLogger(SqlUtil.class);
+
+    public static void closeResources(Connection con, Statement statement){
+        try{
+            if (statement!=null && !statement.isClosed()){
+                statement.close();
+            }
+        }catch(Exception e){
+            logger.error("", e);
+        }
+        
+        try{
+            if (con!=null && !con.isClosed()){
+                con.close();
+            }
+        }catch(Exception e){
+            logger.error("", e);
+        }
+    }
+    
+    
+    public static void execUpdateSQL(String sql, DataSource ds){
+        Connection con = null;
+        try{
+            con = ds.getConnection();
+            execUpdateSQL(con, sql);
+        }catch(Exception e){
+            logger.error("", e);
+        }finally{
+            closeResources(con, null);
+        }
+    }
+    
+    public static void execUpdateSQL(Connection db, String sql){
+        Statement statement=null;
+        try{
+            statement = db.createStatement();
+            statement.executeUpdate(sql);            
+        }catch(Exception e){
+            logger.error("", e);
+        }finally{
+            closeResources(null, statement);
+        }
+    }
+    
+    public static int tryTimes=10;
+    public static Connection getConnection(DBConnConf dbconf){
+        if (dbconf.getUrl()==null)
+            return null;
+        Connection con = null;
+        try {
+            Class.forName(dbconf.getDriver());
+        }catch(Exception e){
+            logger.error("", e);
+        }
+        boolean got=false;
+        int times=0;
+        Random r = new Random();
+        while(!got && times<tryTimes){
+            times++;
+            try {
+                con = DriverManager.getConnection(dbconf.getUrl(), 
dbconf.getUser(), dbconf.getPass());
+                got = true;
+            }catch(Exception e){
+                logger.warn("while use:" + dbconf, e);
+                try {
+                    int rt = r.nextInt(10);
+                    Thread.sleep(rt*1000);
+                } catch (InterruptedException e1) {
+                }
+            }
+        }
+        return con;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 52d2e6f..7425dae 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -32,8 +32,9 @@ import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
-import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.ISampleDataDeployer;
+import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -209,4 +210,9 @@ public class KafkaSource implements ISource {
         };
     }
 
+    /**
+     * Sample data deployment is not supported by this source.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public ISampleDataDeployer getSampleDataDeployer() {
+        throw new UnsupportedOperationException("Sample data deployment is not supported by KafkaSource");
+    }
+
 }

Reply via email to