[ 
https://issues.apache.org/jira/browse/KYLIN-3488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16601963#comment-16601963
 ] 

ASF GitHub Bot commented on KYLIN-3488:
---------------------------------------

shaofengshi closed pull request #216: KYLIN-3488 Support MySQL as Kylin 
metadata storage
URL: https://github.com/apache/kylin/pull/216
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), GitHub no longer displays
its diff once the request has been merged, so the full diff is supplied below:

diff --git a/core-common/pom.xml b/core-common/pom.xml
index 6b48d656e0..2f1c8e21e5 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -91,5 +91,44 @@
             
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
             <version>0.1.2</version>
         </dependency>
+        <dependency>
+            <groupId>commons-dbcp</groupId>
+            <artifactId>commons-dbcp</artifactId>
+            <version>1.4</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>mysql</groupId>
+                                    
<artifactId>mysql-connector-java</artifactId>
+                                    <overWrite>true</overWrite>
+                                    
<outputDirectory>${project.basedir}/../../build/ext</outputDirectory>
+                                </artifactItem>
+                            </artifactItems>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>true</overWriteSnapshots>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index e09ce26149..468bef7f04 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -534,4 +534,26 @@ public boolean equals(Object another) {
             return this.base() == ((KylinConfig) another).base();
     }
 
+    public String getMetadataDialect() {
+        return SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.dialect", 
"mysql");
+    }
+
+    public boolean isJsonAlwaysSmallCell() {
+        return 
Boolean.valueOf(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.json-always-small-cell",
 "true"));
+    }
+
+    public int getSmallCellMetadataWarningThreshold() {
+        return 
Integer.parseInt(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold",
+                String.valueOf(100 << 20)));
+    }
+
+    public int getSmallCellMetadataErrorThreshold() {
+        return Integer.parseInt(
+                
SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold",
 String.valueOf(1 << 30)));
+    }
+
+    public int getJdbcResourceStoreMaxCellSize() {
+        return 
Integer.parseInt(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.max-cell-size",
 "262144")); //256k
+    }
+
 }
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4895bf0745..b8d87bc349 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -226,6 +226,7 @@ public String getDeployEnv() {
     }
 
     private String cachedHdfsWorkingDirectory;
+    private String cachedBigCellDirectory;
 
     public String getHdfsWorkingDirectory() {
         if (cachedHdfsWorkingDirectory != null)
@@ -260,6 +261,94 @@ public String getHdfsWorkingDirectory() {
         return cachedHdfsWorkingDirectory;
     }
 
+    public String getMetastoreBigCellHdfsDirectory() {
+
+        if (cachedBigCellDirectory != null)
+            return cachedBigCellDirectory;
+
+
+        String root = getOptional("kylin.env.hdfs-metastore-bigcell-dir");
+
+        if (root == null) {
+            return getJdbcHdfsWorkingDirectory();
+        }
+
+        Path path = new Path(root);
+        if (!path.isAbsolute())
+            throw new IllegalArgumentException(
+                    "kylin.env.hdfs-metastore-bigcell-dir must be absolute, 
but got " + root);
+
+        // make sure path is qualified
+        try {
+            FileSystem fs = HadoopUtil.getReadFileSystem();
+            path = fs.makeQualified(path);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), 
':', '-')).toString();
+
+        if (!root.endsWith("/"))
+            root += "/";
+
+        cachedBigCellDirectory = root;
+        if (cachedBigCellDirectory.startsWith("file:")) {
+            cachedBigCellDirectory = cachedBigCellDirectory.replace("file:", 
"file://");
+        } else if (cachedBigCellDirectory.startsWith("maprfs:")) {
+            cachedBigCellDirectory = cachedBigCellDirectory.replace("maprfs:", 
"maprfs://");
+        }
+
+        return cachedBigCellDirectory;
+    }
+
+    private String getJdbcHdfsWorkingDirectory() {
+        if (StringUtils.isNotEmpty(getJdbcFileSystem())) {
+            Path workingDir = new Path(getReadHdfsWorkingDirectory());
+            return new Path(getJdbcFileSystem(), 
Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+        }
+
+        return getReadHdfsWorkingDirectory();
+    }
+
+    /**
+     * Consider use kylin.env.hdfs-metastore-bigcell-dir instead of 
kylin.storage.columnar.jdbc.file-system
+     */
+    private String getJdbcFileSystem() {
+        return getOptional("kylin.storage.columnar.jdbc.file-system", "");
+    }
+
+    public String getHdfsWorkingDirectory(String project) {
+        if (isProjectIsolationEnabled() && project != null) {
+            return new Path(getHdfsWorkingDirectory(), project).toString() + 
"/";
+        } else {
+            return getHdfsWorkingDirectory();
+        }
+    }
+
+    private String getReadHdfsWorkingDirectory() {
+        if (StringUtils.isNotEmpty(getParquetReadFileSystem())) {
+            Path workingDir = new Path(getHdfsWorkingDirectory());
+            return new Path(getParquetReadFileSystem(), 
Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
+                    + "/";
+        }
+
+        return getHdfsWorkingDirectory();
+    }
+
+    public String getReadHdfsWorkingDirectory(String project) {
+        if (StringUtils.isNotEmpty(getParquetReadFileSystem())) {
+            Path workingDir = new Path(getHdfsWorkingDirectory(project));
+            return new Path(getParquetReadFileSystem(), 
Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
+                    + "/";
+        }
+
+        return getHdfsWorkingDirectory(project);
+    }
+
+    public String getParquetReadFileSystem() {
+        return getOptional("kylin.storage.columnar.file-system", "");
+    }
+
     public String getZookeeperBasePath() {
         return getOptional("kylin.env.zookeeper-base-path", "/kylin");
     }
@@ -323,6 +412,7 @@ public String getMetadataUrlPrefix() {
         r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
         r.put("hdfs", "org.apache.kylin.common.persistence.HDFSResourceStore");
         r.put("ifile", 
"org.apache.kylin.common.persistence.IdentifierFileResourceStore");
+        r.put("jdbc", "org.apache.kylin.common.persistence.JDBCResourceStore");
         
r.putAll(getPropertiesByPrefix("kylin.metadata.resource-store-provider.")); // 
note the naming convention -- 
http://kylin.apache.org/development/coding_naming_convention.html
         return r;
     }
@@ -1309,6 +1399,10 @@ public boolean isStreamAggregateEnabled() {
         return 
Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", 
"true"));
     }
 
+    public boolean isProjectIsolationEnabled() {
+        return 
Boolean.parseBoolean(getOptional("kylin.storage.project-isolation-enable", 
"true"));
+    }
+
     @Deprecated //Limit is good even it's large. This config is meaning less 
since we already have scan threshold
     public int getStoragePushDownLimitMax() {
         return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", 
"10000"));
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java
new file mode 100644
index 0000000000..6e1b4c28e6
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class BrokenEntity extends RootPersistentEntity {
+
+    public static final byte[] MAGIC = new byte[]{'B', 'R', 'O', 'K', 'E', 
'N'};
+
+    @JsonProperty("resPath")
+    private String resPath;
+
+    @JsonProperty("errorMsg")
+    private String errorMsg;
+
+    public BrokenEntity() {
+    }
+
+    public BrokenEntity(String resPath, String errorMsg) {
+        this.resPath = resPath;
+        this.errorMsg = errorMsg;
+    }
+
+    public String getResPath() {
+        return resPath;
+    }
+
+    public void setResPath(String resPath) {
+        this.resPath = resPath;
+    }
+
+    public String getErrorMsg() {
+        return errorMsg;
+    }
+
+    public void setErrorMsg(String errorMsg) {
+        this.errorMsg = errorMsg;
+    }
+}
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java
new file mode 100644
index 0000000000..9eddba8d43
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class BrokenInputStream extends InputStream {
+    private static Logger logger = 
LoggerFactory.getLogger(BrokenInputStream.class);
+    private final ByteArrayInputStream in;
+
+    public BrokenInputStream(BrokenEntity brokenEntity) {
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try {
+            IOUtils.write(BrokenEntity.MAGIC, out);
+            IOUtils.write(JsonUtil.writeValueAsBytes(brokenEntity), out);
+        } catch (IOException e) {
+            logger.error("There is something error when we serialize 
BrokenEntity: ", e);
+            throw new RuntimeException("There is something error when we 
serialize BrokenEntity.");
+        }
+
+        in = new ByteArrayInputStream(out.toByteArray());
+    }
+
+    @Override
+    public int read() {
+        return in.read();
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+        super.close();
+    }
+}
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
new file mode 100644
index 0000000000..753601aae0
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.dbcp.BasicDataSourceFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.util.EncryptUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class JDBCConnectionManager {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(JDBCConnectionManager.class);
+
+    private static JDBCConnectionManager INSTANCE = null;
+
+    private static Object lock = new Object();
+
+    public static JDBCConnectionManager getConnectionManager() {
+        if (INSTANCE == null) {
+            synchronized (lock) {
+                if (INSTANCE == null) {
+                    INSTANCE = new 
JDBCConnectionManager(KylinConfig.getInstanceFromEnv());
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
+    // 
============================================================================
+
+    private final Map<String, String> dbcpProps;
+    private final DataSource dataSource;
+
+    private JDBCConnectionManager(KylinConfig config) {
+        try {
+            this.dbcpProps = initDbcpProps(config);
+
+            dataSource = 
BasicDataSourceFactory.createDataSource(getDbcpProperties());
+            Connection conn = getConn();
+            DatabaseMetaData mdm = conn.getMetaData();
+            logger.info("Connected to " + mdm.getDatabaseProductName() + " " + 
mdm.getDatabaseProductVersion());
+            closeQuietly(conn);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Map<String, String> initDbcpProps(KylinConfig config) {
+        // metadataUrl is like 
"kylin_default_instance@jdbc,url=jdbc:mysql://localhost:3306/kylin,username=root,password=xxx"
+        StorageURL metadataUrl = config.getMetadataUrl();
+        JDBCResourceStore.checkScheme(metadataUrl);
+
+        LinkedHashMap<String, String> ret = new 
LinkedHashMap<>(metadataUrl.getAllParameters());
+        List<String> mandatoryItems = Arrays.asList("url", "username", 
"password");
+
+        for (String item : mandatoryItems) {
+            Preconditions.checkNotNull(ret.get(item),
+                    "Setting item \"" + item + "\" is mandatory for Jdbc 
connections.");
+        }
+
+        // Check whether password encrypted
+        if ("true".equals(ret.get("passwordEncrypted"))) {
+            String password = ret.get("password");
+            ret.put("password", EncryptUtil.decrypt(password));
+            ret.remove("passwordEncrypted");
+        }
+
+        logger.info("Connecting to Jdbc with url:" + ret.get("url") + " by 
user " + ret.get("username"));
+
+        putIfMissing(ret, "driverClassName", "com.mysql.jdbc.Driver");
+        putIfMissing(ret, "maxActive", "5");
+        putIfMissing(ret, "maxIdle", "5");
+        putIfMissing(ret, "maxWait", "1000");
+        putIfMissing(ret, "removeAbandoned", "true");
+        putIfMissing(ret, "removeAbandonedTimeout", "180");
+        putIfMissing(ret, "testOnBorrow", "true");
+        putIfMissing(ret, "testWhileIdle", "true");
+        putIfMissing(ret, "validationQuery", "select 1");
+        return ret;
+    }
+
+    private void putIfMissing(LinkedHashMap<String, String> map, String key, 
String value) {
+        if (map.containsKey(key) == false)
+            map.put(key, value);
+    }
+
+    public final Connection getConn() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    public Properties getDbcpProperties() {
+        Properties ret = new Properties();
+        ret.putAll(dbcpProps);
+        return ret;
+    }
+
+    public static void closeQuietly(AutoCloseable obj) {
+        if (obj != null) {
+            try {
+                obj.close();
+            } catch (Exception e) {
+                logger.warn("Error closing " + obj, e);
+            }
+        }
+    }
+
+    public void close() {
+        try {
+            ((org.apache.commons.dbcp.BasicDataSource) dataSource).close();
+        } catch (SQLException e) {
+            logger.error("error closing data source", e);
+        }
+    }
+}
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java
new file mode 100644
index 0000000000..f9c300072e
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kylin.common.persistence;
+
+import java.io.InputStream;
+
+public class JDBCResource {
+    private String path;
+
+    private long timestamp;
+
+    private InputStream content;
+
+    public JDBCResource() {
+
+    }
+
+    public JDBCResource(String path, long timestamp, InputStream content) {
+        this.path = path;
+        this.timestamp = timestamp;
+        this.content = content;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public InputStream getContent() {
+        return content;
+    }
+
+    public void setContent(InputStream content) {
+        this.content = content;
+    }
+}
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
new file mode 100644
index 0000000000..f267530fea
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.TreeSet;
+
+public class JDBCResourceDAO {
+
+    private static Logger logger = 
LoggerFactory.getLogger(JDBCResourceDAO.class);
+
+    private static final String META_TABLE_KEY = "META_TABLE_KEY";
+
+    private static final String META_TABLE_TS = "META_TABLE_TS";
+
+    private static final String META_TABLE_CONTENT = "META_TABLE_CONTENT";
+
+    private JDBCConnectionManager connectionManager;
+
+    private JDBCSqlQueryFormat jdbcSqlQueryFormat;
+
+    private String[] tablesName;
+
+    private KylinConfig kylinConfig;
+
+    // For test
+    private long queriedSqlNum = 0;
+
+    public JDBCResourceDAO(KylinConfig kylinConfig, String[] tablesName) 
throws SQLException {
+        this.kylinConfig = kylinConfig;
+        this.connectionManager = JDBCConnectionManager.getConnectionManager();
+        this.jdbcSqlQueryFormat = 
JDBCSqlQueryFormatProvider.createJDBCSqlQueriesFormat(kylinConfig.getMetadataDialect());
+        this.tablesName = tablesName;
+        for (int i = 0; i < tablesName.length; i++) {
+            createTableIfNeeded(tablesName[i]);
+            createIndex("IDX_" + META_TABLE_TS, tablesName[i], META_TABLE_TS);
+        }
+    }
+
+    public void close() {
+        connectionManager.close();
+    }
+
+    public JDBCResource getResource(final String resourcePath, final boolean 
fetchContent, final boolean fetchTimestamp)
+            throws SQLException {
+        return getResource(resourcePath, fetchContent, fetchTimestamp, false);
+    }
+
+    public JDBCResource getResource(final String resourcePath, final boolean 
fetchContent, final boolean fetchTimestamp,
+                                    final boolean isAllowBroken) throws 
SQLException {
+        final JDBCResource resource = new JDBCResource();
+        logger.trace("getResource method. resourcePath : {} , fetchConetent : 
{} , fetch TS : {}", resourcePath,
+                fetchContent, fetchTimestamp);
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(resourcePath);
+                pstat = 
connection.prepareStatement(getKeyEqualSqlString(tableName, fetchContent, 
fetchTimestamp));
+                pstat.setString(1, resourcePath);
+                rs = pstat.executeQuery();
+                if (rs.next()) {
+                    resource.setPath(rs.getString(META_TABLE_KEY));
+                    if (fetchTimestamp)
+                        resource.setTimestamp(rs.getLong(META_TABLE_TS));
+                    if (fetchContent) {
+                        try {
+                            resource.setContent(getInputStream(resourcePath, 
rs));
+                        } catch (Throwable e) {
+                            if (!isAllowBroken) {
+                                throw new SQLException(e);
+                            }
+
+                            final BrokenEntity brokenEntity = new 
BrokenEntity(resourcePath, e.getMessage());
+                            resource.setContent(new 
BrokenInputStream(brokenEntity));
+                            logger.warn(e.getMessage());
+                        }
+                    }
+                }
+            }
+        });
+        if (resource.getPath() != null) {
+            return resource;
+        } else {
+            return null;
+        }
+    }
+
+    public boolean existResource(final String resourcePath) throws 
SQLException {
+        JDBCResource resource = getResource(resourcePath, false, false);
+        return (resource != null);
+    }
+
+    public long getResourceTimestamp(final String resourcePath) throws 
SQLException {
+        JDBCResource resource = getResource(resourcePath, false, true);
+        return resource == null ? 0 : resource.getTimestamp();
+    }
+
+    //fetch primary key only
+    public TreeSet<String> listAllResource(final String folderPath, final 
boolean recursive) throws SQLException {
+        final TreeSet<String> allResourceName = new TreeSet<>();
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(folderPath);
+                pstat = 
connection.prepareStatement(getListResourceSqlString(tableName));
+                pstat.setString(1, folderPath + "%");
+                rs = pstat.executeQuery();
+                while (rs.next()) {
+                    String path = rs.getString(META_TABLE_KEY);
+                    assert path.startsWith(folderPath);
+                    if (recursive) {
+                        allResourceName.add(path);
+                    } else {
+                        int cut = path.indexOf('/', folderPath.length());
+                        String child = cut < 0 ? path : path.substring(0, cut);
+                        allResourceName.add(child);
+                    }
+                }
+            }
+        });
+        return allResourceName;
+    }
+
+    public List<JDBCResource> getAllResource(final String folderPath, final 
long timeStart, final long timeEndExclusive,
+                                             final boolean isAllowBroken) 
throws SQLException {
+        final List<JDBCResource> allResource = Lists.newArrayList();
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(folderPath);
+                pstat = 
connection.prepareStatement(getAllResourceSqlString(tableName));
+                pstat.setString(1, folderPath + "%");
+                pstat.setLong(2, timeStart);
+                pstat.setLong(3, timeEndExclusive);
+                rs = pstat.executeQuery();
+                while (rs.next()) {
+                    String resPath = rs.getString(META_TABLE_KEY);
+                    if (checkPath(folderPath, resPath)) {
+                        JDBCResource resource = new JDBCResource();
+                        resource.setPath(resPath);
+                        resource.setTimestamp(rs.getLong(META_TABLE_TS));
+                        try {
+                            resource.setContent(getInputStream(resPath, rs));
+                        } catch (Throwable e) {
+                            if (!isAllowBroken) {
+                                throw new SQLException(e);
+                            }
+
+                            final BrokenEntity brokenEntity = new 
BrokenEntity(resPath, e.getMessage());
+                            resource.setContent(new 
BrokenInputStream(brokenEntity));
+                            logger.warn(e.getMessage());
+                        }
+                        allResource.add(resource);
+                    }
+                }
+            }
+        });
+        return allResource;
+    }
+
+    private boolean checkPath(String lookForPrefix, String resPath) {
+        lookForPrefix = lookForPrefix.endsWith("/") ? lookForPrefix : 
lookForPrefix + "/";
+        assert resPath.startsWith(lookForPrefix);
+        int cut = resPath.indexOf('/', lookForPrefix.length());
+        return (cut < 0);
+    }
+
+    private boolean isJsonMetadata(String resourcePath) {
+        String trim = resourcePath.trim();
+        return trim.endsWith(".json") || 
trim.startsWith(ResourceStore.EXECUTE_RESOURCE_ROOT)
+                || trim.startsWith(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
+
+    }
+
+    /**
+     * Deletes the row stored under {@code resourcePath} and, unless the path is
+     * classified as JSON metadata, also removes its big-cell file from HDFS.
+     *
+     * @param resourcePath key of the resource to delete
+     * @throws SQLException if the delete statement fails, or wrapping any failure
+     *                      of the HDFS clean-up
+     */
+    public void deleteResource(final String resourcePath) throws SQLException {
+        final boolean jsonOnly = isJsonMetadata(resourcePath);
+
+        final SqlOperation deleteRow = new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                final String table = getMetaTableName(resourcePath);
+                final String sql = getDeletePstatSql(table);
+                pstat = connection.prepareStatement(sql);
+                pstat.setString(1, resourcePath);
+                pstat.executeUpdate();
+            }
+        };
+        executeSql(deleteRow);
+
+        if (jsonOnly) {
+            // JSON metadata is not looked up on HDFS here
+            return;
+        }
+        try {
+            deleteHDFSResourceIfExist(resourcePath);
+        } catch (Throwable e) {
+            throw new SQLException(e);
+        }
+    }
+
+    private void deleteHDFSResourceIfExist(String resourcePath) throws 
IOException {
+
+        Path redirectPath = bigCellHDFSPath(resourcePath);
+        FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
+
+        if (fileSystem.exists(redirectPath)) {
+            fileSystem.delete(redirectPath, true);
+        }
+
+    }
+
+    /**
+     * Stores {@code resource}, overwriting any row that already has the same path.
+     * <p>
+     * The content stream is first drained into a byte array. When
+     * {@code isContentOverflow} reports the payload as too large, the content
+     * column is set to NULL and the bytes are written to HDFS instead; that HDFS
+     * write is rolled back if the row statement then fails.
+     *
+     * @param resource path, timestamp and content to persist
+     * @throws SQLException if reading the content, the HDFS write or the SQL
+     *                      statement fails
+     */
+    public void putResource(final JDBCResource resource) throws SQLException {
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                byte[] content = getResourceDataBytes(resource);
+                // Lock on the interned path: callers in this JVM writing the same
+                // path run the exists-check and the write one at a time.
+                synchronized (resource.getPath().intern()) {
+                    boolean existing = existResource(resource.getPath());
+                    String tableName = getMetaTableName(resource.getPath());
+                    // The two statements bind their parameters in a different order:
+                    // replace = (ts, content, key), insert = (key, ts, content).
+                    if (existing) {
+                        pstat = 
connection.prepareStatement(getReplaceSql(tableName));
+                        pstat.setLong(1, resource.getTimestamp());
+                        pstat.setBlob(2, new BufferedInputStream(new 
ByteArrayInputStream(content)));
+                        pstat.setString(3, resource.getPath());
+                    } else {
+                        pstat = 
connection.prepareStatement(getInsertSql(tableName));
+                        pstat.setString(1, resource.getPath());
+                        pstat.setLong(2, resource.getTimestamp());
+                        pstat.setBlob(3, new BufferedInputStream(new 
ByteArrayInputStream(content)));
+                    }
+
+                    if (isContentOverflow(content, resource.getPath())) {
+                        logger.debug("Overflow! resource path: {}, content 
size: {}, timeStamp: {}", resource.getPath(),
+                                content.length, resource.getTimestamp());
+                        // Overwrite the blob parameter bound above with NULL; the
+                        // bytes go to HDFS instead of the content column.
+                        if (existing) {
+                            pstat.setNull(2, Types.BLOB);
+                        } else {
+                            pstat.setNull(3, Types.BLOB);
+                        }
+                        // HDFS is written before the row; undo it if the row fails.
+                        writeLargeCellToHdfs(resource.getPath(), content);
+                        try {
+                            int result = pstat.executeUpdate();
+                            if (result != 1)
+                                throw new SQLException();
+                        } catch (SQLException e) {
+                            rollbackLargeCellFromHdfs(resource.getPath());
+                            throw e;
+                        }
+                        if (existing) {
+                            cleanOldLargeCellFromHdfs(resource.getPath());
+                        }
+                    } else {
+                        pstat.executeUpdate();
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * Writes {@code content} under {@code resPath} only if the stored timestamp
+     * matches {@code oldTS}, then stamps the row with {@code newTS}.
+     * <ul>
+     * <li>No row yet: {@code oldTS} must be 0, and a new row is inserted.</li>
+     * <li>Row exists: the timestamp is updated with a "where key=? and ts=?"
+     * statement; if that does not touch exactly one row a
+     * {@link WriteConflictException} is raised. The content is then updated by a
+     * second statement.</li>
+     * </ul>
+     * Content that {@code isContentOverflow} reports as too large is written to
+     * HDFS and the content column is left/set to NULL.
+     *
+     * @param resPath key of the resource
+     * @param content new payload
+     * @param oldTS   timestamp the caller expects to be stored (0 for a new resource)
+     * @param newTS   timestamp to store
+     * @throws SQLException           on statement or HDFS failure
+     * @throws WriteConflictException if the stored timestamp differs from {@code oldTS}
+     * @throws IllegalStateException  if the row does not exist and {@code oldTS} is not 0
+     */
+    public void checkAndPutResource(final String resPath, final byte[] 
content, final long oldTS, final long newTS)
+            throws SQLException, WriteConflictException {
+        // NOTE(review): the trace below logs whether content is null, but
+        // isContentOverflow dereferences content.length - a null content looks
+        // like it would end in a NullPointerException; confirm callers never pass null.
+        logger.trace(
+                "execute checkAndPutResource method. resPath : {} , oldTs : {} 
, newTs : {} , content null ? : {} ",
+                resPath, oldTS, newTS, content == null);
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                // Lock on the interned path: same-path callers in this JVM are serialized.
+                synchronized (resPath.intern()) {
+                    String tableName = getMetaTableName(resPath);
+                    if (!existResource(resPath)) {
+                        if (oldTS != 0) {
+                            throw new IllegalStateException(
+                                    "For not exist file. OldTS have to be 0. 
but Actual oldTS is : " + oldTS);
+                        }
+                        if (isContentOverflow(content, resPath)) {
+                            logger.debug("Overflow! resource path: {}, content 
size: {}", resPath, content.length);
+                            pstat = 
connection.prepareStatement(getInsertSqlWithoutContent(tableName));
+                            pstat.setString(1, resPath);
+                            pstat.setLong(2, newTS);
+                            // HDFS first, then the row; undo HDFS if the row fails.
+                            writeLargeCellToHdfs(resPath, content);
+                            try {
+                                int result = pstat.executeUpdate();
+                                if (result != 1)
+                                    throw new SQLException();
+                            } catch (SQLException e) {
+                                rollbackLargeCellFromHdfs(resPath);
+                                throw e;
+                            }
+                        } else {
+                            pstat = 
connection.prepareStatement(getInsertSql(tableName));
+                            pstat.setString(1, resPath);
+                            pstat.setLong(2, newTS);
+                            pstat.setBlob(3, new BufferedInputStream(new 
ByteArrayInputStream(content)));
+                            pstat.executeUpdate();
+                        }
+                    } else {
+                        // Note the checkAndPut trick:
+                        // update {0} set {1}=? where {2}=? and {3}=?
+                        pstat = 
connection.prepareStatement(getUpdateSqlWithoutContent(tableName));
+                        pstat.setLong(1, newTS);
+                        pstat.setString(2, resPath);
+                        pstat.setLong(3, oldTS);
+                        int result = pstat.executeUpdate();
+                        if (result != 1) {
+                            // No row matched (key, oldTS): report the timestamp actually stored.
+                            long realTime = getResourceTimestamp(resPath);
+                            throw new WriteConflictException("Overwriting 
conflict " + resPath + ", expect old TS "
+                                    + oldTS + ", but it is " + realTime);
+                        }
+                        // NOTE(review): the timestamp was already moved to newTS above;
+                        // the content follows in a separate statement. Whether both are
+                        // one transaction depends on the connection's commit mode,
+                        // which is not visible here - confirm.
+                        // pstat2 is closed here because executeSql only closes pstat/rs.
+                        PreparedStatement pstat2 = null;
+                        try {
+                            // "update {0} set {1}=? where {3}=?"
+                            pstat2 = 
connection.prepareStatement(getUpdateContentSql(tableName));
+                            if (isContentOverflow(content, resPath)) {
+                                logger.debug("Overflow! resource path: {}, 
content size: {}", resPath, content.length);
+                                pstat2.setNull(1, Types.BLOB);
+                                pstat2.setString(2, resPath);
+                                writeLargeCellToHdfs(resPath, content);
+                                try {
+                                    int result2 = pstat2.executeUpdate();
+                                    if (result2 != 1)
+                                        throw new SQLException();
+                                } catch (SQLException e) {
+                                    rollbackLargeCellFromHdfs(resPath);
+                                    throw e;
+                                }
+                                cleanOldLargeCellFromHdfs(resPath);
+                            } else {
+                                pstat2.setBinaryStream(1, new 
BufferedInputStream(new ByteArrayInputStream(content)));
+                                pstat2.setString(2, resPath);
+                                pstat2.executeUpdate();
+                            }
+                        } finally {
+                            JDBCConnectionManager.closeQuietly(pstat2);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    private byte[] getResourceDataBytes(JDBCResource resource) throws 
SQLException {
+        ByteArrayOutputStream bout = null;
+        try {
+            bout = new ByteArrayOutputStream();
+            IOUtils.copy(resource.getContent(), bout);
+            return bout.toByteArray();
+        } catch (Throwable e) {
+            throw new SQLException(e);
+        } finally {
+            IOUtils.closeQuietly(bout);
+        }
+    }
+
+    private boolean isContentOverflow(byte[] content, String resPath) throws 
SQLException {
+        if (kylinConfig.isJsonAlwaysSmallCell() && isJsonMetadata(resPath)) {
+
+            int smallCellMetadataWarningThreshold = 
kylinConfig.getSmallCellMetadataWarningThreshold();
+            int smallCellMetadataErrorThreshold = 
kylinConfig.getSmallCellMetadataErrorThreshold();
+
+            if (content.length > smallCellMetadataWarningThreshold) {
+                logger.warn(
+                        "A JSON metadata entry's size is not supposed to 
exceed kylin.metadata.jdbc.small-cell-meta-size-warning-threshold("
+                                + smallCellMetadataWarningThreshold + "), 
resPath: " + resPath + ", actual size: "
+                                + content.length);
+            }
+            if (content.length > smallCellMetadataErrorThreshold) {
+                throw new SQLException(new IllegalArgumentException(
+                        "A JSON metadata entry's size is not supposed to 
exceed kylin.metadata.jdbc.small-cell-meta-size-error-threshold("
+                                + smallCellMetadataErrorThreshold + "), 
resPath: " + resPath + ", actual size: "
+                                + content.length));
+            }
+
+            return false;
+        }
+
+        int maxSize = kylinConfig.getJdbcResourceStoreMaxCellSize();
+        if (content.length > maxSize)
+            return true;
+        else
+            return false;
+    }
+
+    /**
+     * Creates the metadata table {@code tableName} unless the check-table-exists
+     * query already lists it.
+     *
+     * @param tableName name of the table to create
+     * @throws SQLException if the existence check or the create statement fails
+     */
+    private void createTableIfNeeded(final String tableName) throws SQLException {
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                if (checkTableExists(tableName, connection)) {
+                    logger.info("Table [{}] already exists", tableName);
+                    return;
+                }
+
+                pstat = connection.prepareStatement(getCreateIfNeededSql(tableName));
+                pstat.executeUpdate();
+                logger.info("Create table [{}] success", tableName);
+            }
+
+            /**
+             * Runs the check-table-exists query and looks for {@code tableName} in
+             * the first column of the result.
+             */
+            private boolean checkTableExists(final String tableName, final Connection connection)
+                    throws SQLException {
+                // Kept local and closed here: executeSql only closes the pstat/rs
+                // fields, so these would otherwise leak on every call.
+                PreparedStatement checkStmt = null;
+                ResultSet tables = null;
+                try {
+                    checkStmt = connection.prepareStatement(getCheckTableExistsSql(tableName));
+                    tables = checkStmt.executeQuery();
+                    while (tables.next()) {
+                        if (tableName.equals(tables.getString(1))) {
+                            return true;
+                        }
+                    }
+                    return false;
+                } finally {
+                    JDBCConnectionManager.closeQuietly(tables);
+                    JDBCConnectionManager.closeQuietly(checkStmt);
+                }
+            }
+        });
+    }
+
+    private void createIndex(final String indexName, final String tableName, 
final String colName) {
+        try {
+            executeSql(new SqlOperation() {
+                @Override
+                public void execute(Connection connection) throws SQLException 
{
+                    pstat = 
connection.prepareStatement(getCreateIndexSql(indexName, tableName, colName));
+                    pstat.executeUpdate();
+                }
+            });
+        } catch (SQLException ex) {
+            logger.info("Create index failed with message: " + 
ex.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * A unit of JDBC work run by {@code executeSql}. Implementations store the
+     * statement and result set they open in {@link #pstat} and {@link #rs};
+     * {@code executeSql} closes both fields after {@link #execute} returns or throws.
+     */
+    abstract static class SqlOperation {
+        PreparedStatement pstat;
+        ResultSet rs;
+
+        /**
+         * Performs the work on the given connection.
+         *
+         * @throws SQLException if the work fails
+         */
+        public abstract void execute(final Connection connection) throws SQLException;
+    }
+
+    /**
+     * Obtains a connection from the connection manager, runs {@code operation} on
+     * it and counts the successful run. The operation's result set and statement,
+     * then the connection, are always closed quietly.
+     *
+     * @throws SQLException if obtaining the connection or the operation fails
+     */
+    private void executeSql(SqlOperation operation) throws SQLException {
+        Connection conn = null;
+        try {
+            conn = connectionManager.getConn();
+            operation.execute(conn);
+            // only reached when execute() did not throw
+            queriedSqlNum++;
+        } finally {
+            // close in reverse order of acquisition
+            JDBCConnectionManager.closeQuietly(operation.rs);
+            JDBCConnectionManager.closeQuietly(operation.pstat);
+            JDBCConnectionManager.closeQuietly(conn);
+        }
+    }
+
+    /** Fills the check-table-exists template with {@code tableName}. */
+    private String getCheckTableExistsSql(final String tableName) {
+        return MessageFormat.format(jdbcSqlQueryFormat.getCheckTableExistsSql(), tableName);
+    }
+
+    //sql queries
+    private String getCreateIfNeededSql(String tableName) {
+        String sql = 
MessageFormat.format(jdbcSqlQueryFormat.getCreateIfNeedSql(), tableName, 
META_TABLE_KEY,
+                META_TABLE_TS, META_TABLE_CONTENT);
+        return sql;
+    }
+
+    //sql queries
+    private String getCreateIndexSql(String indexName, String tableName, 
String indexCol) {
+        String sql = 
MessageFormat.format(jdbcSqlQueryFormat.getCreateIndexSql(), indexName, 
tableName, indexCol);
+        return sql;
+    }
+
+    private String getKeyEqualSqlString(String tableName, boolean 
fetchContent, boolean fetchTimestamp) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getKeyEqualsSql(),
+                getSelectList(fetchContent, fetchTimestamp), tableName, 
META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getDeletePstatSql(String tableName) {
+        String sql = 
MessageFormat.format(jdbcSqlQueryFormat.getDeletePstatSql(), tableName, 
META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getListResourceSqlString(String tableName) {
+        String sql = 
MessageFormat.format(jdbcSqlQueryFormat.getListResourceSql(), META_TABLE_KEY, 
tableName,
+                META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getAllResourceSqlString(String tableName) {
+        String sql = 
MessageFormat.format(jdbcSqlQueryFormat.getAllResourceSql(), 
getSelectList(true, true), tableName,
+                META_TABLE_KEY, META_TABLE_TS, META_TABLE_TS);
+        return sql;
+    }
+
+    private String getReplaceSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getReplaceSql(), 
tableName, META_TABLE_TS,
+                META_TABLE_CONTENT, META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getInsertSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getInsertSql(), 
tableName, META_TABLE_KEY, META_TABLE_TS,
+                META_TABLE_CONTENT);
+        return sql;
+    }
+
+    @SuppressWarnings("unused")
+    private String getReplaceSqlWithoutContent(String tableName) {
+        String sql = 
MessageFormat.format(jdbcSqlQueryFormat.getReplaceSqlWithoutContent(), 
tableName, META_TABLE_TS,
+                META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getInsertSqlWithoutContent(String tableName) {
+        String sql = 
MessageFormat.format(jdbcSqlQueryFormat.getInsertSqlWithoutContent(), 
tableName, META_TABLE_KEY,
+                META_TABLE_TS);
+        return sql;
+    }
+
+    private String getUpdateSqlWithoutContent(String tableName) {
+        String sql = 
MessageFormat.format(jdbcSqlQueryFormat.getUpdateSqlWithoutContent(), 
tableName, META_TABLE_TS,
+                META_TABLE_KEY, META_TABLE_TS);
+        return sql;
+    }
+
+    private String getUpdateContentSql(String tableName) {
+        String sql = 
MessageFormat.format(jdbcSqlQueryFormat.getUpdateContentSql(), tableName, 
META_TABLE_CONTENT,
+                META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getSelectList(boolean fetchContent, boolean fetchTimestamp) 
{
+        StringBuilder sb = new StringBuilder();
+        sb.append(META_TABLE_KEY);
+        if (fetchTimestamp)
+            sb.append("," + META_TABLE_TS);
+        if (fetchContent)
+            sb.append("," + META_TABLE_CONTENT);
+        return sb.toString();
+    }
+
+    private InputStream getInputStream(String resPath, ResultSet rs) throws 
SQLException, IOException {
+        if (rs == null) {
+            return null;
+        }
+        InputStream inputStream = rs.getBlob(META_TABLE_CONTENT) == null ? null
+                : rs.getBlob(META_TABLE_CONTENT).getBinaryStream();
+        if (inputStream != null) {
+            return inputStream;
+        } else {
+            Path redirectPath = bigCellHDFSPath(resPath);
+            FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
+            return fileSystem.open(redirectPath);
+        }
+    }
+
+    private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn) 
throws SQLException {
+
+        boolean isResourceExist;
+        FSDataOutputStream out = null;
+        Path redirectPath = bigCellHDFSPath(resPath);
+        Path oldPath = new Path(redirectPath.toString() + "_old");
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = HadoopUtil.getFileSystem(redirectPath);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        try {
+            isResourceExist = fileSystem.exists(redirectPath);
+            if (isResourceExist) {
+                FileUtil.copy(fileSystem, redirectPath, fileSystem, oldPath, 
false,
+                        HadoopUtil.getCurrentConfiguration());
+                fileSystem.delete(redirectPath, true);
+                logger.debug("a copy of hdfs file {} is made", redirectPath);
+            }
+            out = fileSystem.create(redirectPath);
+            out.write(largeColumn);
+            return redirectPath;
+        } catch (Throwable e) {
+            try {
+                rollbackLargeCellFromHdfs(resPath);
+            } catch (Throwable ex) {
+                logger.error("fail to roll back resource " + resPath + " in 
hdfs", ex);
+            }
+            throw new SQLException(e);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    /**
+     * Undoes a big-cell write for {@code resPath}: restores the "&lt;path&gt;_old"
+     * backup over the big-cell file when a backup exists, otherwise deletes the
+     * big-cell file.
+     *
+     * @param resPath resource key
+     * @throws SQLException if the file system cannot be obtained, or wrapping any
+     *                      failure of the restore/delete
+     */
+    public void rollbackLargeCellFromHdfs(String resPath) throws SQLException {
+        final Path redirectPath = bigCellHDFSPath(resPath);
+        final Path oldPath = new Path(redirectPath.toString() + "_old");
+
+        final FileSystem fileSystem;
+        try {
+            fileSystem = HadoopUtil.getFileSystem(redirectPath);
+        } catch (IOException e) {
+            // Surface the real cause; continuing would only fail on a null file system.
+            throw new SQLException("fail to get file system for " + redirectPath, e);
+        }
+
+        try {
+            if (fileSystem.exists(oldPath)) {
+                FileUtil.copy(fileSystem, oldPath, fileSystem, redirectPath, true, true,
+                        HadoopUtil.getCurrentConfiguration());
+                logger.info("roll back hdfs file {}", resPath);
+            } else {
+                fileSystem.delete(redirectPath, true);
+                logger.warn("no backup for hdfs file {} is found, clean it", resPath);
+            }
+        } catch (Throwable e) {
+
+            try {
+                //last try to delete redirectPath, because we prefer a deleted rather than incomplete
+                fileSystem.delete(redirectPath, true);
+            } catch (Throwable ignore) {
+                // ignore it
+            }
+
+            throw new SQLException(e);
+        }
+    }
+
+    private void cleanOldLargeCellFromHdfs(String resPath) throws SQLException 
{
+        Path redirectPath = bigCellHDFSPath(resPath);
+        Path oldPath = new Path(redirectPath.toString() + "_old");
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = HadoopUtil.getFileSystem(redirectPath);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        try {
+            if (fileSystem.exists(oldPath)) {
+                fileSystem.delete(oldPath, true);
+            }
+        } catch (Throwable e) {
+            logger.warn("error cleaning the backup file for " + redirectPath + 
", leave it as garbage", e);
+        }
+    }
+
+    /**
+     * Maps a resource path to its big-cell location: "resources-jdbc" +
+     * {@code resPath}, resolved against the configured metastore big-cell HDFS
+     * directory.
+     */
+    public Path bigCellHDFSPath(String resPath) {
+        final String baseDir = this.kylinConfig.getMetastoreBigCellHdfsDirectory();
+        return new Path(baseDir, "resources-jdbc" + resPath);
+    }
+
+    /** Returns the counter that {@code executeSql} increments after each successful operation. */
+    public long getQueriedSqlNum() {
+        return this.queriedSqlNum;
+    }
+
+    /**
+     * Picks the table that stores {@code resPath}: paths under one of the roots
+     * listed below go to the second table, everything else to the first.
+     */
+    public String getMetaTableName(String resPath) {
+        final String[] secondTableRoots = { //
+                ResourceStore.BAD_QUERY_RESOURCE_ROOT, //
+                ResourceStore.CUBE_STATISTICS_ROOT, //
+                ResourceStore.DICT_RESOURCE_ROOT, //
+                ResourceStore.EXECUTE_RESOURCE_ROOT, //
+                ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT, //
+                ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT, //
+                ResourceStore.TEMP_STATMENT_RESOURCE_ROOT };
+        for (String root : secondTableRoots) {
+            if (resPath.startsWith(root)) {
+                return tablesName[1];
+            }
+        }
+        return tablesName[0];
+    }
+
+}
\ No newline at end of file
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
new file mode 100644
index 0000000000..b62b33fa7f
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * A {@link ResourceStore} that keeps metadata in a relational database through
+ * {@link JDBCResourceDAO}. Two tables are used, named after the identifier of the
+ * metadata URL: "&lt;identifier&gt;" and "&lt;identifier&gt;1". Every
+ * {@link SQLException} raised by the DAO is rethrown as an {@link IOException}.
+ */
+public class JDBCResourceStore extends ResourceStore {
+
+    private static final String JDBC_SCHEME = "jdbc";
+
+    private String[] tablesName = new String[2];
+
+    private JDBCResourceDAO resourceDAO;
+
+    /**
+     * @param kylinConfig configuration whose metadata URL must use the "jdbc" scheme
+     * @throws SQLException if the DAO cannot be set up
+     */
+    public JDBCResourceStore(KylinConfig kylinConfig) throws SQLException {
+        super(kylinConfig);
+        StorageURL metadataUrl = kylinConfig.getMetadataUrl();
+        checkScheme(metadataUrl);
+        tablesName[0] = metadataUrl.getIdentifier();
+        tablesName[1] = metadataUrl.getIdentifier() + "1";
+        this.resourceDAO = new JDBCResourceDAO(kylinConfig, tablesName);
+    }
+
+    @Override
+    protected boolean existsImpl(String resPath) throws IOException {
+        try {
+            return resourceDAO.existResource(resPath);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected RawResource getResourceImpl(String resPath) throws IOException {
+        return getResourceImpl(resPath, false);
+    }
+
+    /**
+     * Loads content and timestamp of {@code resPath}.
+     *
+     * @param isAllowBroken passed through to the DAO
+     * @return the resource, or {@code null} when the DAO finds none
+     */
+    protected RawResource getResourceImpl(String resPath, final boolean isAllowBroken) throws IOException {
+        try {
+            JDBCResource resource = resourceDAO.getResource(resPath, true, true, isAllowBroken);
+            if (resource != null)
+                return new RawResource(resource.getContent(), resource.getTimestamp());
+            else
+                return null;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** Returns the stored timestamp of {@code resPath}, or 0 when the DAO finds no such resource. */
+    @Override
+    protected long getResourceTimestampImpl(String resPath) throws IOException {
+        try {
+            JDBCResource resource = resourceDAO.getResource(resPath, false, true);
+            if (resource != null) {
+                return resource.getTimestamp();
+            } else {
+                return 0L;
+            }
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** Lists the resources under {@code folderPath}; returns {@code null} (not an empty set) when there are none. */
+    @Override
+    protected NavigableSet<String> listResourcesImpl(String folderPath, boolean recursive) throws IOException {
+        try {
+            final TreeSet<String> result = resourceDAO.listAllResource(makeFolderPath(folderPath), recursive);
+            return result.isEmpty() ? null : result;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive)
+            throws IOException {
+        return getAllResourcesImpl(folderPath, timeStart, timeEndExclusive, false);
+    }
+
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive,
+                                                    final boolean isAllowBroken) throws IOException {
+        final List<RawResource> result = Lists.newArrayList();
+        try {
+            List<JDBCResource> allResource = resourceDAO.getAllResource(makeFolderPath(folderPath), timeStart,
+                    timeEndExclusive, isAllowBroken);
+            for (JDBCResource resource : allResource) {
+                result.add(new RawResource(resource.getContent(), resource.getTimestamp()));
+            }
+            return result;
+        } catch (SQLException e) {
+            // release whatever streams were already collected before failing
+            for (RawResource rawResource : result) {
+                IOUtils.closeQuietly(rawResource.inputStream);
+            }
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+        try {
+            JDBCResource resource = new JDBCResource(resPath, ts, content);
+            resourceDAO.putResource(resource);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** Delegates the timestamp-checked write to the DAO and returns {@code newTS} on success. */
+    @Override
+    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS)
+            throws IOException, WriteConflictException {
+        try {
+            resourceDAO.checkAndPutResource(resPath, content, oldTS, newTS);
+            return newTS;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void deleteResourceImpl(String resPath) throws IOException {
+        try {
+            resourceDAO.deleteResource(resPath);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Describes where {@code resPath} lives, as
+     * "&lt;table&gt;(key='&lt;resPath&gt;')@&lt;metadata url&gt;".
+     */
+    @Override
+    protected String getReadableResourcePathImpl(String resPath) {
+        // Ask the DAO which of the two tables holds this path. Concatenating the
+        // tablesName array itself would only print its identity hash
+        // ("[Ljava.lang.String;@...").
+        return resourceDAO.getMetaTableName(resPath) + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
+    }
+
+    /** Ensures {@code folderPath} (which must start with '/') ends with a '/'. */
+    private String makeFolderPath(String folderPath) {
+        Preconditions.checkState(folderPath.startsWith("/"));
+        String lookForPrefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";
+        return lookForPrefix;
+    }
+
+    protected JDBCResourceDAO getResourceDAO() {
+        return resourceDAO;
+    }
+
+    public long getQueriedSqlNum() {
+        return resourceDAO.getQueriedSqlNum();
+    }
+
+    /** Fails with {@link IllegalStateException} unless {@code url} uses the "jdbc" scheme. */
+    public static void checkScheme(StorageURL url) {
+        Preconditions.checkState(JDBC_SCHEME.equals(url.getScheme()));
+    }
+}
\ No newline at end of file
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java
new file mode 100644
index 0000000000..d3d70c369c
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import java.util.Properties;
+
+/**
+ * Read-only view over a {@link Properties} object holding SQL templates. Each
+ * getter returns the template stored under one fixed "format.sql.*" key and
+ * fails with a {@link RuntimeException} when that key is missing.
+ */
+public class JDBCSqlQueryFormat {
+    private final Properties templates;
+
+    /**
+     * @param props properties that contain the "format.sql.*" templates
+     */
+    public JDBCSqlQueryFormat(Properties props) {
+        this.templates = props;
+    }
+
+    /** Looks up {@code key}; a missing property is reported as a RuntimeException. */
+    private String requireSql(String key) {
+        final String template = templates.getProperty(key);
+        if (template != null) {
+            return template;
+        }
+        throw new RuntimeException(String.format("Property '%s' not found", key));
+    }
+
+    public String getCreateIfNeedSql() {
+        return requireSql("format.sql.create-if-need");
+    }
+
+    public String getKeyEqualsSql() {
+        return requireSql("format.sql.key-equals");
+    }
+
+    public String getDeletePstatSql() {
+        return requireSql("format.sql.delete-pstat");
+    }
+
+    public String getListResourceSql() {
+        return requireSql("format.sql.list-resource");
+    }
+
+    public String getAllResourceSql() {
+        return requireSql("format.sql.all-resource");
+    }
+
+    public String getReplaceSql() {
+        return requireSql("format.sql.replace");
+    }
+
+    public String getInsertSql() {
+        return requireSql("format.sql.insert");
+    }
+
+    public String getReplaceSqlWithoutContent() {
+        return requireSql("format.sql.replace-without-content");
+    }
+
+    public String getInsertSqlWithoutContent() {
+        return requireSql("format.sql.insert-without-content");
+    }
+
+    public String getUpdateSqlWithoutContent() {
+        return requireSql("format.sql.update-without-content");
+    }
+
+    public String getUpdateContentSql() {
+        return requireSql("format.sql.update-content");
+    }
+
+    public String getTestCreateSql() {
+        return requireSql("format.sql.test.create");
+    }
+
+    public String getTestDropSql() {
+        return requireSql("format.sql.test.drop");
+    }
+
+    public String getCreateIndexSql() {
+        return requireSql("format.sql.create-index");
+    }
+
+    public String getCheckTableExistsSql() {
+        return requireSql("format.sql.check-table-exists");
+    }
+}
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java
new file mode 100644
index 0000000000..bcbc79c89d
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Factory for {@link JDBCSqlQueryFormat} instances backed by per-dialect
+ * {@code /metadata-jdbc-<dialect>.properties} classpath resources.
+ *
+ * Loaded property sets are kept in a static map keyed by resource name. The map is a plain
+ * {@link HashMap} and this class adds no synchronization around it.
+ */
+public class JDBCSqlQueryFormatProvider {
+    // Non-empty property sets that have already been loaded, keyed by classpath resource name.
+    static Map<String, Properties> cache = new HashMap<>();
+
+    /**
+     * Builds a {@link JDBCSqlQueryFormat} for the given dialect.
+     *
+     * @param dialect dialect name; lower-cased and substituted into
+     *                {@code /metadata-jdbc-%s.properties} to form the resource name
+     * @return a new format object wrapping the cached or freshly loaded properties
+     * @throws RuntimeException if the resource does not exist or cannot be loaded
+     */
+    public static JDBCSqlQueryFormat createJDBCSqlQueriesFormat(String dialect) {
+        // Locale.ROOT keeps the resource name independent of the JVM's default locale.
+        String key = String.format("/metadata-jdbc-%s.properties", dialect.toLowerCase(Locale.ROOT));
+
+        Properties cached = cache.get(key);
+        if (cached != null) {
+            return new JDBCSqlQueryFormat(cached);
+        }
+
+        // Resolve the resource through this class, which lives next to the properties files,
+        // rather than through java.util.Properties, whose class loader / module is the JDK's.
+        InputStream input = JDBCSqlQueryFormatProvider.class.getResourceAsStream(key);
+        if (input == null) {
+            throw new RuntimeException(String.format("Can't find properties named %s for metastore", key));
+        }
+
+        Properties props = new Properties();
+        try {
+            props.load(input);
+            // Only cache property sets that actually contain entries.
+            if (!props.isEmpty()) {
+                cache.put(key, props);
+            }
+            return new JDBCSqlQueryFormat(props);
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Can't find properties named %s for metastore", key), e);
+        } finally {
+            IOUtils.closeQuietly(input);
+        }
+    }
+}
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 36bb595489..1262680d9b 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -231,6 +231,11 @@ final public long getResourceTimestamp(String resPath) 
throws IOException {
      */
     abstract protected List<RawResource> getAllResourcesImpl(String 
folderPath, long timeStart, long timeEndExclusive) throws IOException;
 
+    /**
+     * Four-argument overload of {@code getAllResourcesImpl}. This default implementation does not
+     * look at {@code isAllowBroken}; it forwards the remaining arguments to the three-argument
+     * version and returns its result unchanged.
+     */
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive,
+            boolean isAllowBroken) throws IOException {
+        final List<RawResource> resources = getAllResourcesImpl(folderPath, timeStart, timeEndExclusive);
+        return resources;
+    }
+
     /**
      * returns null if not exists
      */
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java 
b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 3aef34ad1e..7bc73873f0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -84,6 +84,16 @@ public static FileSystem getWorkingFileSystem(Configuration 
conf) throws IOExcep
         return getFileSystem(workingPath, conf);
     }
 
+    /**
+     * Convenience overload: resolves the read file system using the configuration returned by
+     * {@code getCurrentConfiguration()}.
+     */
+    public static FileSystem getReadFileSystem() throws IOException {
+        return getReadFileSystem(getCurrentConfiguration());
+    }
+
+    /**
+     * Resolves the file system for the path returned by
+     * {@code KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory(null)}.
+     *
+     * @param conf configuration handed to {@code getFileSystem(Path, Configuration)}
+     */
+    public static FileSystem getReadFileSystem(Configuration conf) throws IOException {
+        final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        final Path readWorkingDir = new Path(envConfig.getReadHdfsWorkingDirectory(null));
+        return getFileSystem(readWorkingDir, conf);
+    }
+
     public static FileSystem getFileSystem(String path) throws IOException {
         return getFileSystem(new Path(makeURI(path)));
     }
diff --git a/core-common/src/main/resources/metadata-jdbc-mysql.properties 
b/core-common/src/main/resources/metadata-jdbc-mysql.properties
new file mode 100644
index 0000000000..7be6a1b7db
--- /dev/null
+++ b/core-common/src/main/resources/metadata-jdbc-mysql.properties
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+###JDBC METASTORE
+
+format.sql.create-if-need=create table if not exists {0} ( {1} VARCHAR(255) 
primary key, {2} BIGINT, {3} LONGBLOB )
+format.sql.key-equals=select {0} from {1} where {2} = ?
+format.sql.delete-pstat=delete from {0}  where {1} = ?
+format.sql.list-resource=select {0} from {1} where {2} like ?
+format.sql.all-resource=select {0} from {1} where {2} like ? and {3} >= ? and 
{4} < ?
+format.sql.replace=update {0} set {1} = ?,{2} = ? where {3} = ?
+format.sql.insert=replace into {0}({1},{2},{3}) values(?,?,?)
+format.sql.replace-without-content=update {0} set {1} = ? where {2} = ?
+format.sql.insert-without-content=replace into {0}({1},{2}) values(?,?)
+format.sql.update-without-content=update {0} set {1}=? where {2}=? and {3}=?
+format.sql.update-content=update {0} set {1}=? where {2}=?
+format.sql.test.create=create table if not exists {0} (name VARCHAR(255) 
primary key, id BIGINT)
+format.sql.test.drop=drop table if exists {0}
+format.sql.create-index=create index {0} on {1} ({2})
+format.sql.check-table-exists=show tables
\ No newline at end of file
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index a3e7e68517..8dc6532380 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -130,6 +130,11 @@
             <artifactId>maven-model</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>xerces</groupId>
             <artifactId>xercesImpl</artifactId>
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
 
b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
new file mode 100644
index 0000000000..e12ecb19ff
--- /dev/null
+++ 
b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.jdbc;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.persistence.JDBCConnectionManager;
+import org.apache.kylin.common.persistence.JDBCResourceStore;
+import org.apache.kylin.common.persistence.JDBCSqlQueryFormat;
+import org.apache.kylin.common.persistence.JDBCSqlQueryFormatProvider;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceStoreTest;
+import org.apache.kylin.common.persistence.ResourceTool;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.persistence.StringEntity;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.log4j.component.helpers.MessageFormatter;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.text.MessageFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ITJDBCResourceStoreTest extends HBaseMetadataTestCase {
+    private static final Logger logger = 
LoggerFactory.getLogger(ITJDBCResourceStoreTest.class);
+
+    private static final String LARGE_CELL_PATH = 
"/cube/_test_large_cell.json";
+    private static final String Large_Content = "THIS_IS_A_LARGE_CELL";
+    private KylinConfig kylinConfig;
+    private JDBCConnectionManager connectionManager;
+    private final String jdbcMetadataUrlNoIdentifier = 
"@jdbc,url=jdbc:mysql://localhost:3306/kylin_it,username=root,password=,maxActive=10,maxIdle=10";
+    private final String mainIdentifier = "kylin_default_instance";
+    private final String copyIdentifier = "kylin_default_instance_copy";
+    private StorageURL metadataUrlBackup;
+    private boolean jdbcConnectable = false;
+
+    /**
+     * Points the environment config at the JDBC metadata URL, drops the main and copy test
+     * tables, and copies the original metadata into the JDBC store. If a RuntimeException is
+     * raised along the way, {@code jdbcConnectable} stays false so the tests skip themselves
+     * via {@code Assume}.
+     */
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        KylinConfig configBackup = KylinConfig.createKylinConfig(kylinConfig);
+        metadataUrlBackup = kylinConfig.getMetadataUrl();
+        kylinConfig.setMetadataUrl(mainIdentifier + jdbcMetadataUrlNoIdentifier);
+        JDBCSqlQueryFormat sqlQueryFormat = JDBCSqlQueryFormatProvider
+                .createJDBCSqlQueriesFormat(KylinConfig.getInstanceFromEnv().getMetadataDialect());
+
+        Statement statement = null;
+        Connection conn = null;
+        try {
+            connectionManager = JDBCConnectionManager.getConnectionManager();
+            conn = connectionManager.getConn();
+            statement = conn.createStatement();
+            // Start from a clean slate: drop the main table first, then the copy table.
+            for (String identifier : new String[] { mainIdentifier, copyIdentifier }) {
+                statement.executeUpdate(MessageFormat.format(sqlQueryFormat.getTestDropSql(), identifier));
+            }
+            jdbcConnectable = true;
+            ResourceTool.copy(configBackup, kylinConfig);
+        } catch (RuntimeException ex) {
+            // Keep the cause in the log so it is visible why the tests were skipped.
+            logger.info("Init connection manager failed, skip test cases", ex);
+        } finally {
+            JDBCConnectionManager.closeQuietly(statement);
+            JDBCConnectionManager.closeQuietly(conn);
+        }
+    }
+
+    /** Cleans up the test metadata, then puts back the metadata URL saved in {@code setup()}. */
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+        final String originalUrl = metadataUrlBackup.toString();
+        kylinConfig.setMetadataUrl(originalUrl);
+    }
+
+    /** Checks that the connection manager hands out a non-null connection. */
+    @Test
+    public void testConnectJDBC() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        Connection pooled = null;
+        try {
+            pooled = connectionManager.getConn();
+            assertNotNull(pooled);
+        } finally {
+            JDBCConnectionManager.closeQuietly(pooled);
+        }
+    }
+
+    /** Runs a drop / create / drop cycle on a scratch table through a plain JDBC statement. */
+    @Test
+    public void testJdbcBasicFunction() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        final String createTableSql = "CREATE TABLE test(col1 VARCHAR (10), col2 INTEGER )";
+        final String dropTableSql = "DROP TABLE IF EXISTS test";
+        final String[] script = { dropTableSql, createTableSql, dropTableSql };
+
+        Connection conn = null;
+        Statement statement = null;
+        try {
+            conn = connectionManager.getConn();
+            statement = conn.createStatement();
+            for (String sql : script) {
+                statement.executeUpdate(sql);
+            }
+        } finally {
+            JDBCConnectionManager.closeQuietly(statement);
+            JDBCConnectionManager.closeQuietly(conn);
+        }
+    }
+
+    //   Support other db except mysql
+    //   @Test
+    //    public void testGetDbcpProperties() {
+    //        Properties prop = 
JDBCConnectionManager.getConnectionManager().getDbcpProperties();
+    //        assertEquals("com.mysql.jdbc.Driver", 
prop.get("driverClassName"));
+    //    }
+
+    /** Prints the result of formatting {@code "{}:{}"} with two arguments; asserts nothing. */
+    @Test
+    public void testMsgFormatter() {
+        final Object rendered = MessageFormatter.format("{}:{}", "a", "b");
+        System.out.println(rendered);
+    }
+
+    /** Runs the shared {@code ResourceStoreTest.testAStore} suite against the JDBC store. */
+    @Test
+    public void testResourceStoreBasic() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        final String fullUrl = mainIdentifier + jdbcMetadataUrlNoIdentifier;
+        // Everything after the last '@' is the part of the URL without the identifier.
+        final String urlWithoutIdentifier = StringUtils.substringAfterLast(fullUrl, "@");
+        ResourceStoreTest.testAStore(ResourceStoreTest.mockUrl(urlWithoutIdentifier, kylinConfig), kylinConfig);
+    }
+
+    /**
+     * Writes a {@link StringEntity} to {@code /large/large.json}, reads it back and compares.
+     * The resource written by this test and the swapped metadata URL are both cleaned up in
+     * {@code finally}, so a failing assertion does not leak state into other tests.
+     */
+    @Test
+    public void testJDBCStoreWithLargeCell() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        JDBCResourceStore store = null;
+        StringEntity content = new StringEntity(Large_Content);
+        String largePath = "/large/large.json";
+        String oldUrl = ResourceStoreTest.replaceMetadataUrl(kylinConfig,
+                ResourceStoreTest.mockUrl("jdbc", kylinConfig));
+        try {
+            store = new JDBCResourceStore(KylinConfig.getInstanceFromEnv());
+            store.deleteResource(largePath);
+            store.putResource(largePath, content, StringEntity.serializer);
+            assertTrue(store.exists(largePath));
+            StringEntity t = store.getResource(largePath, StringEntity.class, StringEntity.serializer);
+            assertEquals(content, t);
+        } finally {
+            try {
+                // Remove the resource this test actually created.
+                if (store != null) {
+                    store.deleteResource(largePath);
+                }
+            } finally {
+                ResourceStoreTest.replaceMetadataUrl(kylinConfig, oldUrl);
+            }
+        }
+    }
+
+    /** Runs {@code ResourceStoreTest.testPerformance} for the "jdbc" URL, then for "hbase". */
+    @Test
+    public void testPerformance() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        for (String scheme : new String[] { "jdbc", "hbase" }) {
+            ResourceStoreTest.testPerformance(ResourceStoreTest.mockUrl(scheme, kylinConfig), kylinConfig);
+        }
+    }
+
+    /**
+     * Round-trips a 500 KB all-zero payload through the JDBC store at {@code LARGE_CELL_PATH}.
+     * The resource and the swapped metadata URL are both cleaned up in {@code finally}.
+     */
+    @Test
+    public void testMaxCell() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        // A freshly allocated byte[] is already zero-filled, so no explicit fill is needed.
+        byte[] data = new byte[500 * 1024];
+        ByteEntity content = new ByteEntity(data);
+        JDBCResourceStore store = null;
+        String oldUrl = ResourceStoreTest.replaceMetadataUrl(kylinConfig,
+                ResourceStoreTest.mockUrl("jdbc", kylinConfig));
+        try {
+            store = new JDBCResourceStore(KylinConfig.getInstanceFromEnv());
+            store.deleteResource(LARGE_CELL_PATH);
+            store.putResource(LARGE_CELL_PATH, content, ByteEntity.serializer);
+            assertTrue(store.exists(LARGE_CELL_PATH));
+            ByteEntity t = store.getResource(LARGE_CELL_PATH, ByteEntity.class, ByteEntity.serializer);
+            assertEquals(content, t);
+        } finally {
+            try {
+                if (store != null) {
+                    store.deleteResource(LARGE_CELL_PATH);
+                }
+            } finally {
+                // Restore the URL even when an assertion above failed.
+                ResourceStoreTest.replaceMetadataUrl(kylinConfig, oldUrl);
+            }
+        }
+    }
+
+    /**
+     * Clones every existing execute resource (and its matching execute_output resources) 200
+     * times under fresh UUIDs, copies the whole store to a second JDBC table with
+     * {@code ResourceTool.copy}, and then checks resource counts, the number of SQL statements
+     * issued during the copy, and the total elapsed time.
+     */
+    @Test
+    public void testPerformanceWithResourceTool() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        KylinConfig tmpConfig = KylinConfig.createKylinConfig(KylinConfig.getInstanceFromEnv());
+        tmpConfig.setMetadataUrl(copyIdentifier + jdbcMetadataUrlNoIdentifier);
+
+        JDBCResourceStore store = (JDBCResourceStore) ResourceStore.getStore(kylinConfig);
+        NavigableSet<String> executes = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
+        NavigableSet<String> executeOutputs = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
+
+        long startTs = System.currentTimeMillis();
+
+        for (String execute : executes) {
+            String uuid = StringUtils.substringAfterLast(execute, "/");
+            RawResource executeResource = store.getResource(execute);
+
+            // Collect the outputs whose path mentions this execute's uuid.
+            Map<String, RawResource> outputsByPath = new HashMap<>();
+            for (String executeOutput : executeOutputs) {
+                if (!executeOutput.contains(uuid)) {
+                    continue;
+                }
+                outputsByPath.put(executeOutput, store.getResource(executeOutput));
+            }
+
+            for (int round = 0; round < 200; round++) {
+                String newUuid = UUID.randomUUID().toString();
+                String executePath = ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + newUuid;
+                store.putResource(executePath, executeResource.inputStream, System.currentTimeMillis());
+
+                for (Map.Entry<String, RawResource> output : outputsByPath.entrySet()) {
+                    String step = StringUtils.substringAfterLast(output.getKey(), uuid);
+                    String outputPath = ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + newUuid + step;
+                    store.putResource(outputPath, output.getValue().inputStream, System.currentTimeMillis());
+                }
+            }
+        }
+
+        long queryNumBeforeCopy = store.getQueriedSqlNum();
+        ResourceTool.copy(kylinConfig, tmpConfig);
+        long endTs = System.currentTimeMillis();
+        long queryNumAfterCopy = store.getQueriedSqlNum();
+        JDBCResourceStore resourceStoreCopy = (JDBCResourceStore) ResourceStore.getStore(tmpConfig);
+
+        int executeNum = store.listResources("/execute").size();
+        int executeOutputNum = store.listResources("/execute_output").size();
+
+        assertEquals(executeNum, resourceStoreCopy.listResources("/execute").size());
+        assertEquals(executeOutputNum, resourceStoreCopy.listResources("/execute_output").size());
+
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String startTime = sdf.format(new Date(startTs));
+        String endTime = sdf.format(new Date(endTs));
+        long copyQueryNum = queryNumAfterCopy - queryNumBeforeCopy;
+
+        logger.info("Test performance with ResourceTool done during " + startTime + " to " + endTime);
+        logger.info("Now there is " + executeNum + " execute data and " + executeOutputNum
+                + " execute_output data in resource store.");
+        logger.info("Resource store run " + queryNumBeforeCopy + " sqls for metadata generation, and "
+                + copyQueryNum + " sqls for copy with ResourceTool.");
+        assertTrue(copyQueryNum < queryNumBeforeCopy);
+        logger.info("This test is expected to be done in 10 mins.");
+        assertTrue((endTs - startTs) < 600000);
+    }
+
+    /**
+     * Test entity wrapping a raw byte array. Its serializer writes a 4-byte length followed by
+     * the bytes themselves.
+     */
+    @SuppressWarnings("serial")
+    public static class ByteEntity extends RootPersistentEntity {
+
+        public static final Serializer<ByteEntity> serializer = new Serializer<ByteEntity>() {
+
+            @Override
+            public void serialize(ByteEntity obj, DataOutputStream out) throws IOException {
+                byte[] data = obj.getData();
+                out.writeInt(data.length);
+                out.write(data);
+            }
+
+            @Override
+            public ByteEntity deserialize(DataInputStream in) throws IOException {
+                int length = in.readInt();
+                byte[] bytes = new byte[length];
+                // read(byte[]) may return after a partial read; readFully blocks until the
+                // whole payload has arrived or throws EOFException.
+                in.readFully(bytes);
+                return new ByteEntity(bytes);
+            }
+        };
+        byte[] data;
+
+        public ByteEntity() {
+
+        }
+
+        public ByteEntity(byte[] data) {
+            this.data = data;
+        }
+
+        public static Serializer<ByteEntity> getSerializer() {
+            return serializer;
+        }
+
+        public byte[] getData() {
+            return data;
+        }
+
+        public void setData(byte[] data) {
+            this.data = data;
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index cd186599cb..0d38f9fb55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,9 @@
         <spark.version>2.1.2</spark.version>
         <kryo.version>4.0.0</kryo.version>
 
+        <!-- mysql versions -->
+        <mysql-connector.version>5.1.8</mysql-connector.version>
+
         <!-- Scala versions -->
         <scala.version>2.11.0</scala.version>
 
@@ -550,6 +553,13 @@
                 <version>${hbase-hadoop2.version}</version>
                 <scope>test</scope>
             </dependency>
+            <!-- jdbc dependencies -->
+            <dependency>
+                <groupId>mysql</groupId>
+                <artifactId>mysql-connector-java</artifactId>
+                <version>${mysql-connector.version}</version>
+                <scope>provided</scope>
+            </dependency>
             <!-- Hive dependencies -->
             <dependency>
                 <groupId>org.apache.hive</groupId>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support MySQL as Kylin metadata storage
> ---------------------------------------
>
>                 Key: KYLIN-3488
>                 URL: https://issues.apache.org/jira/browse/KYLIN-3488
>             Project: Kylin
>          Issue Type: New Feature
>          Components: Metadata
>            Reporter: Shaofeng SHI
>            Priority: Major
>             Fix For: v2.5.0
>
>
> Kylin uses HBase as the metastore, but in some cases users expect the 
> metadata to be stored outside HBase.
> Sonny Heer from mailing list mentioned:
> "I'm fairly certain anyone using Kylin with AWS EMR will benefit from this.   
> Having multiple hbase clusters across AZs is a huge benefit.  BTW only thing 
> blocking at the moment is write operations happening from kylin query nodes."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to