[ 
https://issues.apache.org/jira/browse/HDFS-16943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708141#comment-17708141
 ] 

ASF GitHub Bot commented on HDFS-16943:
---------------------------------------

virajjasani commented on code in PR #5469:
URL: https://github.com/apache/hadoop/pull/5469#discussion_r1156476326


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java:
##########
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
+import 
org.apache.hadoop.hdfs.server.federation.router.security.token.SQLConnectionFactory;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import 
org.apache.hadoop.hdfs.server.federation.store.records.DisabledNameservice;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.*;
+import static org.apache.hadoop.util.Time.*;
+
+/**
+ * StateStoreDriver implementation based on MySQL.
+ * There is a separate table for each record type. Each table just has two
+ * columns, recordKey and recordValue.
+ */
+public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
+  public static final String SQL_STATE_STORE_CONF_PREFIX = 
"state-store-mysql.";
+  public static final String CONNECTION_URL =
+      SQL_STATE_STORE_CONF_PREFIX + "connection.url";
+  public static final String CONNECTION_USERNAME =
+      SQL_STATE_STORE_CONF_PREFIX + "connection.username";
+  public static final String CONNECTION_PASSWORD =
+      SQL_STATE_STORE_CONF_PREFIX + "connection.password";
+  public static final String CONNECTION_DRIVER =
+      SQL_STATE_STORE_CONF_PREFIX + "connection.driver";
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreSerializableImpl.class);
+  private SQLConnectionFactory connectionFactory;
+  /** If the driver has been initialized. */
+  private boolean initialized = false;
+  private final static Set<String> VALID_TABLES = Collections.unmodifiableSet(
+      new HashSet<>(Arrays.asList(
+          MembershipState.class.getSimpleName(),
+          RouterState.class.getSimpleName(),
+          MountTable.class.getSimpleName(),
+          DisabledNameservice.class.getSimpleName()
+      ))
+  );
+
+  @Override
+  public boolean initDriver() {
+    Configuration conf = getConf();
+    connectionFactory = new 
MySQLStateStoreHikariDataSourceConnectionFactory(conf);
+    initialized = true;
+    LOG.info("MySQL state store connection factory initialized");
+    return true;
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean initRecordStorage(String className, 
Class<T> clazz) {
+    String tableName = getAndValidateTableNameForClass(clazz);
+    try (Connection connection = connectionFactory.getConnection();
+        ResultSet resultSet = connection
+            .getMetaData()
+            .getTables(null, null, tableName, null)) {
+      if (resultSet.next()) {
+        return true;
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not check if table {} able exists", tableName);
+    }
+
+    try (Connection connection = connectionFactory.getConnection();
+        Statement statement = connection.createStatement()) {
+      String sql = String.format("CREATE TABLE %s ("
+          + "recordKey VARCHAR (255) NOT NULL,"
+          + "recordValue VARCHAR (2047) NOT NULL, "
+          + "PRIMARY KEY(recordKey))", tableName);
+      statement.execute(sql);
+      return true;
+    } catch (SQLException e) {
+      LOG.error(String.format("Cannot create table %s for record type %s.",
+          tableName, className), e.getMessage());
+      return false;
+    }
+  }
+
+  @Override
+  public boolean isDriverReady() {
+    return this.initialized;
+  }
+
+  @Override
+  public void close() throws Exception {
+    connectionFactory.shutdown();
+  }
+
+  @Override
+  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
+      throws IOException {
+    String tableName = getAndValidateTableNameForClass(clazz);
+    verifyDriverReady();
+    long start = monotonicNow();
+    StateStoreMetrics metrics = getMetrics();
+    List<T> ret = new ArrayList<>();
+    try (Connection connection = connectionFactory.getConnection();
+        PreparedStatement statement = connection.prepareStatement(
+            String.format("SELECT * FROM %s", tableName))) {
+      try (ResultSet result = statement.executeQuery()) {
+        while(result.next()) {
+          String recordValue = result.getString("recordValue");
+          T record = newRecord(recordValue, clazz, false);
+          ret.add(record);
+        }
+      }
+    } catch (SQLException e) {
+      if (metrics != null) {
+        metrics.addFailure(monotonicNow() - start);
+      }
+      String msg = "Cannot fetch records for " + clazz.getSimpleName();
+      LOG.error(msg, e);
+      throw new IOException(msg, e);
+    }
+
+    if (metrics != null) {
+      metrics.addRead(monotonicNow() - start);
+    }
+    return new QueryResult<>(ret, getTime());
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean putAll(
+      List<T> records, boolean allowUpdate, boolean errorIfExists) throws 
IOException {
+    if (records.isEmpty()) {
+      return true;
+    }
+
+    verifyDriverReady();
+    StateStoreMetrics metrics = getMetrics();
+
+    long start = monotonicNow();
+
+    boolean success = true;
+    for (T record : records) {
+      String tableName = getAndValidateTableNameForClass(record.getClass());
+      String primaryKey = getPrimaryKey(record);
+      String data = serializeString(record);
+
+      if (recordExists(tableName, primaryKey)) {
+        if (allowUpdate) {
+          // Update the mod time stamp. Many backends will use their
+          // own timestamp for the mod time.
+          record.setDateModified(this.getTime());
+          if (!updateRecord(tableName, primaryKey, data)) {
+            LOG.error("Cannot write {} into table {}", primaryKey, tableName);
+            success = false;

Review Comment:
   Oh yes, this is definitely not required for this PR. Adding a new mount point or
updating an existing one is a single-record update anyway.





> RBF: Implement MySQL based StateStoreDriver
> -------------------------------------------
>
>                 Key: HDFS-16943
>                 URL: https://issues.apache.org/jira/browse/HDFS-16943
>             Project: Hadoop HDFS
>          Issue Type: Task
>          Components: hdfs, rbf
>            Reporter: Simbarashe Dzinamarira
>            Assignee: Simbarashe Dzinamarira
>            Priority: Major
>              Labels: pull-request-available
>
> RBF supports two types of StateStoreDrivers
>  # StateStoreFileImpl
>  # StateStoreZooKeeperImpl
> I propose implementing a third driver that is backed by MySQL.
>  * StateStoreZooKeeperImpl requires an additional Zookeeper cluster.
>  * StateStoreFileImpl can use one of the namenodes in the HDFS cluster, but 
> that namenode becomes a single point of failure, introducing coupling between 
> the federated clusters.
>  HADOOP-18535 implemented a MySQL token store. When tokens are stored in 
> MySQL, using MySQL for the StateStore as well reduces the number of external 
> dependencies for routers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to