This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 25ecfcc1c7f4d6c5d73c1043a212206188cb9d5a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jul 14 18:36:01 2025 +0800

    [To dev/1.3] Pipe: Add a tool for validating and repairing isGeneratedByPipe mark in tsfile resources (#15934)
---
 .../tools/tsfile/mark-is-generated-by-pipe.bat     |  59 +++++
 .../tools/tsfile/mark-is-generated-by-pipe.sh      |  51 ++++
 ...GeneratedByPipeMarkValidationAndRepairTool.java | 263 +++++++++++++++++++++
 3 files changed, 373 insertions(+)

diff --git a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat
new file mode 100644
index 00000000000..7d2b867bba9
--- /dev/null
+++ b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat
@@ -0,0 +1,59 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM     http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM
+
+@REM Launcher for TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.
+@REM All command line arguments are forwarded to the tool unchanged, e.g.
+@REM   mark-is-generated-by-pipe.bat --expected true --dirs <dir1> <dir2> ...
+
+@echo off
+echo ````````````````````````````````````````````````````````````````````````
+echo Starting Validating the isGeneratedByPipe Mark in TsFile Resources
+echo ````````````````````````````````````````````````````````````````````````
+
+if "%OS%" == "Windows_NT" setlocal
+
+@REM IOTDB_HOME defaults to the directory two levels above this script.
+@REM The path is quoted so that install locations containing spaces work.
+pushd "%~dp0..\.."
+if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
+popd
+
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
+if NOT DEFINED JAVA_HOME goto :err
+
+@REM -----------------------------------------------------------------------------
+@REM ***** CLASSPATH library setting *****
+@REM Ensure that any user defined CLASSPATH variables are not used on startup
+@REM The value is stored WITHOUT embedded quotes; it is quoted once where it is used.
+set "CLASSPATH=%IOTDB_HOME%\lib\*"
+
+goto okClasspath
+
+:append
+set CLASSPATH=%CLASSPATH%;%1
+goto :eof
+
+@REM -----------------------------------------------------------------------------
+:okClasspath
+
+"%JAVA_HOME%\bin\java" -cp "%CLASSPATH%" %MAIN_CLASS% %*
+@REM Remember the exit status of the tool so that callers can detect failures.
+set ret_code=%ERRORLEVEL%
+
+goto finally
+
+:err
+echo JAVA_HOME environment variable must be set!
+set ret_code=1
+pause
+
+@REM -----------------------------------------------------------------------------
+:finally
+
+@REM %ret_code% is expanded before ENDLOCAL discards the local environment.
+ENDLOCAL & exit /b %ret_code%
\ No newline at end of file
diff --git a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh
new file mode 100644
index 00000000000..daabae671f3
--- /dev/null
+++ b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Launcher for TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.
+# All command line arguments are forwarded to the tool, e.g.
+#   mark-is-generated-by-pipe.sh --expected true --dirs <dir1> <dir2> ...
+
+echo ------------------------------------------------------------------------------------
+echo Starting Validating the isGeneratedByPipe Mark in TsFile Resources
+echo ------------------------------------------------------------------------------------
+
+source "$(dirname "$0")/../../sbin/iotdb-common.sh"
+# get_iotdb_include and checkAllVariables are defined in iotdb-common.sh
+VARS=$(get_iotdb_include "$*")
+checkAllVariables
+export IOTDB_HOME="${IOTDB_HOME}/.."
+eval set -- "$VARS"
+
+# Prefer the JVM under JAVA_HOME; otherwise rely on "java" being on the PATH.
+if [ -n "$JAVA_HOME" ]; then
+    for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+        if [ -x "$java" ]; then
+            JAVA="$java"
+            break
+        fi
+    done
+else
+    JAVA=java
+fi
+
+# JAVA_HOME may be set but contain no executable java; fail with a clear message
+# instead of trying to execute an empty command.
+if [ -z "$JAVA" ]; then
+    echo "Unable to find java executable. Check JAVA_HOME and PATH environment variables." >&2
+    exit 1
+fi
+
+# Build the classpath from every jar under lib. IOTDB_HOME is quoted so that
+# install paths containing spaces are not split into several words.
+CLASSPATH=""
+for f in "${IOTDB_HOME}"/lib/*.jar; do
+  CLASSPATH="${CLASSPATH}:${f}"
+done
+
+MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
+
+"$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
+exit $?
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java
new file mode 100644
index 00000000000..e14f7e81d05
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.tools.validate; + +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.common.constant.TsFileConstant; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool { + + private static final Logger LOGGER = + org.slf4j.LoggerFactory.getLogger( + TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.class); + + private static final String USAGE = + "Usage: --expected true|false --dirs <dir1> <dir2> ...\n" + + " --expected: whether the TsFileResource is expected to be generated by pipe\n" + + " --dirs: list of data directories to validate and repair"; + + private static final Set<File> dataDirs = new ConcurrentSkipListSet<>(); + private static final AtomicBoolean expectedMark = new AtomicBoolean(true); + + private static final AtomicLong runtime = new AtomicLong(System.currentTimeMillis()); + + private static final AtomicInteger totalTsFileNum = new AtomicInteger(0); + private static final AtomicInteger toRepairTsFileNum = new AtomicInteger(0); + + // Usage: --expected true|false --dirs <dir1> <dir2> ... 
+ public static void main(String[] args) throws IOException { + parseCommandLineArgs(args); + final List<File> partitionDirs = findAllPartitionDirs(); + partitionDirs.parallelStream() + .forEach( + TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool + ::validateAndRepairTsFileResourcesInPartition); + printStatistics(); + } + + private static void parseCommandLineArgs(final String[] args) { + final Set<String> argSet = + new ConcurrentSkipListSet<>( + args.length > 0 ? Arrays.asList(args) : Collections.emptyList()); + if (args.length == 0 + || argSet.contains("--help") + || argSet.contains("-h") + || !(argSet.contains("--expected") && argSet.contains("--dirs"))) { + LOGGER.info(USAGE); + System.exit(1); + } + + for (int i = 0; i < args.length; i++) { + if ("--expected".equals(args[i]) && i + 1 < args.length) { + expectedMark.set(Boolean.parseBoolean(args[++i])); + } else if ("--dirs".equals(args[i]) && i + 1 < args.length) { + i++; + while (i < args.length && !args[i].startsWith("--")) { + dataDirs.add(new File(args[i++])); + } + i--; + } else { + LOGGER.info("Unknown argument: {}", args[i]); + LOGGER.info(USAGE); + // Exit if an unknown argument is encountered + System.exit(1); + } + } + + if (dataDirs.isEmpty()) { + LOGGER.info("No data directories provided. 
Please specify with --dirs <dir1> <dir2> ..."); + System.exit(1); + } + + LOGGER.info("------------------------------------------------------"); + LOGGER.info("Expected mark: {}", expectedMark.get()); + LOGGER.info("Data directories: "); + for (File dir : dataDirs) { + LOGGER.info(" {}", dir.getAbsolutePath()); + } + LOGGER.info("------------------------------------------------------"); + } + + private static List<File> findAllPartitionDirs() { + final List<File> partitionDirs = new ArrayList<>(); + for (final File dataDir : dataDirs) { + if (dataDir.exists() && dataDir.isDirectory()) { + partitionDirs.addAll(findLeafDirectories(dataDir)); + } + } + return partitionDirs; + } + + public static List<File> findLeafDirectories(File dir) { + List<File> leafDirectories = new ArrayList<>(); + + File[] files = dir.listFiles(); + + if (files == null || files.length == 0) { + leafDirectories.add(dir); + return leafDirectories; + } + + for (File file : files) { + if (file.isDirectory()) { + leafDirectories.addAll(findLeafDirectories(file)); + } + } + + if (leafDirectories.isEmpty()) { + leafDirectories.add(dir); + } + + return leafDirectories; + } + + private static void validateAndRepairTsFileResourcesInPartition(final File partitionDir) { + final AtomicInteger totalResources = new AtomicInteger(); + final AtomicInteger toRepairResources = new AtomicInteger(); + + try { + final List<TsFileResource> resources = + loadAllTsFileResources(Collections.singletonList(partitionDir)); + totalResources.addAndGet(resources.size()); + + for (final TsFileResource resource : resources) { + try { + if (validateAndRepairSingleTsFileResource(resource)) { + toRepairResources.incrementAndGet(); + } + } catch (final Exception e) { + // Continue processing other resources even if one fails + LOGGER.warn( + "Error validating or repairing resource {}: {}", + resource.getTsFile().getAbsolutePath(), + e.getMessage(), + e); + } + } + } catch (final Exception e) { + LOGGER.warn( + "Error loading 
resources from partition {}: {}", + partitionDir.getAbsolutePath(), + e.getMessage(), + e); + } + + totalTsFileNum.addAndGet(totalResources.get()); + toRepairTsFileNum.addAndGet(toRepairResources.get()); + LOGGER.info( + "TimePartition {} has {} total resources, {} to repair resources. Process completed.", + partitionDir, + totalResources.get(), + toRepairResources.get()); + } + + private static List<TsFileResource> loadAllTsFileResources(List<File> timePartitionDirs) + throws IOException { + final List<TsFileResource> resources = new ArrayList<>(); + + for (final File timePartitionDir : timePartitionDirs) { + for (final File tsfile : Objects.requireNonNull(timePartitionDir.listFiles())) { + final String filePath = tsfile.getAbsolutePath(); + if (!filePath.endsWith(TsFileConstant.TSFILE_SUFFIX) || !tsfile.isFile()) { + continue; + } + String resourcePath = tsfile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX; + + if (!new File(resourcePath).exists()) { + LOGGER.info( + "{} is skipped because resource file is not exist.", tsfile.getAbsolutePath()); + continue; + } + + TsFileResource resource = new TsFileResource(tsfile); + resource.deserialize(); + resource.close(); + resources.add(resource); + } + } + + return resources; + } + + /** + * Validates and repairs a single TsFileResource. 
+ * + * @param resource the TsFileResource to validate and repair + * @return true if the resource needs to be repaired and false if it is valid + */ + private static boolean validateAndRepairSingleTsFileResource(TsFileResource resource) { + if (resource.isGeneratedByPipe() == expectedMark.get()) { + // The resource is valid, no need to repair + return false; + } + + LOGGER.info( + "Repairing TsFileResource: {}, expected mark: {}, actual mark: {}", + resource.getTsFile().getAbsolutePath(), + expectedMark.get(), + resource.isGeneratedByPipe()); + + try { + repairSingleTsFileResource(resource); + + LOGGER.info( + "Marked TsFileResource as {} in resource: {}", + expectedMark.get(), + resource.getTsFile().getAbsolutePath()); + } catch (final Exception e) { + LOGGER.warn( + "ERROR: Failed to repair TsFileResource: {}, error: {}", + resource.getTsFile().getAbsolutePath(), + e.getMessage()); + } + + return true; + } + + private static void repairSingleTsFileResource(TsFileResource resource) throws IOException { + resource.setGeneratedByPipe(expectedMark.get()); + resource.serialize(); + } + + private static void printStatistics() { + LOGGER.info("------------------------------------------------------"); + LOGGER.info("Validation and repair completed. Statistics:"); + LOGGER.info( + "Total time taken: {} ms, total TsFile resources: {}, repaired TsFile resources: {}", + System.currentTimeMillis() - runtime.get(), + totalTsFileNum.get(), + toRepairTsFileNum.get()); + } +}
