baohe-zhang commented on a change in pull request #28412: URL: https://github.com/apache/spark/pull/28412#discussion_r436796185
########## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ########## @@ -1197,6 +1213,71 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) KVUtils.open(newStorePath, metadata) } + private def createHybridStore( + dm: HistoryServerDiskManager, + appId: String, + attempt: AttemptInfoWrapper, + metadata: AppStatusStoreMetadata): KVStore = { + + val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), + attempt.lastIndex) + val isCompressed = reader.compressionCodec.isDefined + + val memoryUsage = approximateMemoryUsage(reader.totalSize, isCompressed) + if (currentInMemoryStoreUsage.get + memoryUsage > maxInMemoryStoreUsage) { + throw new IllegalStateException("Not enough in-memory storage to create hybrid store.") + } + currentInMemoryStoreUsage.addAndGet(memoryUsage) + logInfo(s"Attempt creating hybrid store to parse $appId / ${attempt.info.attemptId}. " + + s"Requested ${Utils.bytesToString(memoryUsage)} in-memory storage quota.") + + logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") + val lease = dm.lease(reader.totalSize, isCompressed) + val isLeaseRolledBack = new java.util.concurrent.atomic.AtomicBoolean(false) + var store: HybridStore = null + try { + store = new HybridStore() + val levelDB = KVUtils.open(lease.tmpPath, metadata) + store.setLevelDB(levelDB) + store.setCachedQuantileKlass(classOf[CachedQuantile]) Review comment: OK, will do that. I am moving it to the org.apache.spark.deploy.history package. 
########## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ########## @@ -1197,6 +1213,71 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) KVUtils.open(newStorePath, metadata) } + private def createHybridStore( + dm: HistoryServerDiskManager, + appId: String, + attempt: AttemptInfoWrapper, + metadata: AppStatusStoreMetadata): KVStore = { + + val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), + attempt.lastIndex) + val isCompressed = reader.compressionCodec.isDefined + + val memoryUsage = approximateMemoryUsage(reader.totalSize, isCompressed) + if (currentInMemoryStoreUsage.get + memoryUsage > maxInMemoryStoreUsage) { + throw new IllegalStateException("Not enough in-memory storage to create hybrid store.") + } + currentInMemoryStoreUsage.addAndGet(memoryUsage) + logInfo(s"Attempt creating hybrid store to parse $appId / ${attempt.info.attemptId}. " + + s"Requested ${Utils.bytesToString(memoryUsage)} in-memory storage quota.") + + logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") + val lease = dm.lease(reader.totalSize, isCompressed) + val isLeaseRolledBack = new java.util.concurrent.atomic.AtomicBoolean(false) + var store: HybridStore = null + try { + store = new HybridStore() + val levelDB = KVUtils.open(lease.tmpPath, metadata) + store.setLevelDB(levelDB) + store.setCachedQuantileKlass(classOf[CachedQuantile]) + rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) + + // Start the background thread to dump data to levelDB when writing to + // InMemoryStore is completed. 
+ store.switchingToLevelDB(new HybridStore.SwitchingToLevelDBListener { + override def onSwitchingToLevelDBSuccess: Unit = { + levelDB.close() + val newStorePath = lease.commit(appId, attempt.info.attemptId) + store.setLevelDB(KVUtils.open(newStorePath, metadata)) + currentInMemoryStoreUsage.addAndGet(-memoryUsage) + logInfo(s"Completely switched to leveldb for app" + + s" $appId / ${attempt.info.attemptId}. " + + s"Released ${Utils.bytesToString(memoryUsage)} in-memory storage quota.") + } + + override def onSwitchingToLevelDBFail(e: Exception): Unit = { + logWarning(s"Failed to switch to leveldb for app" + + s" $appId / ${attempt.info.attemptId}", e) + levelDB.close() + if (!isLeaseRolledBack.getAndSet(true)) { + lease.rollback() + } + } + }) + + store + } catch { + case e: Exception => + store.close() + currentInMemoryStoreUsage.addAndGet(-memoryUsage) Review comment: decrementAndGet() only decrements by one; there is no decrementAndGet(long n) overload, so addAndGet(-memoryUsage) is needed here to release the whole amount. ########## File path: core/src/main/scala/org/apache/spark/internal/config/History.scala ########## @@ -195,4 +195,18 @@ private[spark] object History { .version("3.0.0") .booleanConf .createWithDefault(true) + + val HYBRID_STORE_ENABLED = ConfigBuilder("spark.history.store.hybridStore.enabled") + .doc("Whether to use HybridStore as the store when parsing event logs. " + + "HybridStore will first write data to an in-memory store and having a background thread " + + "that dumps data to a disk store after the writing to in-memory store is completed. " + + "Use it with caution, as in-memory store requires higher memory usage.") Review comment: addressed. removed caution. ########## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import org.apache.spark.annotation.Private; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Collection; + +/** + * Implementation of KVStore that writes data to InMemoryStore at first and uses + * a background thread to dump data to LevelDB once the writing to InMemoryStore + * is completed. + */ +@Private +public class HybridStore implements KVStore { + + private InMemoryStore inMemoryStore = new InMemoryStore(); + private LevelDB levelDB = null; + + // Flag to indicate if we should use inMemoryStore Or levelDB. + private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true); + + // A background thread that dumps data in inMemoryStore to levelDB + private Thread backgroundThread = null; + + // A hash map that stores all class types (except CachedQuantile) that had been writen + // to inMemoryStore. + private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>(); + + // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need + // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile + // objects when the underlying store is inMemoryStore, and dump these objects to levelDB + // before the switch completes. 
+ private Class<?> cachedQuantileKlass = null; + private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>(); + + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + KVStore store = getStore(); Review comment: addressed. ########## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import org.apache.spark.annotation.Private; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Collection; + +/** + * Implementation of KVStore that writes data to InMemoryStore at first and uses + * a background thread to dump data to LevelDB once the writing to InMemoryStore + * is completed. + */ +@Private +public class HybridStore implements KVStore { + + private InMemoryStore inMemoryStore = new InMemoryStore(); + private LevelDB levelDB = null; + + // Flag to indicate if we should use inMemoryStore Or levelDB. 
+ private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true); + + // A background thread that dumps data in inMemoryStore to levelDB + private Thread backgroundThread = null; + + // A hash map that stores all class types (except CachedQuantile) that had been writen + // to inMemoryStore. + private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>(); + + // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need + // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile + // objects when the underlying store is inMemoryStore, and dump these objects to levelDB + // before the switch completes. + private Class<?> cachedQuantileKlass = null; + private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>(); + + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + KVStore store = getStore(); + T metaData = store.getMetadata(klass); + return metaData; + } + + @Override + public void setMetadata(Object value) throws Exception { + KVStore store = getStore(); Review comment: addressed ########## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import org.apache.spark.annotation.Private; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Collection; + +/** + * Implementation of KVStore that writes data to InMemoryStore at first and uses + * a background thread to dump data to LevelDB once the writing to InMemoryStore + * is completed. + */ +@Private +public class HybridStore implements KVStore { + + private InMemoryStore inMemoryStore = new InMemoryStore(); + private LevelDB levelDB = null; + + // Flag to indicate if we should use inMemoryStore Or levelDB. + private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true); + + // A background thread that dumps data in inMemoryStore to levelDB + private Thread backgroundThread = null; + + // A hash map that stores all class types (except CachedQuantile) that had been writen + // to inMemoryStore. + private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>(); + + // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need + // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile + // objects when the underlying store is inMemoryStore, and dump these objects to levelDB + // before the switch completes. 
+ private Class<?> cachedQuantileKlass = null; + private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>(); + + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + KVStore store = getStore(); + T metaData = store.getMetadata(klass); + return metaData; + } + + @Override + public void setMetadata(Object value) throws Exception { + KVStore store = getStore(); + store.setMetadata(value); + } + + @Override + public <T> T read(Class<T> klass, Object naturalKey) throws Exception { + KVStore store = getStore(); Review comment: addressed ########## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.kvstore; + +import org.apache.spark.annotation.Private; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Collection; + +/** + * Implementation of KVStore that writes data to InMemoryStore at first and uses + * a background thread to dump data to LevelDB once the writing to InMemoryStore + * is completed. + */ +@Private +public class HybridStore implements KVStore { + + private InMemoryStore inMemoryStore = new InMemoryStore(); + private LevelDB levelDB = null; + + // Flag to indicate if we should use inMemoryStore Or levelDB. + private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true); + + // A background thread that dumps data in inMemoryStore to levelDB + private Thread backgroundThread = null; + + // A hash map that stores all class types (except CachedQuantile) that had been writen + // to inMemoryStore. + private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>(); + + // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need + // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile + // objects when the underlying store is inMemoryStore, and dump these objects to levelDB + // before the switch completes. 
+ private Class<?> cachedQuantileKlass = null; + private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>(); + + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + KVStore store = getStore(); + T metaData = store.getMetadata(klass); + return metaData; + } + + @Override + public void setMetadata(Object value) throws Exception { + KVStore store = getStore(); + store.setMetadata(value); + } + + @Override + public <T> T read(Class<T> klass, Object naturalKey) throws Exception { + KVStore store = getStore(); + T value = store.read(klass, naturalKey); + return value; + } + + @Override + public void write(Object value) throws Exception { + Class<?> klass = value.getClass(); + + if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) { + throw new RuntimeException("write() for objects other than CachedQuantile " + + "shouldn't be called after the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); Review comment: addressed ########## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.kvstore; + +import org.apache.spark.annotation.Private; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Collection; + +/** + * Implementation of KVStore that writes data to InMemoryStore at first and uses + * a background thread to dump data to LevelDB once the writing to InMemoryStore + * is completed. + */ +@Private +public class HybridStore implements KVStore { + + private InMemoryStore inMemoryStore = new InMemoryStore(); + private LevelDB levelDB = null; + + // Flag to indicate if we should use inMemoryStore Or levelDB. + private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true); + + // A background thread that dumps data in inMemoryStore to levelDB + private Thread backgroundThread = null; + + // A hash map that stores all class types (except CachedQuantile) that had been writen + // to inMemoryStore. + private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>(); + + // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need + // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile + // objects when the underlying store is inMemoryStore, and dump these objects to levelDB + // before the switch completes. 
+ private Class<?> cachedQuantileKlass = null; + private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>(); + + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + KVStore store = getStore(); + T metaData = store.getMetadata(klass); + return metaData; + } + + @Override + public void setMetadata(Object value) throws Exception { + KVStore store = getStore(); + store.setMetadata(value); + } + + @Override + public <T> T read(Class<T> klass, Object naturalKey) throws Exception { + KVStore store = getStore(); + T value = store.read(klass, naturalKey); + return value; + } + + @Override + public void write(Object value) throws Exception { + Class<?> klass = value.getClass(); + + if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) { + throw new RuntimeException("write() for objects other than CachedQuantile " + + "shouldn't be called after the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.write(value); + + if (store instanceof InMemoryStore) { + if (klass.equals(cachedQuantileKlass)) { + cachedQuantileQueue.add(value); + } else { + klassMap.putIfAbsent(klass, true); + } + } + } + + @Override + public void delete(Class<?> type, Object naturalKey) throws Exception { + if (backgroundThread != null) { + throw new RuntimeException("delete() shouldn't be called after " + + "the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); Review comment: addressed ########## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import org.apache.spark.annotation.Private; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Collection; + +/** + * Implementation of KVStore that writes data to InMemoryStore at first and uses + * a background thread to dump data to LevelDB once the writing to InMemoryStore + * is completed. + */ +@Private +public class HybridStore implements KVStore { + + private InMemoryStore inMemoryStore = new InMemoryStore(); + private LevelDB levelDB = null; + + // Flag to indicate if we should use inMemoryStore Or levelDB. + private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true); + + // A background thread that dumps data in inMemoryStore to levelDB + private Thread backgroundThread = null; + + // A hash map that stores all class types (except CachedQuantile) that had been writen + // to inMemoryStore. + private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>(); + + // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need + // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile + // objects when the underlying store is inMemoryStore, and dump these objects to levelDB + // before the switch completes. 
+ private Class<?> cachedQuantileKlass = null; + private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>(); + + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + KVStore store = getStore(); + T metaData = store.getMetadata(klass); + return metaData; + } + + @Override + public void setMetadata(Object value) throws Exception { + KVStore store = getStore(); + store.setMetadata(value); + } + + @Override + public <T> T read(Class<T> klass, Object naturalKey) throws Exception { + KVStore store = getStore(); + T value = store.read(klass, naturalKey); + return value; + } + + @Override + public void write(Object value) throws Exception { + Class<?> klass = value.getClass(); + + if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) { + throw new RuntimeException("write() for objects other than CachedQuantile " + + "shouldn't be called after the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.write(value); + + if (store instanceof InMemoryStore) { + if (klass.equals(cachedQuantileKlass)) { + cachedQuantileQueue.add(value); + } else { + klassMap.putIfAbsent(klass, true); + } + } + } + + @Override + public void delete(Class<?> type, Object naturalKey) throws Exception { + if (backgroundThread != null) { + throw new RuntimeException("delete() shouldn't be called after " + + "the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.delete(type, naturalKey); + } + + @Override + public <T> KVStoreView<T> view(Class<T> type) throws Exception { + KVStore store = getStore(); Review comment: addressed ########## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import org.apache.spark.annotation.Private; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Collection; + +/** + * Implementation of KVStore that writes data to InMemoryStore at first and uses + * a background thread to dump data to LevelDB once the writing to InMemoryStore + * is completed. + */ +@Private +public class HybridStore implements KVStore { + + private InMemoryStore inMemoryStore = new InMemoryStore(); + private LevelDB levelDB = null; + + // Flag to indicate if we should use inMemoryStore Or levelDB. + private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true); + + // A background thread that dumps data in inMemoryStore to levelDB + private Thread backgroundThread = null; + + // A hash map that stores all class types (except CachedQuantile) that had been writen + // to inMemoryStore. + private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>(); + + // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need + // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile + // objects when the underlying store is inMemoryStore, and dump these objects to levelDB + // before the switch completes. 
+ private Class<?> cachedQuantileKlass = null; + private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>(); + + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + KVStore store = getStore(); + T metaData = store.getMetadata(klass); + return metaData; + } + + @Override + public void setMetadata(Object value) throws Exception { + KVStore store = getStore(); + store.setMetadata(value); + } + + @Override + public <T> T read(Class<T> klass, Object naturalKey) throws Exception { + KVStore store = getStore(); + T value = store.read(klass, naturalKey); + return value; + } + + @Override + public void write(Object value) throws Exception { + Class<?> klass = value.getClass(); + + if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) { + throw new RuntimeException("write() for objects other than CachedQuantile " + + "shouldn't be called after the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.write(value); + + if (store instanceof InMemoryStore) { + if (klass.equals(cachedQuantileKlass)) { + cachedQuantileQueue.add(value); + } else { + klassMap.putIfAbsent(klass, true); + } + } + } + + @Override + public void delete(Class<?> type, Object naturalKey) throws Exception { + if (backgroundThread != null) { + throw new RuntimeException("delete() shouldn't be called after " + + "the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.delete(type, naturalKey); + } + + @Override + public <T> KVStoreView<T> view(Class<T> type) throws Exception { + KVStore store = getStore(); + KVStoreView<T> view = store.view(type); + return view; + } + + @Override + public long count(Class<?> type) throws Exception { + KVStore store = getStore(); Review comment: addressed ########## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import org.apache.spark.annotation.Private; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Collection; + +/** + * Implementation of KVStore that writes data to InMemoryStore at first and uses + * a background thread to dump data to LevelDB once the writing to InMemoryStore + * is completed. + */ +@Private +public class HybridStore implements KVStore { + + private InMemoryStore inMemoryStore = new InMemoryStore(); + private LevelDB levelDB = null; + + // Flag to indicate if we should use inMemoryStore Or levelDB. + private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true); + + // A background thread that dumps data in inMemoryStore to levelDB + private Thread backgroundThread = null; + + // A hash map that stores all class types (except CachedQuantile) that had been writen + // to inMemoryStore. + private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>(); + + // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need + // to handle it specially to avoid conflicts. 
We will use a queue store CachedQuantile + // objects when the underlying store is inMemoryStore, and dump these objects to levelDB + // before the switch completes. + private Class<?> cachedQuantileKlass = null; + private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>(); + + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + KVStore store = getStore(); + T metaData = store.getMetadata(klass); + return metaData; + } + + @Override + public void setMetadata(Object value) throws Exception { + KVStore store = getStore(); + store.setMetadata(value); + } + + @Override + public <T> T read(Class<T> klass, Object naturalKey) throws Exception { + KVStore store = getStore(); + T value = store.read(klass, naturalKey); + return value; + } + + @Override + public void write(Object value) throws Exception { + Class<?> klass = value.getClass(); + + if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) { + throw new RuntimeException("write() for objects other than CachedQuantile " + + "shouldn't be called after the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.write(value); + + if (store instanceof InMemoryStore) { + if (klass.equals(cachedQuantileKlass)) { + cachedQuantileQueue.add(value); + } else { + klassMap.putIfAbsent(klass, true); + } + } + } + + @Override + public void delete(Class<?> type, Object naturalKey) throws Exception { + if (backgroundThread != null) { + throw new RuntimeException("delete() shouldn't be called after " + + "the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.delete(type, naturalKey); + } + + @Override + public <T> KVStoreView<T> view(Class<T> type) throws Exception { + KVStore store = getStore(); + KVStoreView<T> view = store.view(type); + return view; + } + + @Override + public long count(Class<?> type) throws Exception { + KVStore store = getStore(); + long count = store.count(type); + return 
count; + } + + @Override + public long count(Class<?> type, String index, Object indexedValue) throws Exception { + KVStore store = getStore(); + long count = store.count(type, index, indexedValue); + return count; + } + + @Override + public void close() throws IOException { + try { + if (backgroundThread != null && backgroundThread.isAlive()) { + // The background thread is still running, wait for it to finish + backgroundThread.join(); + } + if (levelDB != null) { + levelDB.close(); + } + if (inMemoryStore != null) { + inMemoryStore.close(); + } + } catch (Exception e) { + if (e instanceof IOException) { + throw (IOException) e; + } + } + } + + @Override + public <T> boolean removeAllByIndexValues( + Class<T> klass, + String index, + Collection<?> indexValues) throws Exception { + if (backgroundThread != null) { + throw new RuntimeException("removeAllByIndexValues() shouldn't be called after " + + "the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); Review comment: addressed ########## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.kvstore; + +import org.apache.spark.annotation.Private; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Collection; + +/** + * Implementation of KVStore that writes data to InMemoryStore at first and uses + * a background thread to dump data to LevelDB once the writing to InMemoryStore + * is completed. + */ +@Private +public class HybridStore implements KVStore { + + private InMemoryStore inMemoryStore = new InMemoryStore(); + private LevelDB levelDB = null; + + // Flag to indicate if we should use inMemoryStore Or levelDB. + private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true); + + // A background thread that dumps data in inMemoryStore to levelDB + private Thread backgroundThread = null; + + // A hash map that stores all class types (except CachedQuantile) that had been writen + // to inMemoryStore. + private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>(); + + // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need + // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile + // objects when the underlying store is inMemoryStore, and dump these objects to levelDB + // before the switch completes. 
+ private Class<?> cachedQuantileKlass = null; + private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>(); + + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + KVStore store = getStore(); + T metaData = store.getMetadata(klass); + return metaData; + } + + @Override + public void setMetadata(Object value) throws Exception { + KVStore store = getStore(); + store.setMetadata(value); + } + + @Override + public <T> T read(Class<T> klass, Object naturalKey) throws Exception { + KVStore store = getStore(); + T value = store.read(klass, naturalKey); + return value; + } + + @Override + public void write(Object value) throws Exception { + Class<?> klass = value.getClass(); + + if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) { + throw new RuntimeException("write() for objects other than CachedQuantile " + + "shouldn't be called after the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.write(value); + + if (store instanceof InMemoryStore) { + if (klass.equals(cachedQuantileKlass)) { + cachedQuantileQueue.add(value); + } else { + klassMap.putIfAbsent(klass, true); + } + } + } + + @Override + public void delete(Class<?> type, Object naturalKey) throws Exception { + if (backgroundThread != null) { + throw new RuntimeException("delete() shouldn't be called after " + + "the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.delete(type, naturalKey); + } + + @Override + public <T> KVStoreView<T> view(Class<T> type) throws Exception { + KVStore store = getStore(); + KVStoreView<T> view = store.view(type); + return view; + } + + @Override + public long count(Class<?> type) throws Exception { + KVStore store = getStore(); + long count = store.count(type); + return count; + } + + @Override + public long count(Class<?> type, String index, Object indexedValue) throws Exception { + KVStore store = getStore(); + long count = 
store.count(type, index, indexedValue); + return count; + } + + @Override + public void close() throws IOException { + try { + if (backgroundThread != null && backgroundThread.isAlive()) { + // The background thread is still running, wait for it to finish + backgroundThread.join(); + } + if (levelDB != null) { + levelDB.close(); + } + if (inMemoryStore != null) { + inMemoryStore.close(); + } + } catch (Exception e) { + if (e instanceof IOException) { + throw (IOException) e; + } + } + } + + @Override + public <T> boolean removeAllByIndexValues( + Class<T> klass, + String index, + Collection<?> indexValues) throws Exception { + if (backgroundThread != null) { + throw new RuntimeException("removeAllByIndexValues() shouldn't be called after " + + "the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + boolean removed = store.removeAllByIndexValues(klass, index, indexValues); + return removed; + } + + public void setLevelDB(LevelDB levelDB) { + this.levelDB = levelDB; + } + + public void setCachedQuantileKlass(Class<?> klass) { + this.cachedQuantileKlass = klass; + } + + /** + * This method is called when the writing is done for inMemoryStore. A + * background thread will be created and be started to dump data in inMemoryStore + * to levelDB. Once the dumping is completed, the underlying kvstore will be + * switched to levelDB. 
+ */ + public void switchingToLevelDB(SwitchingToLevelDBListener listener) throws Exception { + // A background thread that dumps data to levelDB + backgroundThread = new Thread(new Runnable() { + public void run() { + Exception exceptionCaught = null; + + try { + for (Class<?> klass : klassMap.keySet()) { + KVStoreIterator<?> it = inMemoryStore.view(klass).closeableIterator(); + while (it.hasNext()) { + levelDB.write(it.next()); + } + } + } catch (Exception e) { + e.printStackTrace(); Review comment: removed ########## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import org.apache.spark.annotation.Private; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Collection; + +/** + * Implementation of KVStore that writes data to InMemoryStore at first and uses + * a background thread to dump data to LevelDB once the writing to InMemoryStore + * is completed. 
+ */ +@Private +public class HybridStore implements KVStore { + + private InMemoryStore inMemoryStore = new InMemoryStore(); + private LevelDB levelDB = null; + + // Flag to indicate if we should use inMemoryStore Or levelDB. + private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true); + + // A background thread that dumps data in inMemoryStore to levelDB + private Thread backgroundThread = null; + + // A hash map that stores all class types (except CachedQuantile) that had been writen + // to inMemoryStore. + private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>(); + + // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need + // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile + // objects when the underlying store is inMemoryStore, and dump these objects to levelDB + // before the switch completes. + private Class<?> cachedQuantileKlass = null; + private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>(); + + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + KVStore store = getStore(); + T metaData = store.getMetadata(klass); + return metaData; + } + + @Override + public void setMetadata(Object value) throws Exception { + KVStore store = getStore(); + store.setMetadata(value); + } + + @Override + public <T> T read(Class<T> klass, Object naturalKey) throws Exception { + KVStore store = getStore(); + T value = store.read(klass, naturalKey); + return value; + } + + @Override + public void write(Object value) throws Exception { + Class<?> klass = value.getClass(); + + if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) { + throw new RuntimeException("write() for objects other than CachedQuantile " + + "shouldn't be called after the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.write(value); + + if (store instanceof InMemoryStore) { + if 
(klass.equals(cachedQuantileKlass)) { + cachedQuantileQueue.add(value); + } else { + klassMap.putIfAbsent(klass, true); + } + } + } + + @Override + public void delete(Class<?> type, Object naturalKey) throws Exception { + if (backgroundThread != null) { + throw new RuntimeException("delete() shouldn't be called after " + + "the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.delete(type, naturalKey); + } + + @Override + public <T> KVStoreView<T> view(Class<T> type) throws Exception { + KVStore store = getStore(); + KVStoreView<T> view = store.view(type); + return view; + } + + @Override + public long count(Class<?> type) throws Exception { + KVStore store = getStore(); + long count = store.count(type); + return count; + } + + @Override + public long count(Class<?> type, String index, Object indexedValue) throws Exception { + KVStore store = getStore(); + long count = store.count(type, index, indexedValue); + return count; + } + + @Override + public void close() throws IOException { + try { + if (backgroundThread != null && backgroundThread.isAlive()) { + // The background thread is still running, wait for it to finish + backgroundThread.join(); + } + if (levelDB != null) { + levelDB.close(); + } + if (inMemoryStore != null) { + inMemoryStore.close(); + } + } catch (Exception e) { + if (e instanceof IOException) { + throw (IOException) e; + } + } + } + + @Override + public <T> boolean removeAllByIndexValues( + Class<T> klass, + String index, + Collection<?> indexValues) throws Exception { + if (backgroundThread != null) { + throw new RuntimeException("removeAllByIndexValues() shouldn't be called after " + + "the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + boolean removed = store.removeAllByIndexValues(klass, index, indexValues); + return removed; + } + + public void setLevelDB(LevelDB levelDB) { + this.levelDB = levelDB; + } + + public void setCachedQuantileKlass(Class<?> klass) { + 
this.cachedQuantileKlass = klass; + } + + /** + * This method is called when the writing is done for inMemoryStore. A + * background thread will be created and be started to dump data in inMemoryStore + * to levelDB. Once the dumping is completed, the underlying kvstore will be + * switched to levelDB. + */ + public void switchingToLevelDB(SwitchingToLevelDBListener listener) throws Exception { + // A background thread that dumps data to levelDB + backgroundThread = new Thread(new Runnable() { + public void run() { + Exception exceptionCaught = null; + + try { + for (Class<?> klass : klassMap.keySet()) { + KVStoreIterator<?> it = inMemoryStore.view(klass).closeableIterator(); + while (it.hasNext()) { + levelDB.write(it.next()); + } + } + } catch (Exception e) { + e.printStackTrace(); + exceptionCaught = e; + } + + if (exceptionCaught == null) { + // Switch to levelDB and close inMemoryStore + shouldUseInMemoryStore.set(false); + + // Dump CachedQuantile objects to levelDB + try { + while(cachedQuantileQueue.size() > 0) { + levelDB.write(cachedQuantileQueue.poll()); + } + } catch (Exception e) { + e.printStackTrace(); Review comment: I changed the logic of writing CachedQuantile to leveldb. ########## File path: core/src/main/scala/org/apache/spark/internal/config/History.scala ########## @@ -195,4 +195,18 @@ private[spark] object History { .version("3.0.0") .booleanConf .createWithDefault(true) + + val HYBRID_STORE_ENABLED = ConfigBuilder("spark.history.store.hybridStore.enabled") + .doc("Whether to use HybridStore as the store when parsing event logs. " + + "HybridStore will first write data to an in-memory store and having a background thread " + + "that dumps data to a disk store after the writing to in-memory store is completed. 
" + + "Use it with caution, as in-memory store requires higher memory usage.") + .version("3.1.0") + .booleanConf + .createWithDefault(true) + + val MAX_IN_MEMORY_STORE_USAGE = ConfigBuilder("spark.history.store.hybridStore.maxMemoryUsage") + .version("3.1.0") Review comment: doc added. ########## File path: core/src/main/scala/org/apache/spark/internal/config/History.scala ########## @@ -195,4 +195,18 @@ private[spark] object History { .version("3.0.0") .booleanConf .createWithDefault(true) + + val HYBRID_STORE_ENABLED = ConfigBuilder("spark.history.store.hybridStore.enabled") + .doc("Whether to use HybridStore as the store when parsing event logs. " + + "HybridStore will first write data to an in-memory store and having a background thread " + + "that dumps data to a disk store after the writing to in-memory store is completed. " + + "Use it with caution, as in-memory store requires higher memory usage.") + .version("3.1.0") + .booleanConf + .createWithDefault(true) Review comment: done. ########## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ########## @@ -128,6 +129,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_)) private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING) + private val hybridStoreEnabled = conf.get(History.HYBRID_STORE_ENABLED) + private val maxInMemoryStoreUsage = conf.get(History.MAX_IN_MEMORY_STORE_USAGE) + private val currentInMemoryStoreUsage = new java.util.concurrent.atomic.AtomicLong(0L) Review comment: a new class HistoryServerMemoryManager is created. ########## File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.IOException +import java.util.Collection +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.util.kvstore._ + +/** + * A implementation of KVStore that accelerates event logs loading. + * + * When rebuilding the application state from event logs, HybridStore will + * write data to InMemoryStore at first and use a background thread to dump + * data to LevelDB once the writing to InMemoryStore is completed. + */ + +private[history] class HybridStore extends KVStore { + + private val inMemoryStore = new InMemoryStore() + + private var levelDB: LevelDB = null + + // Flag to indicate whether we should use inMemoryStore or levelDB + private[history] val shouldUseInMemoryStore = new AtomicBoolean(true) + + // A background thread that dumps data from inMemoryStore to levelDB + private var backgroundThread: Thread = null + + // Objects of these classes will be dumped to levelDB in the background thread + private var klassList: Seq[Class[_]] = List( Review comment: I think the main purpose is to handle CachedQuantile specifically. CachedQuantile is not on this list, so it won't be dumped to levelDB in the background thread. 
Right now my strategy is to write CachedQuantile to inMemoryStore and levelDB simultaneously during the transition process (while the background thread is alive); this strategy seems OK in my manual test. CachedQuantile is quite special: it will only be added to kvstore after rebuildAppStore() finishes. That means if we don't handle it specially, it will be added to inMemoryStore while the background thread is dumping data (including CachedQuantile objects) to levelDB, and hence cause conflicts. ########## File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.IOException +import java.util.Collection +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.util.kvstore._ + +/** + * A implementation of KVStore that accelerates event logs loading. + * + * When rebuilding the application state from event logs, HybridStore will + * write data to InMemoryStore at first and use a background thread to dump + * data to LevelDB once the writing to InMemoryStore is completed. 
+ */ + +private[history] class HybridStore extends KVStore { + + private val inMemoryStore = new InMemoryStore() + + private var levelDB: LevelDB = null + + // Flag to indicate whether we should use inMemoryStore or levelDB + private[history] val shouldUseInMemoryStore = new AtomicBoolean(true) + + // A background thread that dumps data from inMemoryStore to levelDB + private var backgroundThread: Thread = null + + // Objects of these classes will be dumped to levelDB in the background thread + private var klassList: Seq[Class[_]] = List( Review comment: OK, I will use a reject-list and add CachedQuantile to it. And I think when using a reject-list, we need a HashSet to record the classes that have been added to kvstore. Yeah, I agree it's optional to retain these CachedQuantile objects. ########## File path: core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.mutable.HashMap + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.History._ +import org.apache.spark.util.Utils + +/** + * A class used to keep track of in-memory store usage by the SHS. + */ +private class HistoryServerMemoryManager( + conf: SparkConf) extends Logging { + + private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE) + private val currentUsage = new AtomicLong(0L) + private val active = new HashMap[(String, Option[String]), Long]() + + def initialize(): Unit = { + logInfo("Initialized memory manager: " + + s"current usage = ${Utils.bytesToString(currentUsage.get())}, " + + s"max usage = ${Utils.bytesToString(maxUsage)}") + } + + def lease( + appId: String, + attemptId: Option[String], + eventLogSize: Long, + isCompressed: Boolean): Unit = { + val memoryUsage = approximateMemoryUsage(eventLogSize, isCompressed) + if (memoryUsage + currentUsage.get > maxUsage) { + throw new RuntimeException("Not enough memory to create hybrid store " + + s"for app $appId / $attemptId.") + } + active.synchronized { + active(appId -> attemptId) = memoryUsage + } + currentUsage.addAndGet(memoryUsage) + logInfo(s"Leasing ${Utils.bytesToString(memoryUsage)} memory usage for " + + s"app $appId / $attemptId") + } + + def release(appId: String, attemptId: Option[String]): Unit = { + val memoryUsage = active.synchronized { active.remove(appId -> attemptId) } + + memoryUsage match { + case Some(m) => + currentUsage.addAndGet(-m) + logInfo(s"Released ${Utils.bytesToString(m)} memory usage for " + + s"app $appId / $attemptId") Review comment: fixed. ########## File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.IOException +import java.util.Collection +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.util.kvstore._ + +/** + * A implementation of KVStore that accelerates event logs loading. + * + * When rebuilding the application state from event logs, HybridStore will + * write data to InMemoryStore at first and use a background thread to dump + * data to LevelDB once the writing to InMemoryStore is completed. + */ + +private[history] class HybridStore extends KVStore { + + private val inMemoryStore = new InMemoryStore() + + private var levelDB: LevelDB = null + + // Flag to indicate whether we should use inMemoryStore or levelDB + private[history] val shouldUseInMemoryStore = new AtomicBoolean(true) + + // A background thread that dumps data from inMemoryStore to levelDB + private var backgroundThread: Thread = null + + // Objects of these classes will be dumped to levelDB in the background thread + private var klassList: Seq[Class[_]] = List( Review comment: If we don't write CachedQuantile to levelDB during the transition process, we might be able to get rid of reject list or allow list, and hence no need to handle some classes specially. I will test that idea and push the code changes later. 
########## File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.IOException +import java.util.Collection +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.util.kvstore._ + +/** + * A implementation of KVStore that accelerates event logs loading. + * + * When rebuilding the application state from event logs, HybridStore will + * write data to InMemoryStore at first and use a background thread to dump + * data to LevelDB once the writing to InMemoryStore is completed. 
+ */ + +private[history] class HybridStore extends KVStore { + + private val inMemoryStore = new InMemoryStore() + + private var levelDB: LevelDB = null + + // Flag to indicate whether we should use inMemoryStore or levelDB + private[history] val shouldUseInMemoryStore = new AtomicBoolean(true) + + // A background thread that dumps data from inMemoryStore to levelDB + private var backgroundThread: Thread = null + + // Objects of these classes will be dumped to levelDB in the background thread + private var klassList: Seq[Class[_]] = List( Review comment: agree ########## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.kvstore; + +import org.apache.spark.annotation.Private; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Collection; + +/** + * Implementation of KVStore that writes data to InMemoryStore at first and uses + * a background thread to dump data to LevelDB once the writing to InMemoryStore + * is completed. 
+ */ +@Private +public class HybridStore implements KVStore { + + private InMemoryStore inMemoryStore = new InMemoryStore(); + private LevelDB levelDB = null; + + // Flag to indicate if we should use inMemoryStore Or levelDB. + private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true); + + // A background thread that dumps data in inMemoryStore to levelDB + private Thread backgroundThread = null; + + // A hash map that stores all class types (except CachedQuantile) that had been writen + // to inMemoryStore. + private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>(); + + // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need + // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile + // objects when the underlying store is inMemoryStore, and dump these objects to levelDB + // before the switch completes. + private Class<?> cachedQuantileKlass = null; + private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>(); + + + @Override + public <T> T getMetadata(Class<T> klass) throws Exception { + KVStore store = getStore(); + T metaData = store.getMetadata(klass); + return metaData; + } + + @Override + public void setMetadata(Object value) throws Exception { + KVStore store = getStore(); + store.setMetadata(value); + } + + @Override + public <T> T read(Class<T> klass, Object naturalKey) throws Exception { + KVStore store = getStore(); + T value = store.read(klass, naturalKey); + return value; + } + + @Override + public void write(Object value) throws Exception { + Class<?> klass = value.getClass(); + + if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) { + throw new RuntimeException("write() for objects other than CachedQuantile " + + "shouldn't be called after the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.write(value); + + if (store instanceof InMemoryStore) { + if 
(klass.equals(cachedQuantileKlass)) { + cachedQuantileQueue.add(value); + } else { + klassMap.putIfAbsent(klass, true); + } + } + } + + @Override + public void delete(Class<?> type, Object naturalKey) throws Exception { + if (backgroundThread != null) { + throw new RuntimeException("delete() shouldn't be called after " + + "the hybrid store begins switching to levelDB"); + } + + KVStore store = getStore(); + store.delete(type, naturalKey); + } + + @Override + public <T> KVStoreView<T> view(Class<T> type) throws Exception { + KVStore store = getStore(); + KVStoreView<T> view = store.view(type); + return view; + } + + @Override + public long count(Class<?> type) throws Exception { + KVStore store = getStore(); + long count = store.count(type); + return count; + } + + @Override + public long count(Class<?> type, String index, Object indexedValue) throws Exception { + KVStore store = getStore(); Review comment: addressed ########## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ########## @@ -1167,6 +1172,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // At this point the disk data either does not exist or was deleted because it failed to // load, so the event log needs to be replayed. + // If hybrid store is enabled, try it first. Review comment: retrial logic is added. ########## File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.IOException +import java.util.Collection +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.util.kvstore._ + +/** + * A implementation of KVStore that accelerates event logs loading. + * + * When rebuilding the application state from event logs, HybridStore will + * write data to InMemoryStore at first and use a background thread to dump + * data to LevelDB once the writing to InMemoryStore is completed. + */ + +private[history] class HybridStore extends KVStore { + + private val inMemoryStore = new InMemoryStore() + + private var levelDB: LevelDB = null + + // Flag to indicate whether we should use inMemoryStore or levelDB + private[history] val shouldUseInMemoryStore = new AtomicBoolean(true) + + // A background thread that dumps data from inMemoryStore to levelDB + private var backgroundThread: Thread = null + + // Objects of these classes will be dumped to levelDB in the background thread + private var klassList: Seq[Class[_]] = List( Review comment: Explicit logic for handling particular classes has been removed. ########## File path: core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.IOException +import java.util.Collection +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean + +import scala.collection.JavaConverters._ + +import org.apache.spark.util.kvstore._ + +/** + * A implementation of KVStore that accelerates event logs loading. + * + * When rebuilding the application state from event logs, HybridStore will + * write data to InMemoryStore at first and use a background thread to dump + * data to LevelDB once the writing to InMemoryStore is completed. 
+ */ + +private[history] class HybridStore extends KVStore { + + private val inMemoryStore = new InMemoryStore() + + private var levelDB: LevelDB = null + + // Flag to indicate whether we should use inMemoryStore or levelDB + private[history] val shouldUseInMemoryStore = new AtomicBoolean(true) + + // A background thread that dumps data from inMemoryStore to levelDB + private var backgroundThread: Thread = null + + // A hash map that stores all classes that had been writen to inMemoryStore + private val klassMap = new ConcurrentHashMap[Class[_], Boolean] + + override def getMetadata[T](klass: Class[T]): T = { + getStore().getMetadata(klass) + } + + override def setMetadata(value: Object): Unit = { + getStore().setMetadata(value) + } + + override def read[T](klass: Class[T], naturalKey: Object): T = { + getStore().read(klass, naturalKey) + } + + override def write(value: Object): Unit = { + val store = getStore() + store.write(value) Review comment: we can update it to getStore().write(value) ########## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ########## @@ -1197,6 +1213,78 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) KVUtils.open(newStorePath, metadata) } + private def createHybridStore( + dm: HistoryServerDiskManager, + appId: String, + attempt: AttemptInfoWrapper, + metadata: AppStatusStoreMetadata): KVStore = { + + var retried = false + var hybridStore: HybridStore = null + while (hybridStore == null) { + val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), + attempt.lastIndex) + val isCompressed = reader.compressionCodec.isDefined + + // Throws an exception if the memory space is not enough + memoryManager.lease(appId, attempt.info.attemptId, reader.totalSize, isCompressed) + + logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") + val lease = dm.lease(reader.totalSize, isCompressed) + val isLeaseRolledBack = new 
java.util.concurrent.atomic.AtomicBoolean(false) + var store: HybridStore = null + try { + store = new HybridStore() + val levelDB = KVUtils.open(lease.tmpPath, metadata) + store.setLevelDB(levelDB) + rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) + + // Start the background thread to dump data to levelDB when writing to + // InMemoryStore is completed. + store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener { + override def onSwitchToLevelDBSuccess: Unit = { + levelDB.close() + val newStorePath = lease.commit(appId, attempt.info.attemptId) + store.setLevelDB(KVUtils.open(newStorePath, metadata)) + memoryManager.release(appId, attempt.info.attemptId) + logInfo(s"Completely switched to LevelDB for app $appId / ${attempt.info.attemptId}.") + } + + override def onSwitchToLevelDBFail(e: Exception): Unit = { + levelDB.close() + if (!isLeaseRolledBack.getAndSet(true)) { + lease.rollback() + } Review comment: I feel like this AtomicBoolean variable is somewhat unnecessary. If the background thread can successfully start, the createHybridStore() will finish and return, hence no exceptions will be caught in the foreground thread. If exceptions occurred before backgroundThread.start(), the background thread won't be started and hence the lease can only be rolled back on the foreground thread. What do you think? 
@HyukjinKwon @redsanket ########## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ########## @@ -1197,6 +1213,78 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) KVUtils.open(newStorePath, metadata) } + private def createHybridStore( + dm: HistoryServerDiskManager, + appId: String, + attempt: AttemptInfoWrapper, + metadata: AppStatusStoreMetadata): KVStore = { + + var retried = false + var hybridStore: HybridStore = null + while (hybridStore == null) { + val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), + attempt.lastIndex) + val isCompressed = reader.compressionCodec.isDefined + + // Throws an exception if the memory space is not enough + memoryManager.lease(appId, attempt.info.attemptId, reader.totalSize, isCompressed) + + logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") + val lease = dm.lease(reader.totalSize, isCompressed) + val isLeaseRolledBack = new java.util.concurrent.atomic.AtomicBoolean(false) + var store: HybridStore = null + try { + store = new HybridStore() + val levelDB = KVUtils.open(lease.tmpPath, metadata) + store.setLevelDB(levelDB) + rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) + + // Start the background thread to dump data to levelDB when writing to + // InMemoryStore is completed. 
+ store.switchToLevelDB(new HybridStore.SwitchToLevelDBListener { + override def onSwitchToLevelDBSuccess: Unit = { + levelDB.close() + val newStorePath = lease.commit(appId, attempt.info.attemptId) + store.setLevelDB(KVUtils.open(newStorePath, metadata)) + memoryManager.release(appId, attempt.info.attemptId) + logInfo(s"Completely switched to LevelDB for app $appId / ${attempt.info.attemptId}.") + } + + override def onSwitchToLevelDBFail(e: Exception): Unit = { + levelDB.close() + if (!isLeaseRolledBack.getAndSet(true)) { + lease.rollback() + } Review comment: I feel like this AtomicBoolean variable is somewhat unnecessary. If the background thread can successfully start, the createHybridStore() will finish and return, hence no exceptions will be caught in the foreground thread. If exceptions occurred before backgroundThread.start(), the background thread won't be started and hence the lease can only be rolled back on the foreground thread. What do you think? @HeartSaVioR @redsanket ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org