This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 3674dce2817823fa154628cb9a96d2046d2ad368 Merge: ccd433de05 85b7d3be9f Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Tue Oct 17 17:38:57 2023 +0000 Merge branch 'main' into elasticity .../accumulo/tserver/TabletClientHandler.java | 3 + .../accumulo/tserver/tablet/ScanDataSource.java | 108 +++++++++++++-------- .../accumulo/tserver/tablet/ScanfileManager.java | 55 ++++++----- 3 files changed, 100 insertions(+), 66 deletions(-) diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java index 46cd47b4fd,0000000000..e53bbf2553 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java @@@ -1,139 -1,0 +1,146 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.util.MapCounter; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ScanfileManager { + private final Logger log = LoggerFactory.getLogger(ScanfileManager.class); + private final Tablet tablet; + + ScanfileManager(Tablet tablet) { + this.tablet = tablet; + } + + private final Set<StoredTabletFile> filesToDeleteAfterScan = new HashSet<>(); + private final Map<Long,Set<StoredTabletFile>> scanFileReservations = new HashMap<>(); + private final MapCounter<StoredTabletFile> fileScanReferenceCounts = new MapCounter<>(); + private long nextScanReservationId = 0; + + static void rename(VolumeManager fs, Path src, Path dst) throws IOException { + if (!fs.rename(src, dst)) { + throw new IOException("Rename " + src + " to " + dst + " returned false "); + } + } + + Pair<Long,Map<StoredTabletFile,DataFileValue>> reserveFilesForScan() { + synchronized (tablet) { + + var tabletsFiles = tablet.getDatafiles(); + Set<StoredTabletFile> absFilePaths = new HashSet<>(tabletsFiles.keySet()); + + long rid = nextScanReservationId++; + + scanFileReservations.put(rid, absFilePaths); + + Map<StoredTabletFile,DataFileValue> ret = new HashMap<>(); + + for (StoredTabletFile path : absFilePaths) { + fileScanReferenceCounts.increment(path, 1); + ret.put(path, tabletsFiles.get(path)); + } + + return new Pair<>(rid, ret); + } + } + + void returnFilesForScan(Long reservationId) { + + final Set<StoredTabletFile> filesToDelete = new HashSet<>(); + - synchronized (tablet) { - Set<StoredTabletFile> absFilePaths = scanFileReservations.remove(reservationId); ++ try { ++ synchronized (tablet) { ++ Set<StoredTabletFile> absFilePaths = scanFileReservations.remove(reservationId); + - if (absFilePaths == null) { - throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); - } ++ if (absFilePaths == null) { ++ throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); ++ } + - boolean notify = false; - for (StoredTabletFile path : absFilePaths) { - long refCount = fileScanReferenceCounts.decrement(path, 1); - if (refCount == 0) { - if (filesToDeleteAfterScan.remove(path)) { - filesToDelete.add(path); ++ boolean notify = false; ++ try { ++ for (StoredTabletFile path : absFilePaths) { ++ long refCount = fileScanReferenceCounts.decrement(path, 1); ++ if (refCount == 0) { ++ if (filesToDeleteAfterScan.remove(path)) { ++ filesToDelete.add(path); ++ } ++ notify = true; ++ } else if (refCount < 0) { ++ throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); ++ } ++ } ++ } finally { ++ if (notify) { ++ tablet.notifyAll(); + } - notify = true; - } else if (refCount < 0) { - throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); + } + } - - if (notify) { - tablet.notifyAll(); ++ } finally { ++ if (!filesToDelete.isEmpty()) { ++ // Remove scan files even if the loop above did not fully complete because once a ++ // file is in the set filesToDelete that means it was removed from filesToDeleteAfterScan ++ // and would never be added back. ++ log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), filesToDelete); ++ // ELASTICTIY_TODO use conditional mutation ++ MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(), ++ tablet.getTabletServer().getLock()); + } + } - - if (!filesToDelete.isEmpty()) { - log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), filesToDelete); - // ELASTICTIY_TODO use conditional mutation - MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(), - tablet.getTabletServer().getLock()); - } + } + + void removeFilesAfterScan(Collection<StoredTabletFile> scanFiles) { + if (scanFiles.isEmpty()) { + return; + } + + Set<StoredTabletFile> filesToDelete = new HashSet<>(); + + synchronized (tablet) { + for (StoredTabletFile path : scanFiles) { + if (fileScanReferenceCounts.get(path) == 0) { + filesToDelete.add(path); + } else { + filesToDeleteAfterScan.add(path); + } + } + } + + if (!filesToDelete.isEmpty()) { + // ELASTICTIY_TODO use conditional mutation and require the tablet location + log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), filesToDelete); + MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(), + tablet.getTabletServer().getLock()); + } + } +}