http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java deleted file mode 100644 index ba280df..0000000 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.source.hive; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.Statement; -import java.util.Random; - -import javax.sql.DataSource; - -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; - -public class SqlUtil { - private static final Logger logger = LoggerFactory.getLogger(SqlUtil.class); - - public static void closeResources(Connection con, Statement statement){ - try{ - if (statement!=null && !statement.isClosed()){ - statement.close(); - } - }catch(Exception e){ - logger.error("", e); - } - - try{ - if (con!=null && !con.isClosed()){ - con.close(); - } - }catch(Exception e){ - logger.error("", e); - } - } - - - public static void execUpdateSQL(String sql, DataSource ds){ - Connection con = null; - try{ - con = ds.getConnection(); - execUpdateSQL(con, sql); - }catch(Exception e){ - logger.error("", e); - }finally{ - closeResources(con, null); - } - } - - public static void execUpdateSQL(Connection db, String sql){ - Statement statement=null; - try{ - statement = db.createStatement(); - statement.executeUpdate(sql); - }catch(Exception e){ - logger.error("", e); - }finally{ - closeResources(null, statement); - } - } - - public static int tryTimes=10; - public static Connection getConnection(DBConnConf dbconf){ - if (dbconf.getUrl()==null) - return null; - Connection con = null; - try { - Class.forName(dbconf.getDriver()); - }catch(Exception e){ - logger.error("", e); - } - boolean got=false; - int times=0; - Random r = new Random(); - while(!got && times<tryTimes){ - times++; - try { - con = DriverManager.getConnection(dbconf.getUrl(), dbconf.getUser(), dbconf.getPass()); - got = true; - }catch(Exception e){ - logger.warn("while use:" + dbconf, e); - try { - int rt = r.nextInt(10); - Thread.sleep(rt*1000); - } catch (InterruptedException e1) { - } - } - } - return con; - } -}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java new file mode 100644 index 0000000..63593c0 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/CmdStep.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.jdbc; + +import java.io.IOException; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.job.common.PatternedLogger; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class CmdStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(CmdStep.class); + private final PatternedLogger stepLogger = new PatternedLogger(logger); + + public void setCmd(String cmd) { + setParam("cmd", cmd); + } + + public CmdStep() { + } + + protected void sqoopFlatHiveTable(KylinConfig config) throws IOException { + String cmd = getParam("cmd"); + stepLogger.log(String.format("exe cmd:%s", cmd)); + Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); + getManager().addJobInfo(getId(), stepLogger.getInfo()); + if (response.getFirst() != 0) { + throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst()); + } + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + try { + sqoopFlatHiveTable(config); + return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); + + } catch (Exception e) { + logger.error("job:" + getId() + " execute finished with exception", e); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java ---------------------------------------------------------------------- diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java new file mode 100644 index 0000000..8a6c90f --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/HiveCmdStep.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.jdbc; + +import java.io.IOException; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HiveCmdBuilder; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.job.common.PatternedLogger; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class HiveCmdStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(HiveCmdStep.class); + private final PatternedLogger stepLogger = new PatternedLogger(logger); + + protected void createFlatHiveTable(KylinConfig config) throws IOException { + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride()); + hiveCmdBuilder.addStatement(getCmd()); + final String cmd = hiveCmdBuilder.toString(); + + stepLogger.log("cmd: "); + stepLogger.log(cmd); + + Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); + getManager().addJobInfo(getId(), stepLogger.getInfo()); + if (response.getFirst() != 0) { + throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst()); + } + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + try { + createFlatHiveTable(config); + return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); + + } catch (Exception e) { + logger.error("job:" + getId() + " execute finished with exception", e); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + } + + public void setCmd(String sql) { + setParam("cmd", sql); + } + + public String getCmd() { 
+ return getParam("cmd"); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java new file mode 100644 index 0000000..25eacfc --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.source.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.source.ISampleDataDeployer; +import org.apache.kylin.source.ISourceMetadataExplorer; +import org.apache.kylin.source.hive.DBConnConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeployer { + private static final Logger logger = LoggerFactory.getLogger(JdbcExplorer.class); + + public static final String DIALECT_VERTICA="vertica"; + public static final String DIALECT_ORACLE="oracle"; + public static final String DIALECT_MYSQL="mysql"; + public static final String DIALECT_HIVE="hive"; + + public static final String TABLE_TYPE_TABLE="TABLE"; + public static final String TABLE_TYPE_VIEW="VIEW"; + + private KylinConfig config; + private DBConnConf dbconf; + private String dialect; + + public JdbcExplorer() { + config = KylinConfig.getInstanceFromEnv(); + String connectionUrl = config.getJdbcConnectionUrl(); + String driverClass = config.getJdbcDriver(); + String jdbcUser = config.getJdbcUser(); + String jdbcPass = config.getJdbcPass(); + dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass); + this.dialect = config.getJdbcDialect(); + } + + private String getSqlDataType(String javaDataType) { + if (DIALECT_VERTICA.equals(dialect)){ + if (javaDataType.toLowerCase().equals("double")){ + return "float"; + } + } + + return javaDataType.toLowerCase(); + } + + @Override 
+ public void createSampleDatabase(String database) throws Exception { + executeSQL(generateCreateSchemaSql(database)); + } + + private String generateCreateSchemaSql(String schemaName){ + if (DIALECT_VERTICA.equals(dialect)){ + return String.format("CREATE schema IF NOT EXISTS %s", schemaName); + }else{ + logger.error(String.format("unsupported dialect %s.", dialect)); + return null; + } + } + + @Override + public void loadSampleData(String tableName, String tmpDataDir) throws Exception { + executeSQL(generateLoadDataSql(tableName, tmpDataDir)); + } + + private String generateLoadDataSql(String tableName, String tableFileDir) { + if (DIALECT_VERTICA.equals(dialect)){ + return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, tableName); + }else{ + logger.error(String.format("unsupported dialect %s.", dialect)); + return null; + } + } + + @Override + public void createSampleTable(TableDesc table) throws Exception { + executeSQL(generateCreateTableSql(table)); + } + + private String[] generateCreateTableSql(TableDesc tableDesc) { + logger.info(String.format("gen create table sql:%s", tableDesc)); + String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName()).toUpperCase(); + String dropsql = "DROP TABLE IF EXISTS " + tableIdentity; + String dropsql2 = "DROP VIEW IF EXISTS " + tableIdentity; + + StringBuilder ddl = new StringBuilder(); + ddl.append("CREATE TABLE " + tableIdentity + "\n"); + ddl.append("(" + "\n"); + + for (int i = 0; i < tableDesc.getColumns().length; i++) { + ColumnDesc col = tableDesc.getColumns()[i]; + if (i > 0) { + ddl.append(","); + } + ddl.append(col.getName() + " " + getSqlDataType((col.getDatatype())) + "\n"); + } + + ddl.append(")"); + + return new String[] { dropsql, dropsql2, ddl.toString() }; + } + + @Override + public void createWrapperView(String origTableName, String viewName) throws Exception { + executeSQL(generateCreateViewSql(viewName, 
origTableName)); + } + + private String[] generateCreateViewSql(String viewName, String tableName) { + + String dropView = "DROP VIEW IF EXISTS " + viewName; + String dropTable = "DROP TABLE IF EXISTS " + viewName; + + String createSql = ("CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName); + + return new String[] { dropView, dropTable, createSql }; + } + + private void executeSQL(String sql) throws CommandNeedRetryException, IOException { + Connection con = SqlUtil.getConnection(dbconf); + logger.info(String.format(sql)); + SqlUtil.execUpdateSQL(con, sql); + SqlUtil.closeResources(con, null); + } + + private void executeSQL(String[] sqls) throws CommandNeedRetryException, IOException { + Connection con = SqlUtil.getConnection(dbconf); + for (String sql : sqls){ + logger.info(String.format(sql)); + SqlUtil.execUpdateSQL(con, sql); + } + SqlUtil.closeResources(con, null); + } + + @Override + public List<String> listDatabases() throws Exception { + Connection con = SqlUtil.getConnection(dbconf); + DatabaseMetaData dbmd = con.getMetaData(); + ResultSet rs = dbmd.getSchemas(); + List<String> ret = new ArrayList<String>(); + /* + The schema columns are: + - TABLE_SCHEM String => schema name + - TABLE_CATALOG String => catalog name (may be null) + */ + while (rs.next()){ + String schema = rs.getString(1); + String catalog = rs.getString(2); + logger.info(String.format("%s,%s", schema, catalog)); + ret.add(schema); + } + SqlUtil.closeResources(con, null); + return ret; + } + + @Override + public List<String> listTables(String database) throws Exception { + Connection con = SqlUtil.getConnection(dbconf); + DatabaseMetaData dbmd = con.getMetaData(); + ResultSet rs = dbmd.getTables(null, database, null, null); + List<String> ret = new ArrayList<String>(); + /* + - TABLE_CAT String => table catalog (may be null) + - TABLE_SCHEM String => table schema (may be null) + - TABLE_NAME String => table name + - TABLE_TYPE String => table type. 
Typical types are "TABLE", "VIEW", "SYSTEM TABLE", "GLOBAL + TEMPORARY", "LOCAL TEMPORARY", "ALIAS", "SYNONYM". + - REMARKS String => explanatory comment on the table + - TYPE_CAT String => the types catalog (may be null) + - TYPE_SCHEM String => the types schema (may be null) + - TYPE_NAME String => type name (may be null) + - SELF_REFERENCING_COL_NAME String => name of the designated "identifier" column of a typed + table (may be null) + - REF_GENERATION String => specifies how values in SELF_REFERENCING_COL_NAME are created. + Values are "SYSTEM", "USER", "DERIVED". (may be null) + */ + while (rs.next()){ + String catalog = rs.getString(1); + String schema = rs.getString(2); + String name = rs.getString(3); + String type = rs.getString(4); + logger.info(String.format("%s,%s,%s,%s", schema, catalog, name, type)); + ret.add(name); + } + SqlUtil.closeResources(con, null); + return ret; + } + + @Override + public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table) throws Exception { + + TableDesc tableDesc = new TableDesc(); + tableDesc.setDatabase(database.toUpperCase()); + tableDesc.setName(table.toUpperCase()); + tableDesc.setUuid(UUID.randomUUID().toString()); + tableDesc.setLastModified(0); + + Connection con = SqlUtil.getConnection(dbconf); + DatabaseMetaData dbmd = con.getMetaData(); + ResultSet rs = dbmd.getTables(null, database, table, null); + String tableType=null; + while (rs.next()){ + tableType = rs.getString(4); + } + if (tableType!=null){ + tableDesc.setTableType(tableType); + }else{ + logger.error(String.format("table %s not found in schema:%s", table, database)); + } + /* + - 1. TABLE_CAT String => table catalog (may be null) + - 2. TABLE_SCHEM String => table schema (may be null) + - 3. TABLE_NAME String => table name + - 4. COLUMN_NAME String => column name + - 5. DATA_TYPE int => SQL type from java.sql.Types + - 6. TYPE_NAME String => Data source dependent type name, for a UDT the type name is fully qualified + - 7. 
COLUMN_SIZE int => column size. + - 8. BUFFER_LENGTH is not used. + - 9. DECIMAL_DIGITS int => the number of fractional digits. Null is returned for data types where DECIMAL_DIGITS is not applicable. + - 10.NUM_PREC_RADIX int => Radix (typically either 10 or 2) + - 11.NULLABLE int => is NULL allowed. + - columnNoNulls - might not allow NULL values + - columnNullable - definitely allows NULL values + - columnNullableUnknown - nullability unknown + - 12.REMARKS String => comment describing column (may be null) + - 13.COLUMN_DEF String => default value for the column, which should be interpreted as a string when the value is enclosed in single quotes (may be null) + - 14.SQL_DATA_TYPE int => unused + - 15.SQL_DATETIME_SUB int => unused + - 16.CHAR_OCTET_LENGTH int => for char types the maximum number of bytes in the column + - 17.ORDINAL_POSITION int => index of column in table (starting at 1) + - 18.IS_NULLABLE String => ISO rules are used to determine the nullability for a column. + - YES --- if the column can include NULLs + - NO --- if the column cannot include NULLs + - empty string --- if the nullability for the column is unknown + */ + List<ColumnDesc> columns = new ArrayList<ColumnDesc>(); + rs = dbmd.getColumns(null, database, table, null); + while (rs.next()){ + String tname = rs.getString(3); + String cname = rs.getString(4); + int type=rs.getInt(5); + String typeName=rs.getString(6); + int csize=rs.getInt(7); + int digits = rs.getInt(9); + int nullable = rs.getInt(11); + String comment = rs.getString(12); + int pos = rs.getInt(17); + logger.info(String.format("%s,%s,%d,%d,%d,%d,%s,%d", tname, cname, type, csize, digits, nullable, comment, pos)); + + ColumnDesc cdesc = new ColumnDesc(); + cdesc.setName(cname.toUpperCase()); + // use "double" in kylin for "float" + cdesc.setDatatype(typeName); + cdesc.setId(String.valueOf(pos)); + columns.add(cdesc); + } + + + tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()])); + + TableExtDesc 
tableExtDesc = new TableExtDesc(); + tableExtDesc.setName(table); + tableExtDesc.setUuid(UUID.randomUUID().toString()); + tableExtDesc.setLastModified(0); + tableExtDesc.init(); + + return Pair.newPair(tableDesc, tableExtDesc); + } + + @Override + public List<String> getRelatedKylinResources(TableDesc table) { + return Collections.emptyList(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java new file mode 100644 index 0000000..ddd38db --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.source.jdbc; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.JoinedFlatTable; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.source.hive.HiveMRInput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JdbcHiveMRInput extends HiveMRInput { + + private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class); + + public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { + return new BatchCubingInputSide(flatDesc); + } + + public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide { + + public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { + super(flatDesc); + } + + @Override + protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) { + final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); + final String jobWorkingDir = getJobWorkingDir(jobFlow); + + jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName)); + jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir)); + } + + private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) { + final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir, "TEXTFILE"); + + HiveCmdStep step = new HiveCmdStep(); + step.setCmd(hiveInitStatements + dropTableHql + createTableHql); + return step; + } + + 
private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) { + KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig(); + String partCol = flatDesc.getDataModel().getPartitionDesc().getPartitionDateColumn();//tablename.colname + //using sqoop to extract data from jdbc source and dump them to hive + String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[]{partCol}); + String hiveTable = flatDesc.getTableName(); + String connectionUrl = config.getJdbcConnectionUrl(); + String driverClass = config.getJdbcDriver(); + String jdbcUser = config.getJdbcUser(); + String jdbcPass = config.getJdbcPass(); + String sqoopHome = config.getSqoopHome(); + String cmd= String.format(String.format("%s/sqoop import " + + "--connect %s --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" " + + "--target-dir %s/%s --split-by %s", sqoopHome, connectionUrl, driverClass, jdbcUser, + jdbcPass, selectSql, jobWorkingDir, hiveTable, partCol)); + logger.info(String.format("sqoop cmd:%s", cmd)); + CmdStep step = new CmdStep(); + step.setCmd(cmd); + step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); + return step; + } + + @Override + protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) { + // skip + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java new file mode 100644 index 0000000..cd66837 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.source.jdbc; + +import org.apache.kylin.engine.mr.IMRInput; +import org.apache.kylin.metadata.model.IBuildable; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.ISampleDataDeployer; +import org.apache.kylin.source.ISource; +import org.apache.kylin.source.ISourceMetadataExplorer; +import org.apache.kylin.source.SourcePartition; + +//used by reflection +public class JdbcSource implements ISource { + + @Override + public ISourceMetadataExplorer getSourceMetadataExplorer() { + return new JdbcExplorer(); + } + + @SuppressWarnings("unchecked") + @Override + public <I> I adaptToBuildEngine(Class<I> engineInterface) { + if (engineInterface == IMRInput.class) { + return (I) new JdbcHiveMRInput(); + } else { + throw new RuntimeException("Cannot adapt to " + engineInterface); + } + } + + @Override + public IReadableTable createReadableTable(TableDesc tableDesc) { + return new JdbcTable(tableDesc); + } + + @Override + public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) { + SourcePartition result = SourcePartition.getCopyOf(srcPartition); + result.setStartOffset(0); + result.setEndOffset(0); + return result; + } 
+ + @Override + public ISampleDataDeployer getSampleDataDeployer() { + return new JdbcExplorer(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTable.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTable.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTable.java new file mode 100644 index 0000000..4313862 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTable.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.source.jdbc; + +import java.io.IOException; + + +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.IReadableTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class JdbcTable implements IReadableTable { + + private static final Logger logger = LoggerFactory.getLogger(JdbcTable.class); + + final private String database; + final private String tableName; + + + public JdbcTable(TableDesc tableDesc) { + this.database = tableDesc.getDatabase(); + this.tableName = tableDesc.getName(); + } + + @Override + public TableReader getReader() throws IOException { + return new JdbcTableReader(database, tableName); + } + + @Override + public TableSignature getSignature() throws IOException { + String path = String.format("%s.%s", database, tableName); + long lastModified = System.currentTimeMillis(); // assume table is ever changing + int size=0; + return new TableSignature(path, size, lastModified); + } + + @Override + public boolean exists() { + return true; + } + + @Override + public String toString() { + return "database=[" + database + "], table=[" + tableName + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java new file mode 100644 index 0000000..b8865d6 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.source.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.source.IReadableTable.TableReader; +import org.apache.kylin.source.hive.DBConnConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of TableReader that reads a table over a plain JDBC connection.
+ */ +public class JdbcTableReader implements TableReader { + private static final Logger logger = LoggerFactory.getLogger(JdbcTableReader.class); + + private String dbName; + private String tableName; + + private DBConnConf dbconf; + private String dialect; + private Connection jdbcCon; + private Statement statement; + private ResultSet rs; + private int colCount; + + /** + * Constructor for reading whole hive table + * @param dbName + * @param tableName + * @throws IOException + */ + public JdbcTableReader(String dbName, String tableName) throws IOException { + this.dbName = dbName; + this.tableName = tableName; + KylinConfig config = KylinConfig.getInstanceFromEnv(); + String connectionUrl = config.getJdbcConnectionUrl(); + String driverClass = config.getJdbcDriver(); + String jdbcUser = config.getJdbcUser(); + String jdbcPass = config.getJdbcPass(); + dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass); + this.dialect = config.getJdbcDialect(); + jdbcCon = SqlUtil.getConnection(dbconf); + String sql = String.format("select * from %s.%s", dbName, tableName); + try { + statement = jdbcCon.createStatement(); + rs = statement.executeQuery(sql); + colCount = rs.getMetaData().getColumnCount(); + }catch(SQLException e){ + throw new IOException(String.format("error while exec %s", sql), e); + } + + } + + @Override + public boolean next() throws IOException { + try { + return rs.next(); + } catch (SQLException e) { + throw new IOException(e); + } + } + + @Override + public String[] getRow() { + String[] ret = new String[colCount]; + for (int i=1; i<=colCount; i++){ + try { + Object o = rs.getObject(i); + ret[i-1] = (o == null? null:o.toString()); + }catch(Exception e){ + logger.error("", e); + } + } + return ret; + } + + @Override + public void close() throws IOException { + SqlUtil.closeResources(jdbcCon, statement); + } + + public String toString() { + return "jdbc table reader for: " + dbName + "." 
+ tableName; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java new file mode 100644 index 0000000..a112d87 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.source.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.Random; + +import javax.sql.DataSource; + +import org.slf4j.LoggerFactory; +import org.apache.kylin.source.hive.DBConnConf; +import org.slf4j.Logger; + +public class SqlUtil { + private static final Logger logger = LoggerFactory.getLogger(SqlUtil.class); + + public static void closeResources(Connection con, Statement statement){ + try{ + if (statement!=null && !statement.isClosed()){ + statement.close(); + } + }catch(Exception e){ + logger.error("", e); + } + + try{ + if (con!=null && !con.isClosed()){ + con.close(); + } + }catch(Exception e){ + logger.error("", e); + } + } + + + public static void execUpdateSQL(String sql, DataSource ds){ + Connection con = null; + try{ + con = ds.getConnection(); + execUpdateSQL(con, sql); + }catch(Exception e){ + logger.error("", e); + }finally{ + closeResources(con, null); + } + } + + public static void execUpdateSQL(Connection db, String sql){ + Statement statement=null; + try{ + statement = db.createStatement(); + statement.executeUpdate(sql); + }catch(Exception e){ + logger.error("", e); + }finally{ + closeResources(null, statement); + } + } + + public static int tryTimes=10; + public static Connection getConnection(DBConnConf dbconf){ + if (dbconf.getUrl()==null) + return null; + Connection con = null; + try { + Class.forName(dbconf.getDriver()); + }catch(Exception e){ + logger.error("", e); + } + boolean got=false; + int times=0; + Random r = new Random(); + while(!got && times<tryTimes){ + times++; + try { + con = DriverManager.getConnection(dbconf.getUrl(), dbconf.getUser(), dbconf.getPass()); + got = true; + }catch(Exception e){ + logger.warn("while use:" + dbconf, e); + try { + int rt = r.nextInt(10); + Thread.sleep(rt*1000); + } catch (InterruptedException e1) { + } + } + } + return con; + } +} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 52d2e6f..7425dae 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -32,8 +32,9 @@ import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.streaming.StreamingConfig; -import org.apache.kylin.source.ISource; import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.ISampleDataDeployer; +import org.apache.kylin.source.ISource; import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.kafka.config.KafkaConfig; @@ -209,4 +210,9 @@ public class KafkaSource implements ISource { }; } + @Override + public ISampleDataDeployer getSampleDataDeployer() { + throw new UnsupportedOperationException(); + } + }