keith-turner commented on code in PR #5814:
URL: https://github.com/apache/accumulo/pull/5814#discussion_r2294831308


##########
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java:
##########
@@ -87,105 +99,485 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+//TODO when removing this class, also remove MetadataSchema.Upgrader12to13
 public class Upgrader11to12 implements Upgrader {
 
-  private static final Logger log = LoggerFactory.getLogger(Upgrader11to12.class);
+  interface MutationWriter {
+    void addMutation(Mutation m) throws MutationsRejectedException;
+  }
+
+  public enum ProblemType {
+    FILE_READ, FILE_WRITE, TABLET_LOAD
+  }
+
+  private static class ProblemReport {
+    private final TableId tableId;
+    private final ProblemType problemType;
+    private final String resource;
+    private String exception;
+    private String server;
+    private long creationTime;
+
+    private ProblemReport(TableId table, ProblemType problemType, String resource, byte[] enc) {
+      requireNonNull(table, "table is null");
+      requireNonNull(problemType, "problemType is null");
+      requireNonNull(resource, "resource is null");
+      this.tableId = table;
+      this.problemType = problemType;
+      this.resource = resource;
+
+      decode(enc);
+    }
+
+    private void decode(byte[] enc) {
+      try {
+        ByteArrayInputStream bais = new ByteArrayInputStream(enc);
+        DataInputStream dis = new DataInputStream(bais);
+
+        creationTime = dis.readLong();
+
+        if (dis.readBoolean()) {
+          server = dis.readUTF();
+        } else {
+          server = null;
+        }
+
+        if (dis.readBoolean()) {
+          exception = dis.readUTF();
+        } else {
+          exception = null;
+        }
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    static ProblemReport decodeZooKeeperEntry(ServerContext context, String node)
+        throws IOException, KeeperException, InterruptedException {
+      byte[] bytes = Encoding.decodeBase64FileName(node);
+
+      ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+      DataInputStream dis = new DataInputStream(bais);
+
+      TableId tableId = TableId.of(dis.readUTF());
+      String problemType = dis.readUTF();
+      String resource = dis.readUTF();
+
+      String zpath = ZPROBLEMS + "/" + node;
+      byte[] enc = context.getZooSession().asReaderWriter().getData(zpath);
+
+      return new ProblemReport(tableId, ProblemType.valueOf(problemType), resource, enc);
+
+    }
+
+    public static ProblemReport decodeMetadataEntry(Key key, Value value) {
+      TableId tableId =
+          TableId.of(key.getRow().toString().substring(ProblemSection.getRowPrefix().length()));
+      String problemType = key.getColumnFamily().toString();
+      String resource = key.getColumnQualifier().toString();
+
+      return new ProblemReport(tableId, ProblemType.valueOf(problemType), resource, value.get());
+    }
+  }
+
+  private static class ProblemSection {
+    private static final Section section =
+        new Section(RESERVED_PREFIX + "err_", true, RESERVED_PREFIX + "err`", false);
+
+    public static Range getRange() {
+      return section.getRange();
+    }
+
+    public static String getRowPrefix() {
+      return section.getRowPrefix();
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(Upgrader11to12.class);
+  private static final String ZPROBLEMS = "/problems";
+  private static final String ZTRACERS = "/tracers";
+  private static final String ZTABLE_COMPACT_ID = "/compact-id";
+  private static final String ZTABLE_COMPACT_CANCEL_ID = "/compact-cancel-id";
+  private static final String ZTABLE_STATE = "/state";
+  private static final byte[] ZERO_BYTE = {'0'};
 
   @SuppressWarnings("deprecation")
   private static final Text CHOPPED = ChoppedColumnFamily.NAME;
 
-  public static final Collection<Range> OLD_SCAN_SERVERS_RANGES =
-      List.of(new Range("~sserv", "~sserx"), new Range("~scanref", "~scanreg"));
+  @VisibleForTesting
+  static final String ZTABLE_NAME = "/name";
 
   @VisibleForTesting
   static final Set<Text> UPGRADE_FAMILIES = Set.of(DataFileColumnFamily.NAME, CHOPPED,
       ExternalCompactionColumnFamily.NAME, ScanFileColumnFamily.NAME);
 
-  private static final String ZTRACERS = "/tracers";
-
   @VisibleForTesting
   static final String ZNAMESPACE_NAME = "/name";
 
+  public static final Collection<Range> OLD_SCAN_SERVERS_RANGES =
+      List.of(new Range("~sserv", "~sserx"), new Range("~scanref", "~scanreg"));
+
   @Override
-  public void upgradeZookeeper(@NonNull ServerContext context) {
-    log.debug("Upgrade ZooKeeper: upgrading to data version {}", 
METADATA_FILE_JSON_ENCODING);
+  public void upgradeZookeeper(ServerContext context) {
+    LOG.info("Ensuring all worker server processes are down.");
+    validateEmptyZKWorkerServerPaths(context);
+    LOG.info("Removing ZTracer node");
+    removeZTracersNode(context);
+    LOG.info("Updating file references in root tablet");
+    updateRootTabletFileReferences(context);
+    LOG.info("Removing problems reports from zookeeper");
+    removeZKProblemReports(context);
+    LOG.info("Creating NamespaceMappings");
+    createNamespaceMappings(context);
+    LOG.info("Validating root and metadata compaction services");
+    validateCompactionServiceConfiguration(context);
+    LOG.info("Setting root table stored hosting availability");
+    addHostingGoals(context, TabletAvailability.HOSTED, DataLevel.ROOT);
+    LOG.info("Removing nodes no longer used from ZooKeeper");
+    removeUnusedZKNodes(context);
+    LOG.info("Removing compact columns from root tablet");
+    removeCompactColumnsFromRootTabletMetadata(context);
+    LOG.info("Adding compactions node to zookeeper");
+    addCompactionsNode(context);
+    LOG.info("Creating ZooKeeper entries for ScanServerRefTable");
+    initializeScanRefTable(context);
+    LOG.info("Creating ZooKeeper entries for accumulo.fate table");
+    initializeFateTable(context);
+    LOG.info("Adding table mappings to zookeeper");
+    addTableMappingsToZooKeeper(context);
+  }
 
+  @Override
+  public void upgradeRoot(ServerContext context) {
+    LOG.info("Updating file references in metadata tablets");
+    upgradeTabletsMetadata(context, Ample.DataLevel.METADATA.metaTable());
+    LOG.info("Looking for partial splits");
+    handlePartialSplits(context, SystemTables.ROOT.tableName());
+    LOG.info("setting metadata table hosting availability");
+    addHostingGoals(context, TabletAvailability.HOSTED, DataLevel.METADATA);
+    LOG.info("Removing MetadataBulkLoadFilter iterator from root table");
+    removeMetaDataBulkLoadFilter(context, SystemTables.ROOT.tableId());
+    LOG.info("Removing compact columns from metadata tablets");
+    removeCompactColumnsFromTable(context, SystemTables.ROOT.tableName());
+  }
+
+  @Override
+  public void upgradeMetadata(ServerContext context) {
+    LOG.info("Updating file references in user tablets");
+    upgradeTabletsMetadata(context, Ample.DataLevel.USER.metaTable());
+    LOG.info("Removing Scan Server Range from metadata table");
+    removeScanServerRanges(context);
+    LOG.info("Removing problems reports from metadata table");
+    removeMetadataProblemReports(context);
+    LOG.info("Creating table {}", SystemTables.SCAN_REF.tableName());
+    createScanRefTable(context);
+    LOG.info("Creating table {}", SystemTables.FATE.tableName());
+    createFateTable(context);
+    LOG.info("Looking for partial splits");
+    handlePartialSplits(context, SystemTables.METADATA.tableName());
+    LOG.info("setting hosting availability on user tables");
+    addHostingGoals(context, TabletAvailability.ONDEMAND, DataLevel.USER);
+    LOG.info("Deleting external compaction final states from user tables");
+    deleteExternalCompactionFinalStates(context);
+    LOG.info("Deleting external compaction from user tables");
+    deleteExternalCompactions(context);
+    LOG.info("Removing MetadataBulkLoadFilter iterator from metadata table");
+    removeMetaDataBulkLoadFilter(context, SystemTables.METADATA.tableId());
+    LOG.info("Removing compact columns from user tables");
+    removeCompactColumnsFromTable(context, SystemTables.METADATA.tableName());
+    LOG.info("Removing bulk file columns from metadata table");
+    removeBulkFileColumnsFromTable(context, SystemTables.METADATA.tableName());
+  }
+
+  private static void addCompactionsNode(ServerContext context) {
     try {
-      var zrw = context.getZooSession().asReaderWriter();
+      context.getZooSession().asReaderWriter().putPersistentData(Constants.ZCOMPACTIONS,
+          new byte[0], ZooUtil.NodeExistsPolicy.SKIP);
+    } catch (KeeperException | InterruptedException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private void createScanRefTable(ServerContext context) {
+    try {
+      FileSystemInitializer initializer = new FileSystemInitializer(
+          new InitialConfiguration(context.getHadoopConf(), context.getSiteConfiguration()));
+      // For upgrading an existing system set to never merge. If the mergeability is changed
+      // then we would look to use the thrift client to look up the current Manager time to
+      // set as part of the mergeability metadata
+      FileSystemInitializer.InitialTablet scanRefTablet =
+          initializer.createScanRefTablet(context, TabletMergeabilityMetadata.never());
+      // Add references to the Metadata Table
+      try (BatchWriter writer = context.createBatchWriter(SystemTables.METADATA.tableName())) {
+        writer.addMutation(scanRefTablet.createMutation());
+      } catch (MutationsRejectedException | TableNotFoundException e) {
+        LOG.error("Failed to write tablet refs to metadata table");
+        throw new RuntimeException(e);
+      }
+    } catch (IOException e) {
+      LOG.error("Problem attempting to create ScanServerRef table", e);
+    }
+    LOG.info("Created ScanServerRef table");
+  }
 
-      // clean up nodes no longer in use
-      zrw.recursiveDelete(ZTRACERS, ZooUtil.NodeMissingPolicy.SKIP);
+  private void createFateTable(ServerContext context) {
+    try {
+      FileSystemInitializer initializer = new FileSystemInitializer(
+          new InitialConfiguration(context.getHadoopConf(), context.getSiteConfiguration()));
+      // For upgrading an existing system set to never merge. If the mergeability is changed
+      // then we would look to use the thrift client to look up the current Manager time to
+      // set as part of the mergeability metadata
+      FileSystemInitializer.InitialTablet fateTableTableTablet =
+          initializer.createFateRefTablet(context, TabletMergeabilityMetadata.never());
+      // Add references to the Metadata Table
+      try (BatchWriter writer = context.createBatchWriter(SystemTables.METADATA.tableName())) {
+        writer.addMutation(fateTableTableTablet.createMutation());
+      } catch (MutationsRejectedException | TableNotFoundException e) {
+        LOG.error("Failed to write tablet refs to metadata table");
+        throw new RuntimeException(e);
+      }
+    } catch (IOException e) {
+      LOG.error("Problem attempting to create Fate table", e);
+    }
+    LOG.info("Created Fate table");
+  }
+
+  private void removeCompactColumnsFromRootTabletMetadata(ServerContext context) {
 
+    try {
+      var zrw = context.getZooSession().asReaderWriter();
       Stat stat = new Stat();
       byte[] rootData = zrw.getData(ZROOT_TABLET, stat);
 
       String json = new String(rootData, UTF_8);
 
       var rtm = new RootTabletMetadata(json);
 
-      TreeMap<Key,Value> entries = new TreeMap<>();
-      rtm.getKeyValues().filter(e -> UPGRADE_FAMILIES.contains(e.getKey().getColumnFamily()))
-          .forEach(entry -> entries.put(entry.getKey(), entry.getValue()));
       ArrayList<Mutation> mutations = new ArrayList<>();
-
-      processReferences(mutations::add, entries.entrySet(), "root_table_metadata");
+      for (Map.Entry<Key,Value> entry : rtm.toKeyValues().entrySet()) {
+        var key = entry.getKey();
+
+        if (COMPACT_COL.hasColumns(key)) {
+          var row = key.getRow();
+          Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
+              "Expected empty visibility, saw %s ", 
key.getColumnVisibilityData());
+          Mutation m = new Mutation(row);
+          // TODO will metadata constraint fail when this is written?
+          COMPACT_COL.putDelete(m);
+          mutations.add(m);
+        }
+      }
 
       Preconditions.checkState(mutations.size() <= 1);
 
       if (!mutations.isEmpty()) {
-        log.info("Root metadata in ZooKeeper before upgrade: {}", json);
+        LOG.info("Root metadata in ZooKeeper before upgrade: {}", json);
         rtm.update(mutations.get(0));
         zrw.overwritePersistentData(ZROOT_TABLET, rtm.toJson().getBytes(UTF_8), stat.getVersion());
-        log.info("Root metadata in ZooKeeper after upgrade: {}", rtm.toJson());
+        LOG.info("Root metadata in ZooKeeper after upgrade: {}", rtm.toJson());
       }
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "Could not read root metadata from ZooKeeper due to interrupt", ex);
+    } catch (KeeperException ex) {
+      throw new IllegalStateException(
+          "Could not read or write root metadata in ZooKeeper because of 
ZooKeeper exception", ex);
+    }
 
-      byte[] namespacesData = zrw.getData(Constants.ZNAMESPACES);
-      if (namespacesData.length != 0) {
-        throw new IllegalStateException(
-            "Unexpected data found under namespaces node: " + new 
String(namespacesData, UTF_8));
+  }
+
+  private void removeCompactColumnsFromTable(ServerContext context, String tableName) {
+
+    try (var scanner = context.createScanner(tableName, Authorizations.EMPTY);
+        var writer = context.createBatchWriter(tableName)) {
+      scanner.setRange(MetadataSchema.TabletsSection.getRange());
+      COMPACT_COL.fetch(scanner);
+
+      for (Map.Entry<Key,Value> entry : scanner) {
+        var key = entry.getKey();
+        if (COMPACT_COL.hasColumns(key)) {
+          var row = key.getRow();
+          Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
+              "Expected empty visibility, saw %s ", 
key.getColumnVisibilityData());
+          Mutation m = new Mutation(row);
+          COMPACT_COL.putDelete(m);
+          writer.addMutation(m);
+        }
       }
-      List<String> namespaceIdList = zrw.getChildren(Constants.ZNAMESPACES);
-      Map<String,String> namespaceMap = new HashMap<>();
-      for (String namespaceId : namespaceIdList) {
-        String namespaceNamePath = Constants.ZNAMESPACES + "/" + namespaceId + ZNAMESPACE_NAME;
-        namespaceMap.put(namespaceId, new String(zrw.getData(namespaceNamePath), UTF_8));
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private void removeBulkFileColumnsFromTable(ServerContext context, String tableName) {
+    // FATE transaction ids have changed from 3.x to 4.x which are used as the value for the bulk
+    // file column. FATE ops won't persist through upgrade, so these columns can be safely deleted
+    // if they exist.
+    try (var scanner = context.createScanner(tableName, Authorizations.EMPTY);
+        var writer = context.createBatchWriter(tableName)) {
+      scanner.setRange(MetadataSchema.TabletsSection.getRange());
+      scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+      for (Map.Entry<Key,Value> entry : scanner) {
+        var key = entry.getKey();
+        Mutation m = new Mutation(key.getRow());
+        Preconditions.checkState(
+            key.getColumnFamily().equals(TabletsSection.BulkFileColumnFamily.NAME),
+            "Expected family %s, saw %s ", TabletsSection.BulkFileColumnFamily.NAME,
+            key.getColumnFamily());
+        Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
+            "Expected empty visibility, saw %s ", key.getColumnVisibilityData());
+        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+        writer.addMutation(m);
       }
-      byte[] mapping = NamespaceMapping.serializeMap(namespaceMap);
-      zrw.putPersistentData(Constants.ZNAMESPACES, mapping, ZooUtil.NodeExistsPolicy.OVERWRITE);
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
 
-      for (String namespaceId : namespaceIdList) {
-        String namespaceNamePath = Constants.ZNAMESPACES + "/" + namespaceId + ZNAMESPACE_NAME;
-        zrw.delete(namespaceNamePath);
+  private void removeUnusedZKNodes(ServerContext context) {
+    try {
+      final var zrw = context.getZooSession().asReaderWriter();
+
+      final String ZCOORDINATOR = "/coordinators";
+      final String BULK_ARBITRATOR_TYPE = "bulkTx";
+
+      zrw.recursiveDelete(ZCOORDINATOR, ZooUtil.NodeMissingPolicy.SKIP);
+      zrw.recursiveDelete("/" + BULK_ARBITRATOR_TYPE, 
ZooUtil.NodeMissingPolicy.SKIP);
+
+      final String ZTABLE_COMPACT_ID = "/compact-id";
+      final String ZTABLE_COMPACT_CANCEL_ID = "/compact-cancel-id";
+
+      for (String tId : zrw.getChildren(Constants.ZTABLES)) {
+        final String zTablePath = Constants.ZTABLES + "/" + tId;
+        zrw.delete(zTablePath + ZTABLE_COMPACT_ID);
+        zrw.delete(zTablePath + ZTABLE_COMPACT_CANCEL_ID);
       }
+    } catch (KeeperException | InterruptedException e1) {
+      throw new IllegalStateException(e1);
+    }
+  }
 
-      log.info("Removing problems reports from zookeeper");
-      removeZKProblemReports(context);
+  private void removeMetaDataBulkLoadFilter(ServerContext context, TableId tableId) {
+    final String propName = Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter";
+    PropUtil.removeProperties(context, TablePropKey.of(tableId), List.of(propName));
+  }
 
-      log.info("Creating ZooKeeper entries for ScanServerRefTable");
-      preparePre4_0NewTableState(context, SystemTables.SCAN_REF.tableId(), Namespace.ACCUMULO.id(),
-          SystemTables.SCAN_REF.tableName(), TableState.ONLINE, ZooUtil.NodeExistsPolicy.FAIL);
+  private void deleteExternalCompactionFinalStates(ServerContext context) {
+    // This metadata was only written for user tablets as part of the compaction commit process.
+    // Compactions are committed in a completely different way now, so delete these entries. Its
+    // possible some completed compactions may need to be redone, but processing these entries would
+    // not be easy to test so its better for correctness to delete them and redo the work.
+    try (
+        var scanner =
+            context.createScanner(SystemTables.METADATA.tableName(), Authorizations.EMPTY);
+        var writer = context.createBatchWriter(SystemTables.METADATA.tableName())) {
+      var section = new Section(RESERVED_PREFIX + "ecomp", true, RESERVED_PREFIX + "ecomq", false);
+      scanner.setRange(section.getRange());
 
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-      throw new IllegalStateException(
-          "Could not read root metadata from ZooKeeper due to interrupt", ex);
-    } catch (KeeperException ex) {
-      throw new IllegalStateException(
-          "Could not read or write root metadata in ZooKeeper because of 
ZooKeeper exception", ex);
+      for (Map.Entry<Key,Value> entry : scanner) {
+        var key = entry.getKey();
+        var row = key.getRow();
+        Preconditions.checkState(row.toString().startsWith(section.getRowPrefix()));
+        Mutation m = new Mutation(row);
+        Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
+            "Expected empty visibility, saw %s ", 
key.getColumnVisibilityData());
+        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+        writer.addMutation(m);
+      }
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
     }
   }
 
-  private static final String ZTABLE_NAME = "/name";
-  private static final String ZTABLE_COMPACT_ID = "/compact-id";
-  private static final String ZTABLE_COMPACT_CANCEL_ID = "/compact-cancel-id";
-  private static final String ZTABLE_STATE = "/state";
-  private static final byte[] ZERO_BYTE = {'0'};
+  private void addHostingGoals(ServerContext context, TabletAvailability availability,
+      DataLevel level) {
+    try (
+        TabletsMetadata tm =
+            context.getAmple().readTablets().forLevel(level).fetch(ColumnType.PREV_ROW).build();
+        TabletsMutator mut = context.getAmple().mutateTablets()) {
+      tm.forEach(t -> mut.mutateTablet(t.getExtent()).putTabletAvailability(availability).mutate());
+    }
+  }
+
+  private void deleteExternalCompactions(ServerContext context) {
+    // External compactions were only written for user tablets in 3.x and earlier, so only need to
+    // process the metadata table. The metadata related to an external compaction has changed so
+    // delete any that exists. Not using Ample in case there are problems deserializing the old
+    // external compaction metadata.
+    try (
+        var scanner =
+            context.createScanner(SystemTables.METADATA.tableName(), Authorizations.EMPTY);
+        var writer = context.createBatchWriter(SystemTables.METADATA.tableName())) {
+      scanner.setRange(TabletsSection.getRange());
+      scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+
+      for (Map.Entry<Key,Value> entry : scanner) {
+        var key = entry.getKey();
+        Mutation m = new Mutation(key.getRow());
+        Preconditions.checkState(key.getColumnFamily().equals(ExternalCompactionColumnFamily.NAME),
+            "Expected family %s, saw %s ", ExternalCompactionColumnFamily.NAME,
+            key.getColumnFamily());
+        Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
+            "Expected empty visibility, saw %s ", 
key.getColumnVisibilityData());
+        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+        writer.addMutation(m);
+      }
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private void handlePartialSplits(ServerContext context, String table) {
+    try (var scanner = context.createScanner(table, Authorizations.EMPTY)) {
+      scanner.setRange(TabletsSection.getRange());
+      TabletsSection.Upgrade12to13.SPLIT_RATIO_COLUMN.fetch(scanner);
+
+      for (var entry : scanner) {
+        SplitRecovery12to13.fixSplit(context, entry.getKey().getRow());

Review Comment:
   Can rename this class to SplitRecovery11to12
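   
   For reference, assuming only the class name changes (the fixSplit call itself staying as-is), the call site above would look roughly like:
   
   ```java
   // hypothetical sketch after the suggested rename SplitRecovery12to13 -> SplitRecovery11to12
   for (var entry : scanner) {
     SplitRecovery11to12.fixSplit(context, entry.getKey().getRow());
   }
   ```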


