dlmarion commented on code in PR #2665:
URL: https://github.com/apache/accumulo/pull/2665#discussion_r939081834


##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java:
##########
@@ -0,0 +1,987 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import 
org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.scan.ScanServerSelector;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.AbstractServer;
+import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
+import org.apache.accumulo.server.security.SecurityUtil;
+import 
org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.ScanSession.TabletResolver;
+import org.apache.accumulo.tserver.session.Session;
+import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.SingleScanSession;
+import org.apache.accumulo.tserver.tablet.SnapshotTablet;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.zookeeper.KeeperException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+public class ScanServer extends AbstractServer
+    implements TabletScanClientService.Iface, TabletHostingServer {
+
+  public static class ScanServerOpts extends ServerOpts {
+    @Parameter(required = false, names = {"-g", "--group"},
+        description = "Optional group name that will be made available to the 
ScanServerSelector client plugin.  If not specified will be set to '"
+            + ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME
+            + "'.  Groups support at least two use cases : dedicating 
resources to scans and/or using different hardware for scans.")
+    private String groupName = 
ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+
+    public String getGroupName() {
+      return groupName;
+    }
+  }
+
+  // NOTE(review): this duplicates the LOG field declared below — both are
+  // LoggerFactory.getLogger(ScanServer.class); consider consolidating on one logger.
+  private static final Logger log = LoggerFactory.getLogger(ScanServer.class);
+
+  private static class TabletMetadataLoader implements 
CacheLoader<KeyExtent,TabletMetadata> {
+
+    private final Ample ample;
+
+    private TabletMetadataLoader(Ample ample) {
+      this.ample = ample;
+    }
+
+    @Override
+    public @Nullable TabletMetadata load(KeyExtent keyExtent) {
+      long t1 = System.currentTimeMillis();
+      var tm = ample.readTablet(keyExtent);
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for 1 tablet in {} ms", t2 - t1);
+      return tm;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Map<? extends KeyExtent,? extends TabletMetadata>
+        loadAll(Set<? extends KeyExtent> keys) {
+      long t1 = System.currentTimeMillis();
+      var tms = ample.readTablets().forTablets((Collection<KeyExtent>) 
keys).build().stream()
+          .collect(Collectors.toMap(tm -> tm.getExtent(), tm -> tm));
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for {} tablets in {} ms", keys.size(), t2 - t1);
+      return tms;
+    }
+  }
+
  // NOTE(review): duplicates the lower-case 'log' logger declared above; consider
  // consolidating on a single logger field.
  private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);

  // performs the actual scan work; thrift calls received by this class are delegated to it
  protected ThriftScanClientHandler delegate;
  // random UUID written into this server's ZooKeeper lock content
  private UUID serverLockUUID;
  private final TabletMetadataLoader tabletMetadataLoader;
  // cache of tablet metadata; null when caching is disabled via configuration
  private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache;
  // tracks file reservations that are in the process of being added or removed from the metadata
  // table
  private final Set<StoredTabletFile> influxFiles = new HashSet<>();
  // a read lock that ensures files are not removed from reservedFiles while it's held
  private final ReentrantReadWriteLock.ReadLock reservationsReadLock;
  // a write lock that must be held when mutating influxFiles or when removing entries from
  // reservedFiles
  private final ReentrantReadWriteLock.WriteLock reservationsWriteLock;
  // this condition is used to signal changes to influxFiles; it is created from the write lock
  private final Condition reservationCondition;
  // the keys are files that have reservations in the metadata table, the value contains
  // information about which scans are currently using the file
  private final Map<StoredTabletFile,ReservedFile> reservedFiles = new ConcurrentHashMap<>();
  // source of unique ids for scan reservations
  private final AtomicLong nextScanReservationId = new AtomicLong();

  private final ServerContext context;
  private final SessionManager sessionManager;
  private final TabletServerResourceManager resourceManager;
  // host and port the thrift client service is listening on; assigned in run()
  HostAndPort clientAddress;
  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();

  // when true, losing the ZooKeeper lock halts with exit code 0 instead of 1
  private volatile boolean serverStopRequested = false;
  // ZooKeeper lock held for the lifetime of this scan server
  private ServiceLock scanServerLock;
  protected TabletServerScanMetrics scanMetrics;

  private ZooCache managerLockCache;

  // group name made available to the ScanServerSelector client plugin
  private final String groupName;
+
  /**
   * Creates a scan server: wires up session and resource management, the reservation
   * locks, and (optionally) the tablet metadata cache, then schedules periodic cleanup
   * of expired file reservations.
   *
   * @param opts parsed command line options, including the scan server group name
   * @param args raw command line arguments
   */
  public ScanServer(ScanServerOpts opts, String[] args) {
    super("sserver", opts, args);

    context = super.getContext();
    log.info("Version " + Constants.VERSION);
    log.info("Instance " + getContext().getInstanceID());
    this.sessionManager = new SessionManager(context);

    this.resourceManager = new TabletServerResourceManager(context, this);

    this.managerLockCache = new ZooCache(context.getZooReader(), null);

    // The read and write locks share one ReentrantReadWriteLock; the condition is
    // created from the write lock and is used to signal changes to influxFiles.
    var readWriteLock = new ReentrantReadWriteLock();
    reservationsReadLock = readWriteLock.readLock();
    reservationsWriteLock = readWriteLock.writeLock();
    reservationCondition = readWriteLock.writeLock().newCondition();

    // Note: The way to control the number of concurrent scans that a ScanServer will
    // perform is by using Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS or the number
    // of threads in Property.SSERV_SCAN_EXECUTORS_PREFIX.

    long cacheExpiration =
        getConfiguration().getTimeInMillis(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION);

    long scanServerReservationExpiration =
        getConfiguration().getTimeInMillis(Property.SSERVER_SCAN_REFERENCE_EXPIRATION_TIME);

    tabletMetadataLoader = new TabletMetadataLoader(getContext().getAmple());

    // An expiration of zero disables tablet metadata caching entirely; the cache field
    // is left null in that case.
    if (cacheExpiration == 0L) {
      LOG.warn("Tablet metadata caching disabled, may cause excessive scans on metadata table.");
      tabletMetadataCache = null;
    } else {
      if (cacheExpiration < 60000) {
        LOG.warn(
            "Tablet metadata caching less than one minute, may cause excessive scans on metadata table.");
      }
      tabletMetadataCache =
          Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
              .scheduler(Scheduler.systemScheduler()).build(tabletMetadataLoader);
    }

    delegate = newThriftScanClientHandler(new WriteTracker());

    this.groupName = Objects.requireNonNull(opts.getGroupName());

    // Periodically clean up expired file reservations; the task is registered as
    // critical so its failure is noticed.
    ThreadPools.watchCriticalScheduledTask(getContext().getScheduledExecutor()
        .scheduleWithFixedDelay(() -> cleanUpReservedFiles(scanServerReservationExpiration),
            scanServerReservationExpiration, scanServerReservationExpiration,
            TimeUnit.MILLISECONDS));

  }
+
  /**
   * Factory for the handler that performs the actual scan work. Marked
   * {@code @VisibleForTesting} and {@code protected} so tests may override it to
   * substitute a different handler.
   *
   * @param writeTracker tracker passed through to the handler
   * @return a new {@link ThriftScanClientHandler} that delegates to this server
   */
  @VisibleForTesting
  protected ThriftScanClientHandler newThriftScanClientHandler(WriteTracker writeTracker) {
    return new ThriftScanClientHandler(this, writeTracker);
  }
+
+  /**
+   * Start the thrift service to handle incoming client requests
+   *
+   * @return address of this client service
+   * @throws UnknownHostException
+   *           host unknown
+   */
+  protected ServerAddress startScanServerClientService() throws 
UnknownHostException {
+
+    // This class implements TabletClientService.Iface and then delegates 
calls. Be sure
+    // to set up the ThriftProcessor using this class, not the delegate.
+    TProcessor processor = ThriftProcessorTypes.getScanServerTProcessor(this, 
getContext());
+
+    Property maxMessageSizeProperty =
+        (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null
+            ? Property.SSERV_MAX_MESSAGE_SIZE : 
Property.GENERAL_MAX_MESSAGE_SIZE);
+    ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
+        Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
+        "Thrift Client Server", Property.SSERV_PORTSEARCH, 
Property.SSERV_MINTHREADS,
+        Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, 
maxMessageSizeProperty);
+
+    LOG.info("address = {}", sp.address);
+    return sp;
+  }
+
+  public String getClientAddressString() {
+    if (clientAddress == null) {
+      return null;
+    }
+    return clientAddress.getHost() + ":" + clientAddress.getPort();
+  }
+
+  /**
+   * Set up nodes and locks in ZooKeeper for this Compactor
+   */
+  private ServiceLock announceExistence() {
+    ZooReaderWriter zoo = getContext().getZooReaderWriter();
+    try {
+
+      var zLockPath = ServiceLock.path(
+          getContext().getZooKeeperRoot() + Constants.ZSSERVERS + "/" + 
getClientAddressString());
+
+      try {
+        // Old zk nodes can be cleaned up by ZooZap
+        zoo.putPersistentData(zLockPath.toString(), new byte[] {}, 
NodeExistsPolicy.SKIP);
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.NOAUTH) {
+          LOG.error("Failed to write to ZooKeeper. Ensure that"
+              + " accumulo.properties, specifically instance.secret, is 
consistent.");
+        }
+        throw e;
+      }
+
+      serverLockUUID = UUID.randomUUID();
+      scanServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, 
serverLockUUID);
+
+      LockWatcher lw = new LockWatcher() {
+
+        @Override
+        public void lostLock(final LockLossReason reason) {
+          Halt.halt(serverStopRequested ? 0 : 1, () -> {
+            if (!serverStopRequested) {
+              LOG.error("Lost tablet server lock (reason = {}), exiting.", 
reason);
+            }
+            gcLogger.logGCInfo(getConfiguration());
+          });
+        }
+
+        @Override
+        public void unableToMonitorLockNode(final Exception e) {
+          Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server 
lock, exiting.", e));
+        }
+      };
+
+      // Don't use the normal ServerServices lock content, instead put the 
server UUID here.
+      byte[] lockContent = (serverLockUUID.toString() + "," + 
groupName).getBytes(UTF_8);
+
+      for (int i = 0; i < 120 / 5; i++) {
+        zoo.putPersistentData(zLockPath.toString(), new byte[0], 
NodeExistsPolicy.SKIP);
+
+        if (scanServerLock.tryLock(lw, lockContent)) {
+          LOG.debug("Obtained scan server lock {}", 
scanServerLock.getLockPath());
+          return scanServerLock;
+        }
+        LOG.info("Waiting for scan server lock");
+        sleepUninterruptibly(5, TimeUnit.SECONDS);
+      }
+      String msg = "Too many retries, exiting.";
+      LOG.info(msg);
+      throw new RuntimeException(msg);
+    } catch (Exception e) {
+      LOG.info("Could not obtain scan server lock, exiting.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void run() {
+    SecurityUtil.serverLogin(getConfiguration());
+
+    ServerAddress address = null;
+    try {
+      address = startScanServerClientService();
+      clientAddress = address.getAddress();
+    } catch (UnknownHostException e1) {
+      throw new RuntimeException("Failed to start the compactor client 
service", e1);
+    }
+
+    try {
+      MetricsUtil.initializeMetrics(getContext().getConfiguration(), 
this.applicationName,
+          clientAddress);
+    } catch (Exception e1) {
+      LOG.error("Error initializing metrics, metrics will not be emitted.", 
e1);

Review Comment:
   MetricsUtil.initializeMetrics throws `Exception`. I will create another
issue to fix that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to