This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-resource-pipe-mark in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2d8aa300bff5740c0561f495172f8045cad2125a Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Jul 14 15:36:36 2025 +0800 Create TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java --- ...GeneratedByPipeMarkValidationAndRepairTool.java | 286 +++++++++++++++++++++ 1 file changed, 286 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java new file mode 100644 index 00000000000..50d82bb5a0d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.db.tools.validate;
+
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Command-line tool that checks the {@code isGeneratedByPipe} mark of every TsFileResource found
+ * under the given data directories and rewrites the resources whose mark differs from the expected
+ * value.
+ *
+ * <p>The directory layout walked by this tool is {@code
+ * <dataDir>/(sequence|unsequence)/<storageGroup>/<dataRegion>/<timePartition>/}.
+ */
+public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
+
+  private static final String USAGE =
+      "Usage: --expected true|false --dirs <dir1> <dir2> ...\n"
+          + " --expected: whether the TsFileResource is expected to be generated by pipe\n"
+          + " --dirs: list of data directories to validate and repair";
+
+  /** Data directories given with {@code --dirs}. */
+  private static final Set<File> dataDirs = new ConcurrentSkipListSet<>();
+
+  /** Mark value every resource should carry, given with {@code --expected}; defaults to true. */
+  private static final AtomicBoolean expectedMark = new AtomicBoolean(true);
+
+  /** Wall-clock time in ms at class initialization; used to report the elapsed time. */
+  private static final AtomicLong runtime = new AtomicLong(System.currentTimeMillis());
+
+  // Global counters, updated from the parallel per-partition workers.
+  private static final AtomicInteger totalTsFileNum = new AtomicInteger(0);
+  private static final AtomicInteger toRepairTsFileNum = new AtomicInteger(0);
+
+  /**
+   * Entry point. Usage: {@code --expected true|false --dirs <dir1> <dir2> ...}
+   *
+   * <p>Parses the arguments, groups the time partition directories found under all data
+   * directories, processes the groups with a parallel stream and finally prints the statistics.
+   *
+   * @param args command-line arguments
+   * @throws IOException declared for callers; the per-partition work reports its own errors
+   */
+  // TODO: support validating and repairing specific time partition directories
+  public static void main(String[] args) throws IOException {
+    parseCommandLineArgs(args);
+    final Map<String, List<File>> partitionDirs = findAllPartitionDirs();
+    partitionDirs.entrySet().parallelStream()
+        .forEach(
+            TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
+                ::validateAndRepairTsFileResourcesInPartition);
+    printStatistics();
+  }
+
+  /**
+   * Parses {@code --expected} and {@code --dirs}, then echoes and validates the configuration.
+   *
+   * <p>Terminates the JVM with exit code 1 on an unknown argument, on a missing or non-boolean
+   * {@code --expected} value, when no directory is given, or when a given directory does not exist
+   * or is not a directory.
+   *
+   * @param args command-line arguments
+   */
+  private static void parseCommandLineArgs(final String[] args) {
+    int i = 0;
+    while (i < args.length) {
+      final String arg = args[i];
+      if ("--expected".equals(arg)) {
+        final String value = i + 1 < args.length ? args[i + 1] : null;
+        // Boolean.parseBoolean maps every non-"true" string to false, so a typo would silently
+        // flip the mark that gets written to disk. Accept only the two documented values.
+        if (!"true".equalsIgnoreCase(value) && !"false".equalsIgnoreCase(value)) {
+          exitWithUsage(
+              "Missing or invalid value for --expected: "
+                  + (value == null ? "<missing>" : value));
+        }
+        expectedMark.set(Boolean.parseBoolean(value));
+        i += 2;
+      } else if ("--dirs".equals(arg)) {
+        i++;
+        // Every following argument up to the next option is a data directory.
+        while (i < args.length && !args[i].startsWith("--")) {
+          dataDirs.add(new File(args[i]));
+          i++;
+        }
+      } else {
+        exitWithUsage("Unknown argument: " + arg);
+      }
+    }
+
+    if (dataDirs.isEmpty()) {
+      System.out.println(
+          "No data directories provided. Please specify with --dirs <dir1> <dir2> ...");
+      System.exit(1);
+    }
+
+    System.out.println("Expected mark: " + expectedMark.get());
+    System.out.println("Data directories: ");
+    for (final File dir : dataDirs) {
+      System.out.println(" " + dir.getAbsolutePath());
+      if (!dir.exists() || !dir.isDirectory()) {
+        System.out.println("Invalid directory: " + dir.getAbsolutePath());
+        System.exit(1);
+      }
+    }
+  }
+
+  /** Prints {@code message} followed by the usage text and terminates the JVM with code 1. */
+  private static void exitWithUsage(final String message) {
+    System.out.println(message);
+    System.out.println(USAGE);
+    System.exit(1);
+  }
+
+  /**
+   * Walks every data directory and groups the time partition directories found in it.
+   *
+   * @return map from the key built by {@link #calculateTimePartitionKey} to all directories of
+   *     that time partition (the same key can occur under several data directories and under both
+   *     {@code sequence} and {@code unsequence})
+   */
+  private static Map<String, List<File>> findAllPartitionDirs() {
+    final Map<String, List<File>> partitionMap = new HashMap<>();
+
+    for (final File dataDir : dataDirs) {
+      if (!dataDir.exists() || !dataDir.isDirectory()) {
+        System.out.println(dataDir.getAbsolutePath() + " is not a valid directory");
+        continue;
+      }
+
+      for (final File seqOrUnseqDataDir : listFilesOrEmpty(dataDir)) {
+        final String dirName = seqOrUnseqDataDir.getName();
+        final boolean isSeqOrUnseq = "sequence".equals(dirName) || "unsequence".equals(dirName);
+        if (!seqOrUnseqDataDir.isDirectory() || !isSeqOrUnseq) {
+          System.out.println(
+              seqOrUnseqDataDir.getAbsolutePath() + " is not a sequence or unsequence directory");
+          continue;
+        }
+
+        for (final File sg : listSubDirectories(seqOrUnseqDataDir)) {
+          for (final File dataRegionDir : listSubDirectories(sg)) {
+            for (final File timePartitionDir : listSubDirectories(dataRegionDir)) {
+              final String partitionKey =
+                  calculateTimePartitionKey(
+                      sg.getName(), dataRegionDir.getName(), timePartitionDir.getName());
+              partitionMap
+                  .computeIfAbsent(partitionKey, k -> new ArrayList<>())
+                  .add(timePartitionDir);
+            }
+          }
+        }
+      }
+    }
+
+    return partitionMap;
+  }
+
+  /**
+   * Lists the children of {@code dir}. {@link File#listFiles()} returns {@code null} when the
+   * directory cannot be read; in that case a warning is printed and an empty array is returned so
+   * that one unreadable directory does not abort the whole run.
+   */
+  private static File[] listFilesOrEmpty(final File dir) {
+    final File[] children = dir.listFiles();
+    if (children == null) {
+      System.out.println("WARNING: failed to list files in " + dir.getAbsolutePath());
+      return new File[0];
+    }
+    return children;
+  }
+
+  /** Returns the sub-directories of {@code parent}, reporting every child that is not one. */
+  private static List<File> listSubDirectories(final File parent) {
+    final List<File> subDirs = new ArrayList<>();
+    for (final File child : listFilesOrEmpty(parent)) {
+      if (child.isDirectory()) {
+        subDirs.add(child);
+      } else {
+        System.out.println(child.getAbsolutePath() + " is not a valid directory");
+      }
+    }
+    return subDirs;
+  }
+
+  /** Builds the grouping key {@code <storageGroup>-<dataRegion>-<timePartition>}. */
+  private static String calculateTimePartitionKey(
+      final String storageGroup, final String dataRegion, final String timePartition) {
+    return storageGroup + "-" + dataRegion + "-" + timePartition;
+  }
+
+  /**
+   * Validates (and, if needed, repairs) every resource in all directories of one time partition,
+   * adds the result to the global counters and prints a per-partition summary.
+   *
+   * <p>A failure in one directory is reported on stderr and does not stop the other directories.
+   *
+   * @param partitionEntry partition key mapped to the directories of that partition
+   */
+  private static void validateAndRepairTsFileResourcesInPartition(
+      final Map.Entry<String, List<File>> partitionEntry) {
+    final String partitionName = partitionEntry.getKey();
+
+    // Plain ints are enough here: each partition entry is handled by a single worker.
+    int totalResources = 0;
+    int toRepairResources = 0;
+
+    for (final File partitionDir : partitionEntry.getValue()) {
+      try {
+        final List<TsFileResource> resources =
+            loadAllTsFileResources(Collections.singletonList(partitionDir));
+        totalResources += resources.size();
+
+        for (final TsFileResource resource : resources) {
+          if (validateAndRepairSingleTsFileResource(resource)) {
+            toRepairResources++;
+          }
+        }
+      } catch (final Exception e) {
+        // Print the exception itself: getMessage() may be null and would hide the error type.
+        System.err.printf(
+            "Error loading resources from partition %s: %s%n", partitionDir.getAbsolutePath(), e);
+      }
+    }
+
+    totalTsFileNum.addAndGet(totalResources);
+    toRepairTsFileNum.addAndGet(toRepairResources);
+    System.out.printf(
+        "TimePartition %s has %d total resources, %d to repair resources. Process completed.%n",
+        partitionName, totalResources, toRepairResources);
+  }
+
+  /**
+   * Deserializes the resource of every TsFile in the given time partition directories.
+   *
+   * <p>TsFiles without a resource file are skipped. If any directory contains an inner or cross
+   * compaction log, an empty list is returned for the whole call.
+   *
+   * @param timePartitionDirs time partition directories to scan
+   * @return the loaded resources, or an empty list when a compaction log was found
+   * @throws IOException if a resource cannot be deserialized
+   * @throws NullPointerException if a directory cannot be listed
+   */
+  private static List<TsFileResource> loadAllTsFileResources(List<File> timePartitionDirs)
+      throws IOException {
+    final List<TsFileResource> resources = new ArrayList<>();
+
+    for (final File timePartitionDir : timePartitionDirs) {
+      final File[] files =
+          Objects.requireNonNull(
+              timePartitionDir.listFiles(),
+              "Failed to list files in " + timePartitionDir.getAbsolutePath());
+
+      // Look for compaction logs before loading anything, so no resource is deserialized
+      // only to be thrown away.
+      if (containsCompactionLog(files)) {
+        System.out.println(
+            "Time partition "
+                + timePartitionDir.getName()
+                + " is skipped because a compaction is not finished");
+        return Collections.emptyList();
+      }
+
+      for (final File tsfile : files) {
+        final String filePath = tsfile.getAbsolutePath();
+        if (!filePath.endsWith(TsFileConstant.TSFILE_SUFFIX) || !tsfile.isFile()) {
+          continue;
+        }
+
+        if (!new File(filePath + TsFileResource.RESOURCE_SUFFIX).exists()) {
+          System.out.println(filePath + " is skipped because resource file does not exist.");
+          continue;
+        }
+
+        final TsFileResource resource = new TsFileResource(tsfile);
+        resource.deserialize();
+        resource.close();
+        resources.add(resource);
+      }
+    }
+
+    return resources;
+  }
+
+  /** Returns true if any of {@code files} is an inner or cross compaction log. */
+  private static boolean containsCompactionLog(final File[] files) {
+    for (final File file : files) {
+      final String filePath = file.getAbsolutePath();
+      if (filePath.endsWith(CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX)
+          || filePath.endsWith(CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Validates and repairs a single TsFileResource.
+   *
+   * @param resource the TsFileResource to validate and repair
+   * @return true if the mark of the resource differed from the expected one (also when the repair
+   *     attempt failed) and false if it is valid
+   */
+  private static boolean validateAndRepairSingleTsFileResource(TsFileResource resource) {
+    final boolean expected = expectedMark.get();
+    if (resource.isGeneratedByPipe() == expected) {
+      // The resource is valid, no need to repair
+      return false;
+    }
+
+    final String tsFilePath = resource.getTsFile().getAbsolutePath();
+    System.out.println(
+        "Repairing TsFileResource: "
+            + tsFilePath
+            + ", expected mark: "
+            + expected
+            + ", actual mark: "
+            + resource.isGeneratedByPipe());
+
+    try {
+      repairSingleTsFileResource(resource);
+
+      System.out.println("Marked TsFileResource as " + expected + " in resource: " + tsFilePath);
+    } catch (final Exception e) {
+      System.out.println(
+          "ERROR: Failed to repair TsFileResource: " + tsFilePath + ", error: " + e.getMessage());
+    }
+
+    return true;
+  }
+
+  /**
+   * Sets the expected mark on {@code resource} and serializes it.
+   *
+   * @throws IOException if serializing the resource fails
+   */
+  private static void repairSingleTsFileResource(TsFileResource resource) throws IOException {
+    resource.setGeneratedByPipe(expectedMark.get());
+    resource.serialize();
+  }
+
+  /** Prints the elapsed time and the global resource counters. */
+  private static void printStatistics() {
+    System.out.println("\n------------------------------------------------------");
+    System.out.println("Validation and repair completed. Statistics:");
+    System.out.println(
+        "Total time taken: "
+            + (System.currentTimeMillis() - runtime.get())
+            + " ms, total TsFile resources: "
+            + totalTsFileNum.get()
+            + ", to repair TsFile resources: "
+            + toRepairTsFileNum.get());
+  }
+}
