This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.8 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4cf2009307c1a7a27e026425339f3021049e2735 Author: Zhenyu Luo <[email protected]> AuthorDate: Wed Feb 4 09:35:57 2026 +0800 Load: Support same measurement name and type with different encoding and compression (#17152) * Support same measurement name and type with different encoding and compression This PR allows measurements with the same name and type to have different encoding and compression types during TsFile loading. Previously, duplicate measurements would throw an exception. Now, if measurements have the same name and type but different encoding/compression, they will be deduplicated by keeping only one schema entry. Changes: - Add updateDevice2TimeSeries method in LoadTsFileTreeSchemaCache to support updating the device-to-timeseries mapping - Modify makeSureNoDuplicatedMeasurementsInDevices in TreeSchemaAutoCreatorAndVerifier to allow same measurement with different encoding/compression by deduplicating instead of throwing an exception - Only throw exception when duplicate measurements have different data types * fix * fix * add IT * spotless --- .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 53 ++++++++++++++++++++++ .../load/TreeSchemaAutoCreatorAndVerifier.java | 32 +++++++++++-- 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java index 928c7875b19..19a4214bed2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java @@ -1058,6 +1058,59 @@ public class IoTDBLoadTsFileIT { } } + @Test + public void testLoadWithSameMeasurementNameDifferentDevice() throws Exception { + final String device = "root.sg.test_0.device_1"; + MeasurementSchema measurement = + new MeasurementSchema("temperature", TSDataType.DOUBLE, TSEncoding.GORILLA); + + final long writtenPoint1; + try (final TsFileGenerator generator = + new TsFileGenerator(new File(tmpDir, "same-measurement-1.tsfile"))) { + generator.registerTimeseries(device, Collections.singletonList(measurement)); + generator.generateData(device, 1000, PARTITION_INTERVAL, false); + writtenPoint1 = generator.getTotalNumber(); + } + + measurement = new MeasurementSchema("temperature", TSDataType.DOUBLE, TSEncoding.PLAIN); + final long writtenPoint2; + try (final TsFileGenerator generator = + new TsFileGenerator(new File(tmpDir, "same-measurement-2.tsfile"))) { + generator.registerTimeseries(device, Collections.singletonList(measurement)); + generator.generateData(device, 2000, PARTITION_INTERVAL / 10000, false); + writtenPoint2 = generator.getTotalNumber(); + } + + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + + statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath())); + + try (final ResultSet resultSet = statement.executeQuery("select count(**) from root.sg.**")) { + if (resultSet.next()) { + final long sg1Count = resultSet.getLong("count(root.sg.test_0.device_1.temperature)"); + Assert.assertEquals(writtenPoint1 + writtenPoint2, sg1Count); + } else { + Assert.fail("This ResultSet is empty."); + } + } + + try (final ResultSet resultSet = statement.executeQuery("show timeseries root.sg.**")) { + int count = 0; + Set<String> expectedPaths = new HashSet<>(); + expectedPaths.add(device + "." + measurement.getMeasurementName()); + while (resultSet.next()) { + String path = resultSet.getString(ColumnHeaderConstant.TIMESERIES); + Assert.assertTrue("Unexpected timeseries path: " + path, expectedPaths.contains(path)); + expectedPaths.remove(path); + count++; + } + Assert.assertEquals(1, count); + Assert.assertTrue("Not all expected timeseries found", expectedPaths.isEmpty()); + } + } + } + @Test @Ignore("Load with conversion is currently banned") public void testLoadWithConvertOnTypeMismatchForTreeModel() throws Exception { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java index c174bd7942b..29d4f1be07b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java @@ -252,17 +252,41 @@ public class TreeSchemaAutoCreatorAndVerifier { } private void makeSureNoDuplicatedMeasurementsInDevices() throws LoadAnalyzeException { + boolean hasDuplicates = false; + final Map<IDeviceID, Set<MeasurementSchema>> deduplicatedDevice2TimeSeries = new HashMap<>(); + for (final Map.Entry<IDeviceID, Set<MeasurementSchema>> entry : schemaCache.getDevice2TimeSeries().entrySet()) { final IDeviceID device = entry.getKey(); final Map<String, MeasurementSchema> measurement2Schema = new HashMap<>(); + boolean deviceHasDuplicates = false; + for (final MeasurementSchema timeseriesSchema : entry.getValue()) { final String measurement = timeseriesSchema.getMeasurementName(); - if (measurement2Schema.containsKey(measurement)) { - throw new LoadAnalyzeException( - String.format("Duplicated measurements %s in device %s.", measurement, device)); + final MeasurementSchema existingSchema = measurement2Schema.get(measurement); + + if (existingSchema != null) { + if (existingSchema.getType() != timeseriesSchema.getType()) { + throw new LoadAnalyzeException( + String.format("Duplicated measurements %s in device %s.", measurement, device)); + } + deviceHasDuplicates = true; + hasDuplicates = true; + } else { + measurement2Schema.put(measurement, timeseriesSchema); } - measurement2Schema.put(measurement, timeseriesSchema); + } + + if (deviceHasDuplicates) { + deduplicatedDevice2TimeSeries.put(device, new HashSet<>(measurement2Schema.values())); + } + } + + if (hasDuplicates) { + Map<IDeviceID, Set<MeasurementSchema>> device2TimeSeries = schemaCache.getDevice2TimeSeries(); + for (final Map.Entry<IDeviceID, Set<MeasurementSchema>> entry : + deduplicatedDevice2TimeSeries.entrySet()) { + device2TimeSeries.put(entry.getKey(), new HashSet<>(entry.getValue())); } } }
