HDFS-10630. Federation State Store FS Implementation. Contributed by Jason Kace and Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ee748132 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ee748132 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ee748132 Branch: refs/heads/HDFS-10467 Commit: ee74813241fc0b0a0c19b3ada64426c7b2c29cf4 Parents: 4319a10 Author: Inigo Goiri <inigo...@apache.org> Authored: Tue May 2 15:49:53 2017 -0700 Committer: Inigo Goiri <inigo...@apache.org> Committed: Thu Aug 31 19:39:53 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 14 + .../federation/router/PeriodicService.java | 198 ++++++++ .../StateStoreConnectionMonitorService.java | 67 +++ .../federation/store/StateStoreService.java | 152 +++++- .../federation/store/StateStoreUtils.java | 51 +- .../store/driver/StateStoreDriver.java | 31 +- .../driver/StateStoreRecordOperations.java | 17 +- .../store/driver/impl/StateStoreBaseImpl.java | 31 +- .../driver/impl/StateStoreFileBaseImpl.java | 429 ++++++++++++++++ .../store/driver/impl/StateStoreFileImpl.java | 161 +++++++ .../driver/impl/StateStoreFileSystemImpl.java | 178 +++++++ .../driver/impl/StateStoreSerializableImpl.java | 77 +++ .../federation/store/records/BaseRecord.java | 20 +- .../server/federation/store/records/Query.java | 66 +++ .../src/main/resources/hdfs-default.xml | 16 + .../store/FederationStateStoreTestUtils.java | 232 +++++++++ .../store/driver/TestStateStoreDriverBase.java | 483 +++++++++++++++++++ .../store/driver/TestStateStoreFile.java | 64 +++ .../store/driver/TestStateStoreFileSystem.java | 88 ++++ 19 files changed, 2329 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 7623839..8cdd450 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -25,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl; import org.apache.hadoop.http.HttpConfig; @@ -1134,6 +1138,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys { FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT = StateStoreSerializerPBImpl.class; + public static final String FEDERATION_STORE_DRIVER_CLASS = + 
FEDERATION_STORE_PREFIX + "driver.class"; + public static final Class<? extends StateStoreDriver> + FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreFileImpl.class; + + public static final String FEDERATION_STORE_CONNECTION_TEST_MS = + FEDERATION_STORE_PREFIX + "connection.test"; + public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT = + TimeUnit.MINUTES.toMillis(1); + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java new file mode 100644 index 0000000..5e12222 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Service to periodically execute a runnable. + */ +public abstract class PeriodicService extends AbstractService { + + private static final Logger LOG = + LoggerFactory.getLogger(PeriodicService.class); + + /** Default interval in milliseconds for the periodic service. */ + private static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); + + + /** Interval for running the periodic service in milliseconds. */ + private long intervalMs; + /** Name of the service. */ + private final String serviceName; + + /** Scheduler for the periodic service. */ + private final ScheduledExecutorService scheduler; + + /** If the service is running. */ + private volatile boolean isRunning = false; + + /** How many times we run. */ + private long runCount; + /** How many errors we got. */ + private long errorCount; + /** When was the last time we executed this service successfully. */ + private long lastRun; + + /** + * Create a new periodic update service. 
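+ * It runs {@link #periodicInvoke()} at the default interval of one minute.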
+ * + * @param name Name of the service. + */ + public PeriodicService(String name) { + this(name, DEFAULT_INTERVAL_MS); + } + + /** + * Create a new periodic update service. + * + * @param name Name of the service. + * @param interval Interval for the periodic service in milliseconds. + */ + public PeriodicService(String name, long interval) { + super(name); + this.serviceName = name; + this.intervalMs = interval; + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat(this.getName() + "-%d") + .build(); + this.scheduler = Executors.newScheduledThreadPool(1, threadFactory); + } + + /** + * Set the interval for the periodic service. + * + * @param interval Interval in milliseconds. + */ + protected void setIntervalMs(long interval) { + if (getServiceState() == STATE.STARTED) { + throw new ServiceStateException("Periodic service already started"); + } else { + this.intervalMs = interval; + } + } + + /** + * Get the interval for the periodic service. + * + * @return Interval in milliseconds. + */ + protected long getIntervalMs() { + return this.intervalMs; + } + + /** + * Get how many times we failed to run the periodic service. + * + * @return Times we failed to run the periodic service. + */ + protected long getErrorCount() { + return this.errorCount; + } + + /** + * Get how many times we run the periodic service. + * + * @return Times we run the periodic service. + */ + protected long getRunCount() { + return this.runCount; + } + + /** + * Get the last time the periodic service was executed. + * + * @return Last time the periodic service was executed. + */ + protected long getLastUpdate() { + return this.lastRun; + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + LOG.info("Starting periodic service {}", this.serviceName); + startPeriodic(); + } + + @Override + protected void serviceStop() throws Exception { + stopPeriodic(); + LOG.info("Stopping periodic service {}", this.serviceName); + super.serviceStop(); + } + + /** + * Stop the periodic task. + */ + protected synchronized void stopPeriodic() { + if (this.isRunning) { + LOG.info("{} is shutting down", this.serviceName); + this.isRunning = false; + this.scheduler.shutdownNow(); + } + } + + /** + * Start the periodic execution. + */ + protected synchronized void startPeriodic() { + stopPeriodic(); + + // Create the runnable service + Runnable updateRunnable = new Runnable() { + @Override + public void run() { + LOG.debug("Running {} update task", serviceName); + try { + if (!isRunning) { + return; + } + periodicInvoke(); + runCount++; + lastRun = Time.now(); + } catch (Exception ex) { + errorCount++; + LOG.warn(serviceName + " service threw an exception", ex); + } + } + }; + + // Start the execution of the periodic service + this.isRunning = true; + this.scheduler.scheduleWithFixedDelay( + updateRunnable, 0, this.intervalMs, TimeUnit.MILLISECONDS); + } + + /** + * Method that the service will run periodically. 
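+ * The scheduler invokes it with a fixed delay of {@link #getIntervalMs()} + * milliseconds; any exception thrown is caught and counted as an error.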
+ */ + protected abstract void periodicInvoke(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java new file mode 100644 index 0000000..4d279c5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.PeriodicService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically monitor the connection of the StateStore + * {@link StateStoreService} data store and to re-open the connection + * to the data store if required. + */ +public class StateStoreConnectionMonitorService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreConnectionMonitorService.class); + + /** Service that maintains the State Store connection. */ + private final StateStoreService stateStore; + + + /** + * Create a new service to monitor the connectivity of the state store driver. + * + * @param store Instance of the state store to be monitored. 
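+ * The monitoring interval is read from + * {@link DFSConfigKeys#FEDERATION_STORE_CONNECTION_TEST_MS} at init time.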
+ */ + public StateStoreConnectionMonitorService(StateStoreService store) { + super(StateStoreConnectionMonitorService.class.getSimpleName()); + this.stateStore = store; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.setIntervalMs(conf.getLong( + DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS, + DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT)); + + super.serviceInit(conf); + } + + @Override + public void periodicInvoke() { + LOG.debug("Checking state store connection"); + if (!stateStore.isDriverReady()) { + LOG.info("Attempting to open state store driver."); + stateStore.loadDriver(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java index 866daa3..df207e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -15,45 +15,168 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hdfs.server.federation.store; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; /** * A service to initialize a * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver - * StateStoreDriver} and maintain the connection to the data store. There - * are multiple state store driver connections supported: + * StateStoreDriver} and maintain the connection to the data store. There are + * multiple state store driver connections supported: * <ul> - * <li>File {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl. + * <li>File + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl. * StateStoreFileImpl StateStoreFileImpl} - * <li>ZooKeeper {@link org.apache.hadoop.hdfs.server.federation.store.driver. - * impl.StateStoreZooKeeperImpl StateStoreZooKeeperImpl} + * <li>ZooKeeper + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl. + * StateStoreZooKeeperImpl StateStoreZooKeeperImpl} * </ul> * <p> - * The service also supports the dynamic registration of data interfaces such as - * the following: + * The service also supports the dynamic registration of record stores like: * <ul> - * <li>{@link MembershipStateStore}: state of the Namenodes in the + * <li>{@link MembershipStore}: state of the Namenodes in the * federation. 
* <li>{@link MountTableStore}: Mount table between to subclusters. * See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}. - * <li>{@link RouterStateStore}: State of the routers in the federation. * </ul> */ @InterfaceAudience.Private @InterfaceStability.Evolving public class StateStoreService extends CompositeService { + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreService.class); + + + /** State Store configuration. */ + private Configuration conf; + /** Identifier for the service. */ private String identifier; - // Stub class - public StateStoreService(String name) { - super(name); + /** Driver for the back end connection. */ + private StateStoreDriver driver; + + /** Service to maintain data store connection. */ + private StateStoreConnectionMonitorService monitorService; + + + public StateStoreService() { + super(StateStoreService.class.getName()); + } + + /** + * Initialize the State Store and the connection to the backend. + * + * @param config Configuration for the State Store. + * @throws IOException + */ + @Override + protected void serviceInit(Configuration config) throws Exception { + this.conf = config; + + // Create implementation of State Store + Class<? extends StateStoreDriver> driverClass = this.conf.getClass( + DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS, + DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT, + StateStoreDriver.class); + this.driver = ReflectionUtils.newInstance(driverClass, this.conf); + + if (this.driver == null) { + throw new IOException("Cannot create driver for the State Store"); + } + + // Check the connection to the State Store periodically + this.monitorService = new StateStoreConnectionMonitorService(this); + this.addService(monitorService); + + super.serviceInit(this.conf); + } + + @Override + protected void serviceStart() throws Exception { + loadDriver(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + closeDriver(); + + super.serviceStop(); + } + + /** + * List of records supported by this State Store. + * + * @return List of supported record classes. + */ + public Collection<Class<? extends BaseRecord>> getSupportedRecords() { + // TODO add list of records + return new LinkedList<>(); + } + + /** + * Load the State Store driver. If successful, refresh cached data tables. + */ + public void loadDriver() { + synchronized (this.driver) { + if (!isDriverReady()) { + String driverName = this.driver.getClass().getSimpleName(); + if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) { + LOG.info("Connection to the State Store driver {} is open and ready", + driverName); + } else { + LOG.error("Cannot initialize State Store driver {}", driverName); + } + } + } + } + + /** + * Check if the driver is ready to be used. + * + * @return If the driver is ready. + */ + public boolean isDriverReady() { + return this.driver.isDriverReady(); + } + + /** + * Manually shuts down the driver. + * + * @throws Exception If the driver cannot be closed. + */ + @VisibleForTesting + public void closeDriver() throws Exception { + if (this.driver != null) { + this.driver.close(); + } + } + + /** + * Get the state store driver. + * + * @return State store driver. 
+ */ + public StateStoreDriver getDriver() { + return this.driver; } /** @@ -74,4 +197,5 @@ public class StateStoreService extends CompositeService { public void setIdentifier(String id) { this.identifier = id; } -} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java index 8c681df..0a36619 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java @@ -17,17 +17,22 @@ */ package org.apache.hadoop.hdfs.server.federation.store; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Set of utility functions used to query, create, update and delete data - * records in the state store. + * Set of utility functions used to work with the State Store. */ public final class StateStoreUtils { - private static final Log LOG = LogFactory.getLog(StateStoreUtils.class); + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreUtils.class); + private StateStoreUtils() { // Utility class @@ -52,7 +57,7 @@ public final class StateStoreUtils { // Check if we went too far if (actualClazz.equals(BaseRecord.class)) { - LOG.error("We went too far (" + actualClazz + ") with " + clazz); + LOG.error("We went too far ({}) with {}", actualClazz, clazz); actualClazz = clazz; } return actualClazz; @@ -69,4 +74,36 @@ public final class StateStoreUtils { Class<? extends BaseRecord> getRecordClass(final T record) { return getRecordClass(record.getClass()); } -} + + /** + * Get the base class name for a record. If we get an implementation of a + * record we will return the real parent record class. + * + * @param clazz Class of the data record to check. + * @return Name of the base class for the record. + */ + public static <T extends BaseRecord> String getRecordName( + final Class<T> clazz) { + return getRecordClass(clazz).getSimpleName(); + } + + /** + * Filters a list of records to find all records matching the query. + * + * @param query Map of field names and objects to use to filter results. + * @param records List of data records to filter. + * @return List of all records matching the query (or empty list if none + * match), null if the data set could not be filtered. 
+ */ + public static <T extends BaseRecord> List<T> filterMultiple( + final Query<T> query, final Iterable<T> records) { + + List<T> matchingList = new ArrayList<>(); + for (T record : records) { + if (query.matches(record)) { + matchingList.add(record); + } + } + return matchingList; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java index a1527df..90111bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java @@ -18,15 +18,16 @@ package org.apache.hadoop.hdfs.server.federation.store.driver; import java.net.InetAddress; -import java.util.List; +import java.util.Collection; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Driver class for an implementation of a {@link StateStoreService} @@ -35,7 +36,8 @@ import org.apache.hadoop.util.Time; */ public abstract class StateStoreDriver implements StateStoreRecordOperations { - private static final Log LOG = LogFactory.getLog(StateStoreDriver.class); + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreDriver.class); /** State Store configuration. */ @@ -47,13 +49,14 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { /** * Initialize the state store connection. + * * @param config Configuration for the driver. * @param id Identifier for the driver. * @param records Records that are supported. * @return If initialized and ready, false if failed to initialize driver. */ public boolean init(final Configuration config, final String id, - final List<Class<? extends BaseRecord>> records) { + final Collection<Class<? extends BaseRecord>> records) { this.conf = config; this.identifier = id; @@ -62,8 +65,20 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { LOG.warn("The identifier for the State Store connection is not set"); } - // TODO stub - return false; + boolean success = initDriver(); + if (!success) { + LOG.error("Cannot initialize driver for {}", getDriverName()); + return false; + } + + for (Class<?
extends BaseRecord> cls : records) { + String recordString = StateStoreUtils.getRecordName(cls); + if (!initRecordStorage(recordString, cls)) { + LOG.error("Cannot initialize record store for {}", cls.getSimpleName()); + return false; + } + } + return true; } /** @@ -169,4 +184,4 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { } return hostname; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java index 739eeba..e76a733 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java @@ -19,11 +19,11 @@ package org.apache.hadoop.hdfs.server.federation.store.driver; import java.io.IOException; import java.util.List; -import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; import org.apache.hadoop.io.retry.AtMostOnce; import org.apache.hadoop.io.retry.Idempotent; @@ -67,14 +67,14 @@ public interface StateStoreRecordOperations { * Get a single record from the store that matches the query. * * @param clazz Class of record to fetch. - * @param query Map of field names and objects to filter results. + * @param query Query to filter results. * @return A single record matching the query. Null if there are no matching * records or more than one matching record in the store. * @throws IOException If multiple records match or if the data store cannot * be queried. */ @Idempotent - <T extends BaseRecord> T get(Class<T> clazz, Map<String, String> query) + <T extends BaseRecord> T get(Class<T> clazz, Query<T> query) throws IOException; /** @@ -83,14 +83,14 @@ public interface StateStoreRecordOperations { * supports filtering it should overwrite this method. * * @param clazz Class of record to fetch. - * @param query Map of field names and objects to filter results. + * @param query Query to filter results. * @return Records of type clazz that match the query or empty list if none * are found. * @throws IOException Throws exception if unable to query the data store. */ @Idempotent <T extends BaseRecord> List<T> getMultiple( - Class<T> clazz, Map<String, String> query) throws IOException; + Class<T> clazz, Query<T> query) throws IOException; /** * Creates a single record. Optionally updates an existing record with same @@ -152,13 +152,12 @@ public interface StateStoreRecordOperations { * Remove multiple records of a specific class that match a query. Requires * the getAll implementation to fetch fresh records on each call. * - * @param clazz Class of record to remove. - * @param filter matching filter to remove. 
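+ * @param clazz Class of record to remove.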
+ * @param query Query to filter what to remove. * @return The number of records removed. * @throws IOException Throws exception if unable to query the data store. */ @AtMostOnce - <T extends BaseRecord> int remove(Class<T> clazz, Map<String, String> filter) + <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query) throws IOException; -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java index b711fa9..1bd35f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java @@ -17,14 +17,17 @@ */ package org.apache.hadoop.hdfs.server.federation.store.driver.impl; +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; + import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; /** * Base implementation of a State Store driver. It contains default @@ -41,7 +44,7 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver { @Override public <T extends BaseRecord> T get( - Class<T> clazz, Map<String, String> query) throws IOException { + Class<T> clazz, Query<T> query) throws IOException { List<T> records = getMultiple(clazz, query); if (records.size() > 1) { throw new IOException("Found more than one object in collection"); @@ -53,17 +56,31 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver { } @Override + public <T extends BaseRecord> List<T> getMultiple( + Class<T> clazz, Query<T> query) throws IOException { + QueryResult<T> result = get(clazz); + List<T> records = result.getRecords(); + List<T> ret = filterMultiple(query, records); + if (ret == null) { + throw new IOException("Cannot fetch records from the store"); + } + return ret; + } + + @Override public <T extends BaseRecord> boolean put( T record, boolean allowUpdate, boolean errorIfExists) throws IOException { - List<T> singletonList = new ArrayList<T>(); + List<T> singletonList = new ArrayList<>(); singletonList.add(record); return putAll(singletonList, allowUpdate, errorIfExists); } @Override public <T extends BaseRecord> boolean remove(T record) throws IOException { - Map<String, String> primaryKeys = record.getPrimaryKeys(); - Class<? extends BaseRecord> clazz = StateStoreUtils.getRecordClass(record); - return remove(clazz, primaryKeys) == 1; + final Query<T> query = new Query<T>(record); + Class<? 
extends BaseRecord> clazz = record.getClass(); + @SuppressWarnings("unchecked") + Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz); + return remove(recordClass, query) == 1; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java new file mode 100644 index 0000000..d7c00ff --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -0,0 +1,429 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link StateStoreDriver} implementation based on a local file. + */ +public abstract class StateStoreFileBaseImpl + extends StateStoreSerializableImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileBaseImpl.class); + + /** If it is initialized. */ + private boolean initialized = false; + + /** Name of the file containing the data. */ + private static final String DATA_FILE_NAME = "records.data"; + + + /** + * Lock reading records. + * + * @param clazz Class of the record. + */ + protected abstract <T extends BaseRecord> void lockRecordRead(Class<T> clazz); + + /** + * Unlock reading records. + * + * @param clazz Class of the record. 
+ */ + protected abstract <T extends BaseRecord> void unlockRecordRead( + Class<T> clazz); + + /** + * Lock writing records. + * + * @param clazz Class of the record. + */ + protected abstract <T extends BaseRecord> void lockRecordWrite( + Class<T> clazz); + + /** + * Unlock writing records. + * + * @param clazz Class of the record. + */ + protected abstract <T extends BaseRecord> void unlockRecordWrite( + Class<T> clazz); + + /** + * Get the reader for the file system. + * + * @param clazz Class of the record. + */ + protected abstract <T extends BaseRecord> BufferedReader getReader( + Class<T> clazz, String sub); + + /** + * Get the writer for the file system. + * + * @param clazz Class of the record. + */ + protected abstract <T extends BaseRecord> BufferedWriter getWriter( + Class<T> clazz, String sub); + + /** + * Check if a path exists. + * + * @param path Path to check. + * @return If the path exists. + */ + protected abstract boolean exists(String path); + + /** + * Make a directory. + * + * @param path Path of the directory to create. + * @return If the directory was created. + */ + protected abstract boolean mkdir(String path); + + /** + * Get root directory. + * + * @return Root directory. + */ + protected abstract String getRootDir(); + + /** + * Set the driver as initialized. + * + * @param ini If the driver is initialized. + */ + public void setInitialized(boolean ini) { + this.initialized = ini; + } + + @Override + public boolean initDriver() { + String rootDir = getRootDir(); + try { + if (rootDir == null) { + LOG.error("Invalid root directory, unable to initialize driver."); + return false; + } + + // Check root path + if (!exists(rootDir)) { + if (!mkdir(rootDir)) { + LOG.error("Cannot create State Store root directory {}", rootDir); + return false; + } + } + } catch (Exception ex) { + LOG.error( + "Cannot initialize filesystem using root directory {}", rootDir, ex); + return false; + } + setInitialized(true); + return true; + } + + @Override + public <T extends BaseRecord> boolean initRecordStorage( + String className, Class<T> recordClass) { + + String dataDirPath = getRootDir() + "/" + className; + try { + // Create data directories for files + if (!exists(dataDirPath)) { + LOG.info("{} data directory doesn't exist, creating it", dataDirPath); + if (!mkdir(dataDirPath)) { + LOG.error("Cannot create data directory {}", dataDirPath); + return false; + } + String dataFilePath = dataDirPath + "/" + DATA_FILE_NAME; + if (!exists(dataFilePath)) { + // Create empty file + List<T> emptyList = new ArrayList<>(); + if (!writeAll(emptyList, recordClass)) { + LOG.error("Cannot create data file {}", dataFilePath); + return false; + } + } + } + } catch (Exception ex) { + LOG.error("Cannot create data directory {}", dataDirPath, ex); + return false; + } + return true; + } + + /** + * Read all lines from a file and deserialize into the desired record type. + * + * @param reader Open handle for the file. + * @param clazz Record class to create. + * @param includeDates True if dateModified/dateCreated are serialized. + * @return List of records.
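+ * Lines that start with {@code #} and empty lines are skipped.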
+ * @throws IOException + */ + private <T extends BaseRecord> List<T> getAllFile( + BufferedReader reader, Class<T> clazz, boolean includeDates) + throws IOException { + + List<T> ret = new ArrayList<T>(); + String line; + while ((line = reader.readLine()) != null) { + if (!line.startsWith("#") && line.length() > 0) { + try { + T record = newRecord(line, clazz, includeDates); + ret.add(record); + } catch (Exception ex) { + LOG.error("Cannot parse line in data source file: {}", line, ex); + } + } + } + return ret; + } + + @Override + public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) + throws IOException { + return get(clazz, (String)null); + } + + @Override + public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub) + throws IOException { + verifyDriverReady(); + BufferedReader reader = null; + lockRecordRead(clazz); + try { + reader = getReader(clazz, sub); + List<T> data = getAllFile(reader, clazz, true); + return new QueryResult<T>(data, getTime()); + } catch (Exception ex) { + LOG.error("Cannot fetch records {}", clazz.getSimpleName()); + throw new IOException("Cannot read from data store " + ex.getMessage()); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + LOG.error("Failed closing file", e); + } + } + unlockRecordRead(clazz); + } + } + + /** + * Overwrite the existing data with a new data set. + * + * @param records List of records to write. + * @param writer BufferedWriter stream to write to. + * @return If the records were successfully written. + */ + private <T extends BaseRecord> boolean writeAllFile( + Collection<T> records, BufferedWriter writer) { + + try { + for (BaseRecord record : records) { + try { + String data = serializeString(record); + writer.write(data); + writer.newLine(); + } catch (IllegalArgumentException ex) { + LOG.error("Cannot write record {} to file", record, ex); + } + } + writer.flush(); + return true; + } catch (IOException e) { + LOG.error("Cannot commit records to file", e); + return false; + } + } + + /** + * Overwrite the existing data with a new data set. Replaces all records in + * the data store for this record class. If not all records in the data store + * are successfully committed, this function must return false and leave the + * data store unchanged. + * + * @param records List of records to write. All records must be of type + * recordClass. + * @param recordClass Class of record to replace. + * @return true if all operations were successful, false otherwise. + * @throws StateStoreUnavailableException + */ + public <T extends BaseRecord> boolean writeAll( + Collection<T> records, Class<T> recordClass) + throws StateStoreUnavailableException { + verifyDriverReady(); + lockRecordWrite(recordClass); + BufferedWriter writer = null; + try { + writer = getWriter(recordClass, null); + return writeAllFile(records, writer); + } catch (Exception e) { + LOG.error( + "Cannot add records to file for {}", recordClass.getSimpleName(), e); + return false; + } finally { + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + LOG.error( + "Cannot close writer for {}", recordClass.getSimpleName(), e); + } + } + unlockRecordWrite(recordClass); + } + } + + /** + * Get the data file name. + * + * @return Data file name.
+ */ + protected String getDataFileName() { + return DATA_FILE_NAME; + } + + @Override + public boolean isDriverReady() { + return this.initialized; + } + + @Override + public <T extends BaseRecord> boolean putAll( + List<T> records, boolean allowUpdate, boolean errorIfExists) + throws StateStoreUnavailableException { + verifyDriverReady(); + + if (records.isEmpty()) { + return true; + } + + @SuppressWarnings("unchecked") + Class<T> clazz = (Class<T>) getRecordClass(records.get(0).getClass()); + QueryResult<T> result; + try { + result = get(clazz); + } catch (IOException e) { + return false; + } + Map<Object, T> writeList = new HashMap<>(); + + // Write all of the existing records + for (T existingRecord : result.getRecords()) { + String key = existingRecord.getPrimaryKey(); + writeList.put(key, existingRecord); + } + + // Add inserts and updates, overwrite any existing values + for (T updatedRecord : records) { + try { + updatedRecord.validate(); + String key = updatedRecord.getPrimaryKey(); + if (writeList.containsKey(key) && allowUpdate) { + // Update + writeList.put(key, updatedRecord); + // Update the mod time stamp. Many backends will use their + // own timestamp for the mod time. + updatedRecord.setDateModified(this.getTime()); + } else if (!writeList.containsKey(key)) { + // Insert + // Create/Mod timestamps are already initialized + writeList.put(key, updatedRecord); + } else if (errorIfExists) { + LOG.error("Attempt to insert record {} that already exists", + updatedRecord); + return false; + } + } catch (IllegalArgumentException ex) { + LOG.error("Cannot write invalid record to State Store", ex); + return false; + } + } + + // Write all + boolean status = writeAll(writeList.values(), clazz); + return status; + } + + @Override + public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query) + throws StateStoreUnavailableException { + verifyDriverReady(); + + if (query == null) { + return 0; + } + + int removed = 0; + // Get the current records + try { + final QueryResult<T> result = get(clazz); + final List<T> existingRecords = result.getRecords(); + // Write all of the existing records except those to be removed + final List<T> recordsToRemove = filterMultiple(query, existingRecords); + removed = recordsToRemove.size(); + final List<T> newRecords = new LinkedList<>(); + for (T record : existingRecords) { + if (!recordsToRemove.contains(record)) { + newRecords.add(record); + } + } + if (!writeAll(newRecords, clazz)) { + throw new IOException( + "Cannot remove record " + clazz + " query " + query); + } + } catch (IOException e) { + LOG.error("Cannot remove records {} query {}", clazz, query, e); + } + + return removed; + } + + @Override + public <T extends BaseRecord> boolean removeAll(Class<T> clazz) + throws StateStoreUnavailableException { + verifyDriverReady(); + List<T> emptyList = new ArrayList<>(); + boolean status = writeAll(emptyList, clazz); + return status; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java new file mode 100644 index 0000000..24e9660 
--- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.Files; + +/** + * StateStoreDriver implementation based on a local file. + */ +public class StateStoreFileImpl extends StateStoreFileBaseImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileImpl.class); + + /** Configuration keys. */ + public static final String FEDERATION_STORE_FILE_DIRECTORY = + DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory"; + + /** Synchronization. */ + private static final ReadWriteLock READ_WRITE_LOCK = + new ReentrantReadWriteLock(); + + /** Root directory for the state store. 
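+ * Defaults to a temporary directory if + * {@link #FEDERATION_STORE_FILE_DIRECTORY} is not set.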
*/ + private String rootDirectory; + + + @Override + protected boolean exists(String path) { + File test = new File(path); + return test.exists(); + } + + @Override + protected boolean mkdir(String path) { + File dir = new File(path); + return dir.mkdirs(); + } + + @Override + protected String getRootDir() { + if (this.rootDirectory == null) { + String dir = getConf().get(FEDERATION_STORE_FILE_DIRECTORY); + if (dir == null) { + File tempDir = Files.createTempDir(); + dir = tempDir.getAbsolutePath(); + } + this.rootDirectory = dir; + } + return this.rootDirectory; + } + + @Override + protected <T extends BaseRecord> void lockRecordWrite(Class<T> recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.writeLock().lock(); + } + + @Override + protected <T extends BaseRecord> void unlockRecordWrite( + Class<T> recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.writeLock().unlock(); + } + + @Override + protected <T extends BaseRecord> void lockRecordRead(Class<T> recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.readLock().lock(); + } + + @Override + protected <T extends BaseRecord> void unlockRecordRead(Class<T> recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.readLock().unlock(); + } + + @Override + protected <T extends BaseRecord> BufferedReader getReader( + Class<T> clazz, String sub) { + String filename = StateStoreUtils.getRecordName(clazz); + if (sub != null && sub.length() > 0) { + filename += "/" + sub; + } + filename += "/" + getDataFileName(); + + try { + LOG.debug("Loading file: {}", filename); + File file = new File(getRootDir(), filename); + FileInputStream fis = new FileInputStream(file); + InputStreamReader isr = + new InputStreamReader(fis, StandardCharsets.UTF_8); + BufferedReader reader = new BufferedReader(isr); + return reader; + } catch (Exception ex) { + LOG.error( + "Cannot open read stream for record {}", clazz.getSimpleName(), ex); + return null; + } + } + + @Override + protected <T extends BaseRecord> BufferedWriter getWriter( + Class<T> clazz, String sub) { + String filename = StateStoreUtils.getRecordName(clazz); + if (sub != null && sub.length() > 0) { + filename += "/" + sub; + } + filename += "/" + getDataFileName(); + + try { + File file = new File(getRootDir(), filename); + FileOutputStream fos = new FileOutputStream(file, false); + OutputStreamWriter osw = + new OutputStreamWriter(fos, StandardCharsets.UTF_8); + BufferedWriter writer = new BufferedWriter(osw); + return writer; + } catch (IOException ex) { + LOG.error( + "Cannot open write stream for record {}", clazz.getSimpleName(), ex); + return null; + } + } + + @Override + public void close() throws Exception { + setInitialized(false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java new file mode 100644 index 0000000..5968421 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.URI; +import java.nio.charset.StandardCharsets; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link StateStoreDriver} implementation based on a filesystem. The most + * common implementation uses HDFS as a backend. + */ +public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileSystemImpl.class); + + + /** Configuration keys. */ + public static final String FEDERATION_STORE_FS_PATH = + DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path"; + + /** File system to back the State Store. */ + private FileSystem fs; + /** Working path in the filesystem. */ + private String workPath; + + @Override + protected boolean exists(String path) { + try { + return fs.exists(new Path(path)); + } catch (IOException e) { + return false; + } + } + + @Override + protected boolean mkdir(String path) { + try { + return fs.mkdirs(new Path(path)); + } catch (IOException e) { + return false; + } + } + + @Override + protected String getRootDir() { + if (this.workPath == null) { + String rootPath = getConf().get(FEDERATION_STORE_FS_PATH); + URI workUri; + try { + workUri = new URI(rootPath); + fs = FileSystem.get(workUri, getConf()); + } catch (Exception ex) { + return null; + } + this.workPath = rootPath; + } + return this.workPath; + } + + @Override + public void close() throws Exception { + if (fs != null) { + fs.close(); + } + } + + /** + * Get the folder path for the record class' data. + * + * @param clazz Data record class. + * @return Path of the folder containing the record class' data files. + */ + private Path getPathForClass(Class<?
extends BaseRecord> clazz) { + if (clazz == null) { + return null; + } + // TODO extract table name from class: entry.getTableName() + String className = StateStoreUtils.getRecordName(clazz); + return new Path(workPath, className); + } + + @Override + protected <T extends BaseRecord> void lockRecordRead(Class<T> clazz) { + // Not required, synced with HDFS leasing + } + + @Override + protected <T extends BaseRecord> void unlockRecordRead(Class<T> clazz) { + // Not required, synced with HDFS leasing + } + + @Override + protected <T extends BaseRecord> void lockRecordWrite(Class<T> clazz) { + // TODO -> wait for lease to be available + } + + @Override + protected <T extends BaseRecord> void unlockRecordWrite(Class<T> clazz) { + // TODO -> ensure lease is closed for the file + } + + @Override + protected <T extends BaseRecord> BufferedReader getReader( + Class<T> clazz, String sub) { + + Path path = getPathForClass(clazz); + if (sub != null && sub.length() > 0) { + path = Path.mergePaths(path, new Path("/" + sub)); + } + path = Path.mergePaths(path, new Path("/" + getDataFileName())); + + try { + FSDataInputStream fdis = fs.open(path); + InputStreamReader isr = + new InputStreamReader(fdis, StandardCharsets.UTF_8); + BufferedReader reader = new BufferedReader(isr); + return reader; + } catch (IOException ex) { + LOG.error("Cannot open read stream for {} to {}", + clazz.getSimpleName(), path); + return null; + } + } + + @Override + protected <T extends BaseRecord> BufferedWriter getWriter( + Class<T> clazz, String sub) { + + Path path = getPathForClass(clazz); + if (sub != null && sub.length() > 0) { + path = Path.mergePaths(path, new Path("/" + sub)); + } + path = Path.mergePaths(path, new Path("/" + getDataFileName())); + + try { + FSDataOutputStream fdos = fs.create(path, true); + OutputStreamWriter osw = + new OutputStreamWriter(fdos, StandardCharsets.UTF_8); + BufferedWriter writer = new BufferedWriter(osw); + return writer; + } catch (IOException ex) { + LOG.error("Cannot open write stream for {} to {}", + clazz.getSimpleName(), path); + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java new file mode 100644 index 0000000..e9b3fdf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
new file mode 100644
index 0000000..e9b3fdf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+
+/**
+ * State Store driver that stores a serialization of the records. The
+ * serializer is pluggable.
+ */
+public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
+
+  /** Default serializer for this driver. */
+  private StateStoreSerializer serializer;
+
+  @Override
+  public boolean init(final Configuration config, final String id,
+      final Collection<Class<? extends BaseRecord>> records) {
+    boolean ret = super.init(config, id, records);
+
+    this.serializer = StateStoreSerializer.getSerializer(config);
+
+    return ret;
+  }
+
+  /**
+   * Serialize a record using the serializer.
+   * @param record Record to serialize.
+   * @return Byte array with the serialization of the record.
+   */
+  protected <T extends BaseRecord> byte[] serialize(T record) {
+    return serializer.serialize(record);
+  }
+
+  /**
+   * Serialize a record using the serializer.
+   * @param record Record to serialize.
+   * @return String with the serialization of the record.
+   */
+  protected <T extends BaseRecord> String serializeString(T record) {
+    return serializer.serializeString(record);
+  }
+
+  /**
+   * Creates a record from an input data string.
+   * @param data Serialized text of the record.
+   * @param clazz Record class.
+   * @param includeDates If dateModified and dateCreated are serialized.
+   * @return The created record.
+   * @throws IOException If the record cannot be deserialized.
+   */
+  protected <T extends BaseRecord> T newRecord(
+      String data, Class<T> clazz, boolean includeDates) throws IOException {
+    return serializer.deserialize(data, clazz);
+  }
+}
\ No newline at end of file
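
The serialize/serializeString/newRecord helpers are thin wrappers over
StateStoreSerializer, so the round trip the file-based drivers rely on reduces
to the sketch below. This is an illustration, not part of the patch; it
assumes deserialize() can throw IOException, as newRecord() suggests.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;

public class SerializerRoundTripExample {
  /** Serialize a record to text and rebuild an equivalent record from it. */
  public static <T extends BaseRecord> T roundTrip(
      Configuration conf, T record, Class<T> clazz) throws IOException {
    StateStoreSerializer serializer = StateStoreSerializer.getSerializer(conf);
    // The string form is what the drivers write through getWriter().
    String data = serializer.serializeString(record);
    return serializer.deserialize(data, clazz);
  }
}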
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
index 4192a3d..79f99c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
@@ -123,6 +123,24 @@
   }
 
   /**
+   * Check if this record matches a partial record.
+   *
+   * @param other Partial record.
+   * @return If this record matches.
+   */
+  public boolean like(BaseRecord other) {
+    if (other == null) {
+      return false;
+    }
+    Map<String, String> thisKeys = this.getPrimaryKeys();
+    Map<String, String> otherKeys = other.getPrimaryKeys();
+    if (thisKeys == null) {
+      return otherKeys == null;
+    }
+    return thisKeys.equals(otherKeys);
+  }
+
+  /**
    * Override equals check to use primary key(s) for comparison.
    */
   @Override
@@ -186,4 +204,4 @@
   public String toString() {
     return getPrimaryKey();
   }
-}
+}
\ No newline at end of file
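
Note that like() compares the complete primary-key maps for equality: a
"partial" record must still populate every primary key, and only non-key
fields may be left unset. A hypothetical illustration, with RecordX and its
setters invented for the example:

// RecordX is a hypothetical BaseRecord subclass with one primary key, "id".
RecordX partial = new RecordX("id-1");        // only the key is populated
RecordX stored = new RecordX("id-1");
stored.setDescription("some non-key field");  // differs outside the keys
assert stored.like(partial);                  // same key map, so they match
assert !stored.like(new RecordX("id-2"));     // different key map, no match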
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java
new file mode 100644
index 0000000..3c59abf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+/**
+ * Check if a record matches a query. The query is usually a partial record.
+ *
+ * @param <T> Type of the record to query.
+ */
+public class Query<T extends BaseRecord> {
+
+  /** Partial object to compare against. */
+  private final T partial;
+
+  /**
+   * Create a query to search for a partial record.
+   *
+   * @param part Partial record defining the attributes to search for.
+   */
+  public Query(final T part) {
+    this.partial = part;
+  }
+
+  /**
+   * Get the partial record used to query.
+   *
+   * @return The partial record used for the query.
+   */
+  public T getPartial() {
+    return this.partial;
+  }
+
+  /**
+   * Check if a record matches the primary keys of the partial record.
+   *
+   * @param other Record to check.
+   * @return If the record matches. If there is no partial record, nothing
+   *         matches.
+   */
+  public boolean matches(T other) {
+    if (this.partial == null) {
+      return false;
+    }
+    return this.partial.like(other);
+  }
+
+  @Override
+  public String toString() {
+    return "Checking: " + this.partial;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index f1dce6b..a8410b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4643,4 +4643,20 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.store.driver.class</name>
+    <value>org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl</value>
+    <description>
+      Class to implement the State Store driver. By default it uses the local disk.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.store.connection.test</name>
+    <value>60000</value>
+    <description>
+      How often to check the connection to the State Store, in milliseconds.
+    </description>
+  </property>
+
 </configuration>
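
Neither hunk shows how the connection-test interval is read back; a common
Hadoop pattern would be Configuration#getTimeDuration, sketched below with the
one-minute default from above. The key name comes from hdfs-default.xml; the
accessor choice is an assumption, not part of this patch.

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class StateStoreConfExample {
  public static long connectionTestMs() {
    Configuration conf = new HdfsConfiguration();
    // 60000 ms mirrors the default declared in hdfs-default.xml above.
    return conf.getTimeDuration(
        "dfs.federation.router.store.connection.test",
        60000, TimeUnit.MILLISECONDS);
  }
}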
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee748132/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
new file mode 100644
index 0000000..fc5aebd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS;
+import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl.FEDERATION_STORE_FILE_DIRECTORY;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Utilities to test the State Store.
+ */
+public final class FederationStateStoreTestUtils {
+
+  private FederationStateStoreTestUtils() {
+    // Utility Class
+  }
+
+  /**
+   * Get the default State Store driver implementation.
+   *
+   * @return Class of the default State Store driver implementation.
+   */
+  public static Class<? extends StateStoreDriver> getDefaultDriver() {
+    return DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT;
+  }
+
+  /**
+   * Create a default State Store configuration.
+   *
+   * @return State Store configuration.
+   */
+  public static Configuration getStateStoreConfiguration() {
+    Class<? extends StateStoreDriver> clazz = getDefaultDriver();
+    return getStateStoreConfiguration(clazz);
+  }
+
+  /**
+   * Create a new State Store configuration for a particular driver.
+   *
+   * @param clazz Class of the driver to create.
+   * @return State Store configuration.
+   */
+  public static Configuration getStateStoreConfiguration(
+      Class<? extends StateStoreDriver> clazz) {
+    Configuration conf = new HdfsConfiguration(false);
+
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs://test");
+
+    conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class);
+
+    // File-based drivers need a directory to write to
+    if (StateStoreFileBaseImpl.class.isAssignableFrom(clazz)) {
+      setFileConfiguration(conf);
+    }
+    return conf;
+  }
+
+  /**
+   * Create a new State Store based on a configuration.
+   *
+   * @param configuration Configuration for the State Store.
+   * @return New State Store service.
+   * @throws IOException If it cannot create the State Store.
+   * @throws InterruptedException If we cannot wait for the store to start.
+   */
+  public static StateStoreService getStateStore(
+      Configuration configuration) throws IOException, InterruptedException {
+
+    StateStoreService stateStore = new StateStoreService();
+    assertNotNull(stateStore);
+
+    // Set a unique identifier; this is normally the router address
+    String identifier = UUID.randomUUID().toString();
+    stateStore.setIdentifier(identifier);
+
+    stateStore.init(configuration);
+    stateStore.start();
+
+    // Wait for the State Store driver to connect
+    waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10));
+
+    return stateStore;
+  }
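
Putting these helpers together, a driver test would typically look like the
sketch below; it assumes StateStoreService follows the standard Hadoop service
lifecycle (stop() is not shown in this patch).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;

public class StateStoreSetupExample {
  public static void main(String[] args) throws Exception {
    Configuration conf =
        FederationStateStoreTestUtils.getStateStoreConfiguration();
    // Starts the service and waits up to 10 seconds for the driver.
    StateStoreService store =
        FederationStateStoreTestUtils.getStateStore(conf);
    // ... exercise the store ...
    store.stop();
  }
}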
+
+  /**
+   * Wait for the State Store to initialize its driver.
+   *
+   * @param stateStore State Store.
+   * @param timeoutMs Time out in milliseconds.
+   * @throws IOException If the State Store cannot be reached.
+   * @throws InterruptedException If the sleep is interrupted.
+   */
+  public static void waitStateStore(StateStoreService stateStore,
+      long timeoutMs) throws IOException, InterruptedException {
+    long startingTime = Time.monotonicNow();
+    while (!stateStore.isDriverReady()) {
+      Thread.sleep(100);
+      if (Time.monotonicNow() - startingTime > timeoutMs) {
+        throw new IOException("Timeout waiting for State Store to connect");
+      }
+    }
+  }
+
+  /**
+   * Delete the default State Store.
+   *
+   * @throws IOException If the State Store data cannot be deleted.
+   */
+  public static void deleteStateStore() throws IOException {
+    Class<? extends StateStoreDriver> driverClass = getDefaultDriver();
+    deleteStateStore(driverClass);
+  }
+
+  /**
+   * Delete the State Store.
+   * @param driverClass Class of the State Store driver implementation.
+   * @throws IOException If it cannot be deleted.
+   */
+  public static void deleteStateStore(
+      Class<? extends StateStoreDriver> driverClass) throws IOException {
+
+    if (StateStoreFileBaseImpl.class.isAssignableFrom(driverClass)) {
+      String workingDirectory = System.getProperty("user.dir");
+      File dir = new File(workingDirectory + "/statestore");
+      if (dir.exists()) {
+        FileUtils.cleanDirectory(dir);
+      }
+    }
+  }
+
+  /**
+   * Set the default configuration for drivers based on files.
+   *
+   * @param conf Configuration to extend.
+   */
+  public static void setFileConfiguration(Configuration conf) {
+    String workingPath = System.getProperty("user.dir");
+    String stateStorePath = workingPath + "/statestore";
+    conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath);
+  }
+
+  /**
+   * Clear all the records from the State Store.
+   *
+   * @param store State Store to remove records from.
+   * @return If the State Store was cleared.
+   * @throws IOException If it cannot clear the State Store.
+   */
+  public static boolean clearAllRecords(StateStoreService store)
+      throws IOException {
+    Collection<Class<? extends BaseRecord>> allRecords =
+        store.getSupportedRecords();
+    for (Class<? extends BaseRecord> recordType : allRecords) {
+      if (!clearRecords(store, recordType)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Clear records of a certain type from the State Store.
+   *
+   * @param store State Store to remove records from.
+   * @param recordClass Class of the records to remove.
+   * @return If the State Store was cleared.
+   * @throws IOException If it cannot clear the State Store.
+   */
+  public static <T extends BaseRecord> boolean clearRecords(
+      StateStoreService store, Class<T> recordClass) throws IOException {
+    List<T> emptyList = new ArrayList<>();
+    return synchronizeRecords(store, emptyList, recordClass);
+  }
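
clearRecords() above is just synchronizeRecords() with an empty list (the
helper itself follows below), so seeding a table with a known set of records
is a one-liner; RecordX and its instances are hypothetical.

List<RecordX> seed = Arrays.asList(recordA, recordB);
// Removes everything of this type, then writes back only the seed records.
boolean ok = FederationStateStoreTestUtils.synchronizeRecords(
    stateStore, seed, RecordX.class);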
+
+  /**
+   * Synchronize a set of records. Remove all and keep the ones specified.
+   *
+   * @param stateStore State Store service managing the driver.
+   * @param records Records to add.
+   * @param clazz Class of the record to synchronize.
+   * @return If the synchronization succeeded.
+   * @throws IOException If it cannot connect to the State Store.
+   */
+  public static <T extends BaseRecord> boolean synchronizeRecords(
+      StateStoreService stateStore, List<T> records, Class<T> clazz)
+      throws IOException {
+    StateStoreDriver driver = stateStore.getDriver();
+    driver.verifyDriverReady();
+    // Remove the old records, then insert only the new set
+    return driver.removeAll(clazz) && driver.putAll(records, true, false);
+  }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org