This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7da6d051e0 [IOTDB-4519] Fix existing measurement check during auto
creating schema (#7428)
7da6d051e0 is described below
commit 7da6d051e0e67bdcb688665a3b9433615cc205ad
Author: Marcos_Zyk <[email protected]>
AuthorDate: Mon Sep 26 14:18:27 2022 +0800
[IOTDB-4519] Fix existing measurement check during auto creating schema
(#7428)
---
.../iotdb/db/metadata/path/MeasurementPath.java | 25 ++++++++++++
.../metadata/visitor/SchemaExecutionVisitor.java | 17 +-------
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 6 +--
.../db/metadata/path/MeasurementPathTest.java | 45 ++++++++++++++++++++++
4 files changed, 73 insertions(+), 20 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index 2525844e31..c6a9149886 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -31,9 +31,11 @@ import
org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
public class MeasurementPath extends PartialPath {
@@ -228,4 +230,27 @@ public class MeasurementPath extends PartialPath {
public PartialPath transformToPartialPath() {
return getDevicePath().concatNode(getTailNode());
}
+
+ /**
+ * In specific scenarios, like internal create timeseries, the message can
only be passed as
+ * String format.
+ */
+ public static String transformDataToString(MeasurementPath measurementPath) {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
+ try {
+ measurementPath.serialize(dataOutputStream);
+ } catch (IOException ignored) {
+ // this exception won't happen.
+ }
+ byte[] bytes = byteArrayOutputStream.toByteArray();
+ // must use single-byte char sets
+ return new String(bytes, StandardCharsets.ISO_8859_1);
+ }
+
+ public static MeasurementPath parseDataFromString(String
measurementPathData) {
+ return (MeasurementPath)
+ PathDeserializeUtil.deserialize(
+
ByteBuffer.wrap(measurementPathData.getBytes(StandardCharsets.ISO_8859_1)));
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index d44af00add..69f9a9d968 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -54,8 +54,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -182,7 +180,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
logger.info("There's no need to internal create timeseries. {}",
e.getMessage());
alreadyExistingTimeseries.add(
RpcUtils.getStatus(
- e.getErrorCode(),
transformExistingTimeseriesToString(e.getMeasurementPath())));
+ e.getErrorCode(),
MeasurementPath.transformDataToString(e.getMeasurementPath())));
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
@@ -190,17 +188,6 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
}
}
- private String transformExistingTimeseriesToString(MeasurementPath
measurementPath) {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
- try {
- measurementPath.serialize(dataOutputStream);
- } catch (IOException ignored) {
- // this exception won't happen.
- }
- return byteArrayOutputStream.toString();
- }
-
private void executeInternalCreateAlignedTimeseries(
PartialPath devicePath,
MeasurementGroup measurementGroup,
@@ -233,7 +220,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
MeasurementPath measurementPath = e.getMeasurementPath();
alreadyExistingTimeseries.add(
RpcUtils.getStatus(
- e.getErrorCode(),
transformExistingTimeseriesToString(measurementPath)));
+ e.getErrorCode(),
MeasurementPath.transformDataToString(e.getMeasurementPath())));
// remove the existing timeseries from plan
int index = measurementList.indexOf(measurementPath.getMeasurement());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index d80ef21db7..96d104e08c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.ITemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
@@ -58,7 +57,6 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -476,9 +474,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher
{
for (TSStatus subStatus : executionResult.status.subStatus) {
if (subStatus.code ==
TSStatusCode.MEASUREMENT_ALREADY_EXIST.getStatusCode()) {
alreadyExistingMeasurements.add(
- (MeasurementPath)
- PathDeserializeUtil.deserialize(
- ByteBuffer.wrap(subStatus.getMessage().getBytes())));
+ MeasurementPath.parseDataFromString(subStatus.getMessage()));
} else {
failedCreationList.add(subStatus.message);
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
b/server/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
new file mode 100644
index 0000000000..9ee6588fbb
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.path;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MeasurementPathTest {
+
+ @Test
+ public void testTransformDataToString() throws IllegalPathException {
+ MeasurementPath rawPath =
+ new MeasurementPath(
+ new PartialPath("root.sg.d.s"), new MeasurementSchema("s",
TSDataType.INT32), true);
+ rawPath.setMeasurementAlias("alias");
+ String string = MeasurementPath.transformDataToString(rawPath);
+ MeasurementPath newPath = MeasurementPath.parseDataFromString(string);
+ Assert.assertEquals(rawPath.getFullPath(), newPath.getFullPath());
+ Assert.assertEquals(rawPath.getMeasurementAlias(),
newPath.getMeasurementAlias());
+ Assert.assertEquals(rawPath.getMeasurementSchema(),
newPath.getMeasurementSchema());
+ Assert.assertEquals(rawPath.isUnderAlignedEntity(),
newPath.isUnderAlignedEntity());
+ }
+}