keith-turner commented on code in PR #2569: URL: https://github.com/apache/accumulo/pull/2569#discussion_r846560813
########## server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java: ########## @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.store.impl; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; + +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCache; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.server.conf.store.PropChangeListener; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.apache.accumulo.server.conf.util.ConfigTransformer; +import org.apache.zookeeper.KeeperException; +import 
org.apache.zookeeper.data.Stat; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.annotations.VisibleForTesting; + +public class ZooPropStore implements PropStore, PropChangeListener { + + private final static Logger log = LoggerFactory.getLogger(ZooPropStore.class); + private final static VersionedPropCodec codec = VersionedPropCodec.getDefault(); + + private final ZooReaderWriter zrw; + private final PropStoreWatcher propStoreWatcher; + private final PropCache cache; + private final PropStoreMetrics cacheMetrics = new PropStoreMetrics(); + private final ReadyMonitor zkReadyMon; + + /** + * Create instance using ZooPropStore.Builder + * + * @param instanceId + * the instance id + * @param zrw + * a wrapper set of utilities for accessing ZooKeeper. + * @param readyMonitor + * coordination utility for ZooKeeper connection status. + * @param propStoreWatcher + * an extended ZooKeeper watcher + * @param ticker + * a synthetic clock used for testing. 
+ */ + private ZooPropStore(final InstanceId instanceId, final ZooReaderWriter zrw, + final ReadyMonitor readyMonitor, final PropStoreWatcher propStoreWatcher, + final Ticker ticker) { + + this.zrw = zrw; + this.zkReadyMon = readyMonitor; + this.propStoreWatcher = propStoreWatcher; + + MetricsUtil.initializeProducers(cacheMetrics); + + ZooPropLoader propLoader = new ZooPropLoader(zrw, codec, propStoreWatcher, cacheMetrics); + + if (ticker == null) { + cache = new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).build(); + } else { + cache = + new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).withTicker(ticker).build(); + } + + try { + var path = ZooUtil.getRoot(instanceId); + if (zrw.exists(path, propStoreWatcher)) { + log.debug("Have a ZooKeeper connection and found instance node: {}", instanceId); + zkReadyMon.setReady(); + } else { + throw new IllegalStateException("Instance may not have been initialized, root node: " + path + + " does not exist in ZooKeeper"); + } + } catch (InterruptedException | KeeperException ex) { + throw new IllegalStateException("Failed to read root node " + instanceId + " from ZooKeeper", + ex); + } + } + + public static PropStore initialize(final InstanceId instanceId, final ZooReaderWriter zrw) { + return new ZooPropStore.Builder(instanceId, zrw, zrw.getSessionTimeout()).build(); + } + + /** + * Create the system configuration node and initialize with empty props - used when creating a new + * Accumulo instance. + * <p> + * Needs to be called early in the Accumulo ZooKeeper initialization sequence so that correct + * watchers can be created for the instance when the PropStore is instantiated. + * + * @param instanceId + * the instance uuid. 
+ * @param zrw + * a ZooReaderWriter + */ + public static void instancePathInit(final InstanceId instanceId, final ZooReaderWriter zrw) + throws InterruptedException, KeeperException { + var sysPropPath = PropCacheKey.forSystem(instanceId).getPath(); + VersionedProperties vProps = new VersionedProperties(); + try { + var created = + zrw.putPersistentData(sysPropPath, codec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + if (!created) { + throw new IllegalStateException( + "Failed to create default system props during initialization at: {}" + sysPropPath); + } + } catch (IOException ex) { + throw new IllegalStateException( + "Failed to create default system props during initialization at: {}" + sysPropPath, ex); + } + } + + @Override + public boolean exists(final PropCacheKey propCacheKey) throws PropStoreException { + try { + if (zrw.exists(propCacheKey.getPath())) { + return true; + } + + } catch (KeeperException ex) { + // ignore Keeper exception on check. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted testing if node exists", ex); + } + return false; + } + + /** + * Create initial system props for the instance. If the node already exists, no action is + * performed. + * + * @param context + * the server context. + * @param initProps + * map of k, v pairs of initial properties. + */ + public synchronized static void initSysProps(final ServerContext context, + final Map<String,String> initProps) { + PropCacheKey sysPropKey = PropCacheKey.forSystem(context.getInstanceID()); + createInitialProps(context, sysPropKey, initProps); + } + + /** + * Create initial properties if they do not exist. If the node exists, initialization will be + * skipped. 
+ * + * @param context + * the system context + * @param propCacheKey + * a prop id + * @param props + * initial properties + */ + public static void createInitialProps(final ServerContext context, + final PropCacheKey propCacheKey, Map<String,String> props) { + + try { + ZooReaderWriter zrw = context.getZooReaderWriter(); + if (zrw.exists(propCacheKey.getPath())) { + return; + } + VersionedProperties vProps = new VersionedProperties(props); + zrw.putPersistentData(propCacheKey.getPath(), codec.toBytes(vProps), + ZooUtil.NodeExistsPolicy.FAIL); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted creating node " + propCacheKey, ex); + } catch (Exception ex) { + throw new PropStoreException("Failed to create node " + propCacheKey, ex); + } + } + + public PropStoreMetrics getMetrics() { + return cacheMetrics; + } + + @Override + public void create(PropCacheKey propCacheKey, Map<String,String> props) { + + try { + VersionedProperties vProps = new VersionedProperties(props); + String path = propCacheKey.getPath(); + zrw.putPersistentData(path, codec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + } catch (IOException | KeeperException | InterruptedException ex) { + throw new PropStoreException("Failed to serialize properties for " + propCacheKey, ex); + } + } + + /** + * get or create properties from the store. If the property node does not exist in ZooKeeper, + * legacy properties exist, they will be converted to the new storage form and naming convention. + * The legacy properties are deleted once the new node format is written. + * + * @param propCacheKey + * the prop cache key + * @return The versioned properties or null if the properties do not exist for the id. 
+ * @throws PropStoreException + * if the updates fails because of an underlying store exception + */ + @Override + public @NonNull VersionedProperties get(final PropCacheKey propCacheKey) + throws PropStoreException { + checkZkConnection(); // if ZK not connected, block, do not just return a cached value. Review Comment: What scenario does this protect against? Immediately after this method returns before it executes any subsequent statements, ZK could become disconnected. ########## server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java: ########## @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.conf.store.impl; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; + +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCache; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.server.conf.store.PropChangeListener; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.apache.accumulo.server.conf.util.ConfigTransformer; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.annotations.VisibleForTesting; + +public class ZooPropStore implements PropStore, PropChangeListener { + + private final static Logger log = LoggerFactory.getLogger(ZooPropStore.class); + private final static VersionedPropCodec codec = VersionedPropCodec.getDefault(); + + private final ZooReaderWriter zrw; + private final PropStoreWatcher propStoreWatcher; + private final PropCache cache; + private final PropStoreMetrics cacheMetrics = new PropStoreMetrics(); + private final ReadyMonitor zkReadyMon; + + /** + * Create instance using ZooPropStore.Builder + * + * @param instanceId + * the instance id + * @param zrw + * a wrapper set of utilities for accessing ZooKeeper. 
+ * @param readyMonitor + * coordination utility for ZooKeeper connection status. + * @param propStoreWatcher + * an extended ZooKeeper watcher + * @param ticker + * a synthetic clock used for testing. + */ + private ZooPropStore(final InstanceId instanceId, final ZooReaderWriter zrw, + final ReadyMonitor readyMonitor, final PropStoreWatcher propStoreWatcher, + final Ticker ticker) { + + this.zrw = zrw; + this.zkReadyMon = readyMonitor; + this.propStoreWatcher = propStoreWatcher; + + MetricsUtil.initializeProducers(cacheMetrics); + + ZooPropLoader propLoader = new ZooPropLoader(zrw, codec, propStoreWatcher, cacheMetrics); + + if (ticker == null) { + cache = new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).build(); + } else { + cache = + new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).withTicker(ticker).build(); + } + + try { + var path = ZooUtil.getRoot(instanceId); + if (zrw.exists(path, propStoreWatcher)) { + log.debug("Have a ZooKeeper connection and found instance node: {}", instanceId); + zkReadyMon.setReady(); + } else { + throw new IllegalStateException("Instance may not have been initialized, root node: " + path + + " does not exist in ZooKeeper"); + } + } catch (InterruptedException | KeeperException ex) { + throw new IllegalStateException("Failed to read root node " + instanceId + " from ZooKeeper", + ex); + } + } + + public static PropStore initialize(final InstanceId instanceId, final ZooReaderWriter zrw) { + return new ZooPropStore.Builder(instanceId, zrw, zrw.getSessionTimeout()).build(); + } + + /** + * Create the system configuration node and initialize with empty props - used when creating a new + * Accumulo instance. + * <p> + * Needs to be called early in the Accumulo ZooKeeper initialization sequence so that correct + * watchers can be created for the instance when the PropStore is instantiated. + * + * @param instanceId + * the instance uuid. 
+ * @param zrw + * a ZooReaderWriter + */ + public static void instancePathInit(final InstanceId instanceId, final ZooReaderWriter zrw) + throws InterruptedException, KeeperException { + var sysPropPath = PropCacheKey.forSystem(instanceId).getPath(); + VersionedProperties vProps = new VersionedProperties(); + try { + var created = + zrw.putPersistentData(sysPropPath, codec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + if (!created) { + throw new IllegalStateException( + "Failed to create default system props during initialization at: {}" + sysPropPath); + } + } catch (IOException ex) { + throw new IllegalStateException( + "Failed to create default system props during initialization at: {}" + sysPropPath, ex); + } + } + + @Override + public boolean exists(final PropCacheKey propCacheKey) throws PropStoreException { + try { + if (zrw.exists(propCacheKey.getPath())) { + return true; + } + + } catch (KeeperException ex) { + // ignore Keeper exception on check. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted testing if node exists", ex); + } + return false; + } + + /** + * Create initial system props for the instance. If the node already exists, no action is + * performed. + * + * @param context + * the server context. + * @param initProps + * map of k, v pairs of initial properties. + */ + public synchronized static void initSysProps(final ServerContext context, + final Map<String,String> initProps) { + PropCacheKey sysPropKey = PropCacheKey.forSystem(context.getInstanceID()); + createInitialProps(context, sysPropKey, initProps); + } + + /** + * Create initial properties if they do not exist. If the node exists, initialization will be + * skipped. 
+ * + * @param context + * the system context + * @param propCacheKey + * a prop id + * @param props + * initial properties + */ + public static void createInitialProps(final ServerContext context, + final PropCacheKey propCacheKey, Map<String,String> props) { + + try { + ZooReaderWriter zrw = context.getZooReaderWriter(); + if (zrw.exists(propCacheKey.getPath())) { + return; + } + VersionedProperties vProps = new VersionedProperties(props); + zrw.putPersistentData(propCacheKey.getPath(), codec.toBytes(vProps), + ZooUtil.NodeExistsPolicy.FAIL); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted creating node " + propCacheKey, ex); + } catch (Exception ex) { + throw new PropStoreException("Failed to create node " + propCacheKey, ex); + } + } + + public PropStoreMetrics getMetrics() { + return cacheMetrics; + } + + @Override + public void create(PropCacheKey propCacheKey, Map<String,String> props) { + + try { + VersionedProperties vProps = new VersionedProperties(props); + String path = propCacheKey.getPath(); + zrw.putPersistentData(path, codec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + } catch (IOException | KeeperException | InterruptedException ex) { + throw new PropStoreException("Failed to serialize properties for " + propCacheKey, ex); + } + } + + /** + * get or create properties from the store. If the property node does not exist in ZooKeeper, + * legacy properties exist, they will be converted to the new storage form and naming convention. + * The legacy properties are deleted once the new node format is written. + * + * @param propCacheKey + * the prop cache key + * @return The versioned properties or null if the properties do not exist for the id. 
+ * @throws PropStoreException + * if the updates fails because of an underlying store exception + */ + @Override + public @NonNull VersionedProperties get(final PropCacheKey propCacheKey) + throws PropStoreException { + checkZkConnection(); // if ZK not connected, block, do not just return a cached value. + propStoreWatcher.registerListener(propCacheKey, this); + + var props = cache.get(propCacheKey); + if (props != null) { + return props; + } + + return new ConfigTransformer(zrw, codec, propStoreWatcher).transform(propCacheKey); Review Comment: Looking at the impl for ConfigTransformer, it puts an ephemeral node in ZK each time it's called. This feels expensive for what seems like it should be a read operation. It seems like it's doing this as part of an upgrade; could this not be done in the Manager's upgrade process as a one-time deal instead of every time this is read? ########## server/base/src/main/java/org/apache/accumulo/server/conf/ZooBasedConfiguration.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.accumulo.server.conf; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.store.PropCacheId; +import org.apache.accumulo.server.conf.store.PropChangeListener; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; + +/** + * Instances maintain a local cache of the AccumuloConfiguration hierarchy that will be consistent + * with stored properties. + * <p> + * When calling getProperties - the local copy will be updated if ZooKeeper changes have been + * received. + * <p> + * The getUpdateCount() provides an optimization for clients - the count can be used to detect + * changes without reading the properties. When the update count changes, the next getProperties + * call will update the local copy and the change count. 
+ */ +public class ZooBasedConfiguration extends AccumuloConfiguration implements PropChangeListener { + + protected final Logger log; + private final AccumuloConfiguration parent; + private final PropCacheId propCacheId; + private final PropStore propStore; + + private final AtomicReference<PropSnapshot> snapshotRef = new AtomicReference<>(null); + + public ZooBasedConfiguration(Logger log, ServerContext context, PropCacheId propCacheId, + AccumuloConfiguration parent) { + this.log = requireNonNull(log, "a Logger must be supplied"); + requireNonNull(context, "the context cannot be null"); + this.propCacheId = requireNonNull(propCacheId, "a PropCacheId must be supplied"); + this.parent = requireNonNull(parent, "An AccumuloConfiguration parent must be supplied"); + + this.propStore = + requireNonNull(context.getPropStore(), "The PropStore must be supplied and exist"); + + propStore.registerAsListener(propCacheId, this); + + snapshotRef.set(updateSnapshot()); + + } + + public long getDataVersion() { + var snapshot = snapshotRef.get(); + if (snapshot == null) { + return updateSnapshot().getDataVersion(); + } + return snapshot.getDataVersion(); + } + + /** + * The update count is the sum of the change count of this configuration and the change counts of + * the parents. The count is used to detect if any changes occurred in the configuration hierarchy + * and if the configuration needs to be recalculated to maintain consistency with values in the + * backend store. + * <p> + * The count is required to be an increasing value. 
+ */ + @Override + public long getUpdateCount() { + long count = 0; + long dataVersion = 0; + for (AccumuloConfiguration p = this; p != null; p = p.getParent()) { + if (p instanceof ZooBasedConfiguration) { + dataVersion = ((ZooBasedConfiguration) p).getDataVersion(); + } else { + dataVersion = p.getUpdateCount(); + } + count += dataVersion; + } + + log.trace("update count result for: {} - data version: {} update: {}", propCacheId, dataVersion, + count); + return count; + } + + @Override + public AccumuloConfiguration getParent() { + return parent; + } + + public PropCacheId getCacheId() { + return propCacheId; + } + + @Override + public @Nullable String get(final Property property) { + Map<String,String> props = getSnapshot(); + String value = props.get(property.getKey()); + if (value != null) { + return value; + } + AccumuloConfiguration parent = getParent(); + if (parent != null) { + return parent.get(property); + } + return null; + } + + @Override + public void getProperties(final Map<String,String> props, final Predicate<String> filter) { + + parent.getProperties(props, filter); + + Map<String,String> theseProps = getSnapshot(); + + log.trace("getProperties() for: {} filter: {}, have: {}, passed: {}", getCacheId(), filter, + theseProps, props); + + for (Map.Entry<String,String> p : theseProps.entrySet()) { + if (filter.test(p.getKey()) && p.getValue() != null) { + log.trace("passed filter - add to map: {} = {}", p.getKey(), p.getValue()); + props.put(p.getKey(), p.getValue()); + } + } + } + + @Override + public boolean isPropertySet(final Property property) { + + Map<String,String> theseProps = getSnapshot(); + + if (theseProps.get(property.getKey()) != null) { + return true; + } + + return getParent().isPropertySet(property); + + } + + public Map<String,String> getSnapshot() { + if (snapshotRef.get() == null) { + return updateSnapshot().getProps(); + } + return snapshotRef.get().getProps(); + } + + @Override + public void invalidateCache() { + 
snapshotRef.set(null); + } + + private final Lock updateLock = new ReentrantLock(); + + private @NonNull PropSnapshot updateSnapshot() throws PropStoreException { + + PropSnapshot localSnapshot = snapshotRef.get(); Review Comment: If the complexity of this two-level scheme is there to deal with deleted tables, another possible way to deal with that is periodic reconciliation. Some things in Accumulo do this: they just periodically run a task that looks for deleted things to clean out. So we could get the set of all table ids in ZK and then conceptually do something like `tableConfigCache.keySet().retainAll(tablesIdsFromZK)` every few hours AND maybe have a simpler one-level cache. However, I am still looking at this, so I am not completely sure if my suggestion is any good. > This can be a moderate memory leak if you routinely create many tables. To deal with this, the caffeine cache expires, and (hopefully) these configuration objects in the config hierarchy can be garbage collected (in the case of table/namespace config objects). In the ServerConfigurationFactory class there is a static map of tableConfigs that may never get cleaned up in any way. So I am not sure if these could ever be GCed once created. ########## server/base/src/main/java/org/apache/accumulo/server/conf/util/ConfigTransformer.java: ########## @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.util; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.accumulo.core.conf.DeprecatedPropertyUtil; +import org.apache.accumulo.core.util.DurationFormat; +import org.apache.accumulo.fate.util.Retry; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.apache.accumulo.server.conf.store.impl.PropStoreWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Read legacy properties (pre 2.1) from ZooKeeper and transform them into the single node format. 
+ * The encoded properties are stored in ZooKeeper and then the legacy property ZooKeeper nodes are + * deleted. + */ +public class ConfigTransformer { + + private static final Logger log = LoggerFactory.getLogger(ConfigTransformer.class); + + private final ZooReaderWriter zrw; + private final VersionedPropCodec codec; + private final PropStoreWatcher propStoreWatcher; + private final Retry retry; + + /** + * Instantiate a transformer instance. + * + * @param zrw + * a ZooReaderWriter + * @param codec + * the codec used to encode to the single-node format. + * @param propStoreWatcher + * the watcher registered to receive future notifications of changes to the encoded + * property node. + */ + public ConfigTransformer(final ZooReaderWriter zrw, VersionedPropCodec codec, + final PropStoreWatcher propStoreWatcher) { + this.zrw = zrw; + this.codec = codec; + this.propStoreWatcher = propStoreWatcher; + + // default - allow for a conservative max delay of about a minute + retry = + Retry.builder().maxRetries(15).retryAfter(250, MILLISECONDS).incrementBy(500, MILLISECONDS) + .maxWait(5, SECONDS).backOffFactor(1.75).logInterval(3, MINUTES).createRetry(); + + } + + public ConfigTransformer(final ZooReaderWriter zrw, VersionedPropCodec codec, + final PropStoreWatcher propStoreWatcher, final Retry retry) { + this.zrw = zrw; + this.codec = codec; + this.propStoreWatcher = propStoreWatcher; + this.retry = retry; + } + + /** + * Transform the properties for the provided prop cache key. + * + * @return the encoded properties. 
+ */ + public VersionedProperties transform(final PropCacheKey propCacheKey) { + TransformLock lock = TransformLock.createLock(propCacheKey, zrw); + return transform(propCacheKey, lock); + } + + // Allow external (mocked) TransformLock to be used + @VisibleForTesting + VersionedProperties transform(final PropCacheKey propCacheKey, final TransformLock lock) { + + VersionedProperties results; + Instant start = Instant.now(); + try { + while (!lock.isLocked()) { + try { + retry.useRetry(); + retry.waitForNextAttempt(); + // look and return node if created. + if (zrw.exists(propCacheKey.getPath())) { + Stat stat = new Stat(); + byte[] bytes = zrw.getData(propCacheKey.getPath(), propStoreWatcher, stat); + return codec.fromBytes(stat.getVersion(), bytes); + } + // still does not exist - try again. + lock.lock(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Failed to get transform lock for " + propCacheKey, ex); + } catch (IllegalStateException ex) { + throw new PropStoreException("Failed to get transform lock for " + propCacheKey, ex); + } + } + + Set<LegacyPropNode> upgradeNodes = readLegacyProps(propCacheKey); + + upgradeNodes = convertDeprecatedProps(propCacheKey, upgradeNodes); + + results = writeConverted(propCacheKey, upgradeNodes); + + if (results == null) { + throw new PropStoreException("Could not create properties for " + propCacheKey, null); + } + + // validate lock still valid before deletion. + if (!lock.validateLock()) { Review Comment: What situation does this defend against? 
The lock could be "valid" but become "not valid" after this call and before the call to `deleteLegacyProps` ########## server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java: ########## @@ -84,96 +73,54 @@ public TableConfiguration(ServerContext context, TableId tableId, NamespaceConfi newDeriver(conf -> createCompactionDispatcher(conf, context, tableId)); } - void setZooCacheFactory(ZooCacheFactory zcf) { - this.zcf = zcf; - } - - private ZooCache getZooCache() { - synchronized (propCaches) { - PropCacheKey key = new PropCacheKey(context.getInstanceID(), tableId.canonical()); - ZooCache propCache = propCaches.get(key); - if (propCache == null) { - propCache = zcf.getZooCache(context.getZooKeepers(), context.getZooKeepersSessionTimeOut()); - propCaches.put(key, propCache); - } - return propCache; - } - } - - private ZooCachePropertyAccessor getPropCacheAccessor() { - // updateAndGet below always calls compare and set, so avoid if not null - ZooCachePropertyAccessor zcpa = propCacheAccessor.get(); - if (zcpa != null) { - return zcpa; + @Override + public boolean isPropertySet(Property prop) { + if (_isPropertySet(prop)) { + return true; } - return propCacheAccessor - .updateAndGet(pca -> pca == null ? new ZooCachePropertyAccessor(getZooCache()) : pca); - } - - private String getPath() { - return context.getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF; + return getParent().isPropertySet(prop); } - @Override - public boolean isPropertySet(Property prop, boolean cacheAndWatch) { - if (!cacheAndWatch) { - throw new UnsupportedOperationException( - "Table configuration only supports checking if a property is set in cache."); - } - - if (getPropCacheAccessor().isPropertySet(prop, getPath())) { - return true; + private boolean _isPropertySet(Property property) { + Map<String,String> propMap = getSnapshot(); + if (propMap == null) { Review Comment: What are the scenarios where the snapshot is null? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
