This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this push: new 0fc81c642e7 [Table Model] Support Table Device Management in SchemaRegion (#12694) 0fc81c642e7 is described below commit 0fc81c642e7fb97c9c24596e2e0e87df5cde6886 Author: Marcos_Zyk <38524330+marcos...@users.noreply.github.com> AuthorDate: Mon Jun 10 22:38:59 2024 +0800 [Table Model] Support Table Device Management in SchemaRegion (#12694) --- .../schemaengine/schemaregion/ISchemaRegion.java | 20 ++ .../attribute/DeviceAttributeStore.java | 177 +++++++++++ .../IDeviceAttributeStore.java} | 31 +- .../schemaregion/impl/SchemaRegionMemoryImpl.java | 63 +++- .../schemaregion/impl/SchemaRegionPBTreeImpl.java | 27 ++ .../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 215 +++++++++++++- .../mtree/impl/mem/mnode/basic/BasicMNode.java | 3 +- .../container/KeyNullableConcurrentHashMap.java | 175 +++++++++++ .../mem/mnode/container/MemMNodeContainer.java | 3 +- .../mtree/impl/mem/mnode/info/BasicMNodeInfo.java | 2 +- .../mtree/impl/mem/mnode/info/TableDeviceInfo.java | 119 ++++++++ .../impl/mem/snapshot/MemMTreeSnapshotUtil.java | 36 ++- .../read/req/impl/ShowTableDevicesPlan.java | 60 ++++ .../read/resp/info/IDeviceSchemaInfo.java | 4 + .../read/resp/info/impl/ShowDevicesResult.java | 31 ++ .../utils/filter/DeviceFilterVisitor.java | 19 ++ .../schemaRegion/SchemaRegionTableDeviceTest.java | 328 +++++++++++++++++++++ .../schemaRegion/SchemaRegionTestUtil.java | 67 +++++ .../org/apache/iotdb/commons/path/PartialPath.java | 5 +- .../apache/iotdb/commons/path/PathPatternUtil.java | 3 +- .../iotdb/commons/path/fa/nfa/SimpleNFA.java | 5 +- .../apache/iotdb/commons/schema/MemUsageUtil.java | 61 ++++ .../iotdb/commons/schema/SchemaConstant.java | 5 + .../iotdb/commons/schema/filter/SchemaFilter.java | 9 + .../commons/schema/filter/SchemaFilterType.java | 10 + .../commons/schema/filter/SchemaFilterVisitor.java | 15 + .../schema/filter/impl/DeviceAttributeFilter.java | 76 +++++ 
.../schema/filter/impl/DeviceFilterUtil.java | 68 +++++ .../commons/schema/filter/impl/DeviceIdFilter.java | 79 +++++ .../iotdb/commons/schema/filter/impl/OrFilter.java | 74 +++++ .../commons/schema/tree/AbstractTreeVisitor.java | 4 +- 31 files changed, 1757 insertions(+), 37 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java index 95e3524d321..18d94528bc2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.schemaengine.rescon.ISchemaRegionStatistics; import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowDevicesPlan; import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowNodesPlan; import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowTimeSeriesPlan; +import org.apache.iotdb.db.schemaengine.schemaregion.read.req.impl.ShowTableDevicesPlan; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.INodeSchemaInfo; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ITimeSeriesSchemaInfo; @@ -297,6 +298,18 @@ public interface ISchemaRegion { // endregion + // region table device management + + void createTableDevice( + List<PartialPath> devicePathList, + List<String> attributeNameList, + List<List<String>> attributeValueList) + throws MetadataException; + + void deleteTableDevice(String table) throws MetadataException; + + // endregion + // region Interfaces for SchemaReader ISchemaReader<IDeviceSchemaInfo> getDeviceReader(IShowDevicesPlan showDevicesPlan) @@ -308,5 +321,12 @@ public interface ISchemaRegion { ISchemaReader<INodeSchemaInfo> 
getNodeReader(IShowNodesPlan showNodesPlan) throws MetadataException; + ISchemaReader<IDeviceSchemaInfo> getTableDeviceReader(ShowTableDevicesPlan showTableDevicesPlan) + throws MetadataException; + + ISchemaReader<IDeviceSchemaInfo> getTableDeviceReader(List<PartialPath> devicePathList) + throws MetadataException; + // endregion + // endregion } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java new file mode 100644 index 00000000000..65ea0cb50a7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.db.schemaengine.schemaregion.attribute;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.schema.MemUsageUtil;
+import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * In-memory {@link IDeviceAttributeStore}. Every device owns one attribute map; the index of that
+ * map inside {@link #deviceAttributeList} is the "pointer" returned by {@link #createAttribute}
+ * and expected by {@link #alterAttribute} and {@link #getAttribute}.
+ *
+ * <p>The memory taken by attribute entries is reported to the {@link MemSchemaRegionStatistics}
+ * passed to the constructor, when one is given.
+ */
+public class DeviceAttributeStore implements IDeviceAttributeStore {
+
+  private static final Logger logger = LoggerFactory.getLogger(DeviceAttributeStore.class);
+
+  // One attribute map per device; the list index is the attribute pointer of that device.
+  public List<Map<String, String>> deviceAttributeList = new ArrayList<>();
+
+  // May be null, in which case memory usage is not reported (see requestMemory).
+  private final MemSchemaRegionStatistics regionStatistics;
+
+  public DeviceAttributeStore(MemSchemaRegionStatistics regionStatistics) {
+    this.regionStatistics = regionStatistics;
+  }
+
+  /** Drops all attribute maps by replacing the backing list with an empty one. */
+  @Override
+  public void clear() {
+    // NOTE(review): memory previously requested from regionStatistics is not handed back here.
+    deviceAttributeList = new ArrayList<>();
+  }
+
+  /**
+   * Writes all attribute maps into the device attribute snapshot file under {@code targetDir}.
+   *
+   * <p>The data is first written and synced to a temporary file, which is then renamed to the
+   * final snapshot name after any previous snapshot file has been deleted. The temporary file is
+   * always removed before returning.
+   *
+   * @param targetDir directory in which the snapshot file is created
+   * @return {@code true} if the snapshot file was written and renamed, {@code false} otherwise
+   */
+  @Override
+  public synchronized boolean createSnapshot(File targetDir) {
+    File snapshotTmp =
+        SystemFileFactory.INSTANCE.getFile(targetDir, SchemaConstant.DEVICE_ATTRIBUTE_SNAPSHOT_TMP);
+    File snapshot =
+        SystemFileFactory.INSTANCE.getFile(targetDir, SchemaConstant.DEVICE_ATTRIBUTE_SNAPSHOT);
+
+    try {
+      // try-with-resources closes both streams even when flush() or sync() fails
+      try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotTmp);
+          BufferedOutputStream outputStream = new BufferedOutputStream(fileOutputStream)) {
+        serialize(outputStream);
+        // push buffered bytes down before forcing them to disk
+        outputStream.flush();
+        fileOutputStream.getFD().sync();
+      }
+      if (snapshot.exists() && !FileUtils.deleteFileIfExist(snapshot)) {
+        logger.error(
+            "Failed to delete old snapshot {} while creating device attribute snapshot.",
+            snapshot.getName());
+        return false;
+      }
+      if (!snapshotTmp.renameTo(snapshot)) {
+        logger.error(
+            "Failed to rename {} to {} while creating device attribute snapshot.",
+            snapshotTmp.getName(),
+            snapshot.getName());
+        FileUtils.deleteFileIfExist(snapshot);
+        return false;
+      }
+
+      return true;
+    } catch (IOException e) {
+      logger.error("Failed to create device attribute snapshot due to {}", e.getMessage(), e);
+      FileUtils.deleteFileIfExist(snapshot);
+      return false;
+    } finally {
+      FileUtils.deleteFileIfExist(snapshotTmp);
+    }
+  }
+
+  /**
+   * Reads the device attribute snapshot file located in {@code snapshotDir} and appends the
+   * attribute maps it contains to this store.
+   *
+   * @param snapshotDir directory containing the snapshot file
+   * @param sgSchemaDirPath not used by this implementation
+   * @throws IOException if the snapshot file cannot be opened or read
+   */
+  @Override
+  public void loadFromSnapshot(File snapshotDir, String sgSchemaDirPath) throws IOException {
+    try (BufferedInputStream inputStream =
+        new BufferedInputStream(
+            Files.newInputStream(
+                SystemFileFactory.INSTANCE
+                    .getFile(snapshotDir, SchemaConstant.DEVICE_ATTRIBUTE_SNAPSHOT)
+                    .toPath()))) {
+      deserialize(inputStream);
+    } catch (IOException e) {
+      logger.warn("Load device attribute snapshot from {} failed", snapshotDir);
+      throw e;
+    }
+  }
+
+  /**
+   * Stores a new attribute map built from the given names and values.
+   *
+   * @param nameList attribute names
+   * @param valueList attribute values, read at the same indexes as {@code nameList}
+   * @return the pointer (list index) of the newly stored attribute map
+   */
+  @Override
+  public synchronized int createAttribute(List<String> nameList, List<String> valueList) {
+    long memUsage = 0L;
+    Map<String, String> attributeMap = new HashMap<>();
+    for (int i = 0; i < nameList.size(); i++) {
+      attributeMap.put(nameList.get(i), valueList.get(i));
+      memUsage += MemUsageUtil.computeKVMemUsageInMap(nameList.get(i), valueList.get(i));
+    }
+    deviceAttributeList.add(attributeMap);
+    requestMemory(memUsage);
+    return deviceAttributeList.size() - 1;
+  }
+
+  /**
+   * Adds or overwrites attributes in the map identified by {@code pointer} and reports the
+   * resulting change of memory usage.
+   *
+   * @param pointer value previously returned by {@link #createAttribute}
+   * @param nameList attribute names to set
+   * @param valueList new attribute values, read at the same indexes as {@code nameList}
+   */
+  @Override
+  public synchronized void alterAttribute(
+      int pointer, List<String> nameList, List<String> valueList) {
+    // synchronized: this mutates maps that createSnapshot serializes under the same monitor
+    long memUsageDelta = 0L;
+    Map<String, String> attributeMap = deviceAttributeList.get(pointer);
+    for (int i = 0; i < nameList.size(); i++) {
+      String key = nameList.get(i);
+      String value = valueList.get(i);
+      // Only an entry that already exists has been accounted for; a new key starts from zero.
+      long originMemUsage =
+          attributeMap.containsKey(key)
+              ? MemUsageUtil.computeKVMemUsageInMap(key, attributeMap.get(key))
+              : 0L;
+
+      attributeMap.put(key, value);
+
+      memUsageDelta += MemUsageUtil.computeKVMemUsageInMap(key, value) - originMemUsage;
+    }
+    requestMemory(memUsageDelta);
+  }
+
+  /**
+   * Looks up one attribute value.
+   *
+   * @param pointer value previously returned by {@link #createAttribute}
+   * @param name attribute name
+   * @return the value stored under {@code name}, or {@code null} if the map has no such entry
+   */
+  @Override
+  public String getAttribute(int pointer, String name) {
+    return deviceAttributeList.get(pointer).get(name);
+  }
+
+  // Layout: number of maps, followed by every map in pointer order.
+  private void serialize(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(deviceAttributeList.size(), outputStream);
+    for (Map<String, String> attributeMap : deviceAttributeList) {
+      ReadWriteIOUtils.write(attributeMap, outputStream);
+    }
+  }
+
+  // Reads the layout written by serialize and appends the maps to deviceAttributeList.
+  // NOTE(review): the loaded entries are not reported to regionStatistics - confirm intended.
+  private void deserialize(InputStream inputStream) throws IOException {
+    int size = ReadWriteIOUtils.readInt(inputStream);
+    for (int i = 0; i < size; i++) {
+      deviceAttributeList.add(ReadWriteIOUtils.readMap(inputStream));
+    }
+  }
+
+  // Forwards the given amount to regionStatistics; does nothing when no statistics were supplied.
+  private void requestMemory(long size) {
+    if (regionStatistics != null) {
+      regionStatistics.requestMemory(size);
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/info/BasicMNodeInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/IDeviceAttributeStore.java similarity index 60% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/info/BasicMNodeInfo.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/IDeviceAttributeStore.java index 36f92bfda56..79f44d20bd2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/info/BasicMNodeInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/IDeviceAttributeStore.java @@ -17,26 +17,23 @@ * under the License.
*/ -package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info; +package org.apache.iotdb.db.schemaengine.schemaregion.attribute; -public class BasicMNodeInfo { - private String name; +import java.io.File; +import java.io.IOException; +import java.util.List; - public BasicMNodeInfo(String name) { - this.name = name; - } +public interface IDeviceAttributeStore { - public String getName() { - return name; - } + void clear(); - public void setName(String name) { - this.name = name; - } + boolean createSnapshot(File targetDir); - public int estimateSize() { - // object header, 8B - // name reference, name length and name hash code, 8 + 4 + 4 = 16B - return 8 + 16 + 2 * name.length(); - } + void loadFromSnapshot(File snapshotDir, String sgSchemaDirPath) throws IOException; + + int createAttribute(List<String> nameList, List<String> valueList); + + void alterAttribute(int pointer, List<String> nameList, List<String> valueList); + + String getAttribute(int pointer, String name); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index d5b7f58cfd4..7775bc315a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -49,6 +49,8 @@ import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegionPlan; import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegion; import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionPlanVisitor; import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionUtils; +import org.apache.iotdb.db.schemaengine.schemaregion.attribute.DeviceAttributeStore; +import 
org.apache.iotdb.db.schemaengine.schemaregion.attribute.IDeviceAttributeStore; import org.apache.iotdb.db.schemaengine.schemaregion.logfile.FakeCRC32Deserializer; import org.apache.iotdb.db.schemaengine.schemaregion.logfile.FakeCRC32Serializer; import org.apache.iotdb.db.schemaengine.schemaregion.logfile.SchemaLogReader; @@ -60,6 +62,7 @@ import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.IMemMN import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowDevicesPlan; import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowNodesPlan; import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowTimeSeriesPlan; +import org.apache.iotdb.db.schemaengine.schemaregion.read.req.impl.ShowTableDevicesPlan; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.INodeSchemaInfo; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ITimeSeriesSchemaInfo; @@ -159,6 +162,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { private MTreeBelowSGMemoryImpl mtree; private TagManager tagManager; + private IDeviceAttributeStore deviceAttributeStore; // region Interfaces and Implementation of initialization、snapshot、recover and clear public SchemaRegionMemoryImpl(ISchemaRegionParams schemaRegionParams) throws MetadataException { @@ -209,6 +213,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { // do not write log when recover isRecovering = true; + deviceAttributeStore = new DeviceAttributeStore(regionStatistics); tagManager = new TagManager(schemaRegionDirPath, regionStatistics); mtree = new MTreeBelowSGMemoryImpl( @@ -449,6 +454,13 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { schemaRegionId, System.currentTimeMillis() - tagSnapshotStartTime); + long deviceAttributeSnapshotStartTime = System.currentTimeMillis(); + isSuccess = isSuccess && 
deviceAttributeStore.createSnapshot(snapshotDir); + logger.info( + "Device attribute snapshot creation of schemaRegion {} costs {}ms", + schemaRegionId, + System.currentTimeMillis() - deviceAttributeSnapshotStartTime); + logger.info( "Snapshot creation of schemaRegion {} costs {}ms.", schemaRegionId, @@ -471,6 +483,14 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { isRecovering = true; + long deviceAttributeSnapshotStartTime = System.currentTimeMillis(); + deviceAttributeStore = new DeviceAttributeStore(regionStatistics); + deviceAttributeStore.loadFromSnapshot(latestSnapshotRootDir, schemaRegionDirPath); + logger.info( + "Device attribute snapshot loading of schemaRegion {} costs {}ms.", + schemaRegionId, + System.currentTimeMillis() - deviceAttributeSnapshotStartTime); + long tagSnapshotStartTime = System.currentTimeMillis(); tagManager = TagManager.loadFromSnapshot(latestSnapshotRootDir, schemaRegionDirPath, regionStatistics); @@ -1253,10 +1273,35 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { return result; } + @Override + public void createTableDevice( + List<PartialPath> devicePathList, + List<String> attributeNameList, + List<List<String>> attributeValueList) + throws MetadataException { + for (int i = 0; i < devicePathList.size(); i++) { + int finalI = i; + mtree.createTableDevice( + devicePathList.get(i), + () -> + deviceAttributeStore.createAttribute( + attributeNameList, attributeValueList.get(finalI)), + pointer -> + deviceAttributeStore.alterAttribute( + pointer, attributeNameList, attributeValueList.get(finalI))); + } + } + + @Override + public void deleteTableDevice(String table) throws MetadataException { + mtree.deleteTableDevice(table); + } + @Override public ISchemaReader<IDeviceSchemaInfo> getDeviceReader(IShowDevicesPlan showDevicesPlan) throws MetadataException { - return mtree.getDeviceReader(showDevicesPlan); + return mtree.getDeviceReader( + showDevicesPlan, (pointer, name) -> 
deviceAttributeStore.getAttribute(pointer, name)); } @Override @@ -1286,6 +1331,22 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { return mtree.getNodeReader(showNodesPlan); } + @Override + public ISchemaReader<IDeviceSchemaInfo> getTableDeviceReader( + ShowTableDevicesPlan showTableDevicesPlan) throws MetadataException { + return mtree.getTableDeviceReader( + showTableDevicesPlan.getDevicePattern(), + showTableDevicesPlan.getAttributeFilter(), + (pointer, name) -> deviceAttributeStore.getAttribute(pointer, name)); + } + + @Override + public ISchemaReader<IDeviceSchemaInfo> getTableDeviceReader(List<PartialPath> devicePathList) + throws MetadataException { + return mtree.getTableDeviceReader( + devicePathList, (pointer, name) -> deviceAttributeStore.getAttribute(pointer, name)); + } + // endregion private static class RecoverOperationResult { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java index ad3ee7a223b..6ae52183a90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java @@ -63,6 +63,7 @@ import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICa import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowDevicesPlan; import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowNodesPlan; import org.apache.iotdb.db.schemaengine.schemaregion.read.req.IShowTimeSeriesPlan; +import org.apache.iotdb.db.schemaengine.schemaregion.read.req.impl.ShowTableDevicesPlan; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.INodeSchemaInfo; import 
org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ITimeSeriesSchemaInfo; @@ -1358,6 +1359,20 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion { return result; } + @Override + public void createTableDevice( + List<PartialPath> devicePathList, + List<String> attributeNameList, + List<List<String>> attributeValueList) + throws MetadataException { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteTableDevice(String table) throws MetadataException { + throw new UnsupportedOperationException(); + } + @Override public ISchemaReader<IDeviceSchemaInfo> getDeviceReader(IShowDevicesPlan showDevicesPlan) throws MetadataException { @@ -1391,6 +1406,18 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion { return mtree.getNodeReader(showNodesPlan); } + @Override + public ISchemaReader<IDeviceSchemaInfo> getTableDeviceReader( + ShowTableDevicesPlan showTableDevicesPlan) throws MetadataException { + throw new UnsupportedOperationException(); + } + + @Override + public ISchemaReader<IDeviceSchemaInfo> getTableDeviceReader(List<PartialPath> devicePathList) + throws MetadataException { + throw new UnsupportedOperationException(); + } + // endregion private static class RecoverOperationResult { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java index e76c3931d6b..66e58b18563 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; 
import org.apache.iotdb.commons.schema.SchemaConstant; +import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.schema.node.role.IDeviceMNode; import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode; import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory; @@ -46,6 +47,7 @@ import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; import org.apache.iotdb.db.schemaengine.metric.SchemaRegionMemMetric; import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.IMemMNode; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.TableDeviceInfo; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.traverser.collector.EntityCollector; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.traverser.collector.MNodeCollector; @@ -85,13 +87,17 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.IntConsumer; +import java.util.function.IntSupplier; /** * The hierarchical struct of the Metadata Tree is implemented in this class. 
@@ -948,7 +954,8 @@ public class MTreeBelowSGMemoryImpl { // region Interfaces for schema reader @SuppressWarnings("java:S2095") - public ISchemaReader<IDeviceSchemaInfo> getDeviceReader(IShowDevicesPlan showDevicesPlan) + public ISchemaReader<IDeviceSchemaInfo> getDeviceReader( + IShowDevicesPlan showDevicesPlan, BiFunction<Integer, String, String> attributeProvider) throws MetadataException { EntityCollector<IDeviceSchemaInfo, IMemMNode> collector = new EntityCollector<IDeviceSchemaInfo, IMemMNode>( @@ -960,8 +967,15 @@ public class MTreeBelowSGMemoryImpl { protected IDeviceSchemaInfo collectEntity(IDeviceMNode<IMemMNode> node) { PartialPath device = getPartialPathFromRootToNode(node.getAsMNode()); - return new ShowDevicesResult( - device.getFullPath(), node.isAlignedNullable(), node.getSchemaTemplateId()); + ShowDevicesResult result = + new ShowDevicesResult( + device.getFullPath(), node.isAlignedNullable(), node.getSchemaTemplateId()); + result.setAttributeProvider( + k -> + attributeProvider.apply( + ((TableDeviceInfo<IMemMNode>) node.getDeviceInfo()).getAttributePointer(), + k)); + return result; } }; if (showDevicesPlan.usingSchemaTemplate()) { @@ -992,7 +1006,8 @@ public class MTreeBelowSGMemoryImpl { public boolean hasNext() { while (next == null && collector.hasNext()) { IDeviceSchemaInfo temp = collector.next(); - if (filterVisitor.process(showDevicesPlan.getSchemaFilter(), temp)) { + if (showDevicesPlan.getSchemaFilter() == null + || filterVisitor.process(showDevicesPlan.getSchemaFilter(), temp)) { next = temp; } } @@ -1016,6 +1031,153 @@ public class MTreeBelowSGMemoryImpl { } } + // used for device query/fetch with filters during show device or table query + public ISchemaReader<IDeviceSchemaInfo> getTableDeviceReader( + PartialPath pattern, + SchemaFilter attributeFilter, + BiFunction<Integer, String, String> attributeProvider) + throws MetadataException { + EntityCollector<IDeviceSchemaInfo, IMemMNode> collector = + new 
EntityCollector<IDeviceSchemaInfo, IMemMNode>(rootNode, pattern, store, false, null) { + + protected IDeviceSchemaInfo collectEntity(IDeviceMNode<IMemMNode> node) { + PartialPath device = getPartialPathFromRootToNode(node.getAsMNode()); + ShowDevicesResult result = + new ShowDevicesResult( + device.getFullPath(), + node.isAlignedNullable(), + node.getSchemaTemplateId(), + node.getPartialPath().getNodes()); + result.setAttributeProvider( + k -> + attributeProvider.apply( + ((TableDeviceInfo<IMemMNode>) node.getDeviceInfo()).getAttributePointer(), + k)); + return result; + } + }; + return new ISchemaReader<IDeviceSchemaInfo>() { + + private final DeviceFilterVisitor filterVisitor = new DeviceFilterVisitor(); + private IDeviceSchemaInfo next; + + public boolean isSuccess() { + return collector.isSuccess(); + } + + public Throwable getFailure() { + return collector.getFailure(); + } + + public void close() { + collector.close(); + } + + public ListenableFuture<?> isBlocked() { + return NOT_BLOCKED; + } + + public boolean hasNext() { + while (next == null && collector.hasNext()) { + IDeviceSchemaInfo temp = collector.next(); + if (attributeFilter == null || filterVisitor.process(attributeFilter, temp)) { + next = temp; + } + } + return next != null; + } + + public IDeviceSchemaInfo next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + IDeviceSchemaInfo result = next; + next = null; + return result; + } + }; + } + + // used for device fetch with explicit device id/path during table insertion + public ISchemaReader<IDeviceSchemaInfo> getTableDeviceReader( + List<PartialPath> devicePathList, BiFunction<Integer, String, String> attributeProvider) { + return new ISchemaReader<IDeviceSchemaInfo>() { + + Iterator<PartialPath> devicePathIterator = devicePathList.listIterator(); + + IDeviceSchemaInfo next = null; + + Throwable t = null; + + @Override + public boolean isSuccess() { + return t == null; + } + + @Override + public Throwable getFailure() { + return 
t; + } + + @Override + public ListenableFuture<?> isBlocked() { + return NOT_BLOCKED; + } + + @Override + public boolean hasNext() { + if (next == null) { + tryGetNext(); + } + return next != null; + } + + @Override + public IDeviceSchemaInfo next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + IDeviceSchemaInfo result = next; + next = null; + return result; + } + + private void tryGetNext() { + while (devicePathIterator.hasNext()) { + try { + IMemMNode node = getNodeByPath(devicePathIterator.next()); + if (!node.isDevice()) { + continue; + } + IDeviceMNode<IMemMNode> deviceNode = node.getAsDeviceMNode(); + ShowDevicesResult result = + new ShowDevicesResult( + deviceNode.getFullPath(), + deviceNode.isAlignedNullable(), + deviceNode.getSchemaTemplateId(), + deviceNode.getPartialPath().getNodes()); + result.setAttributeProvider( + k -> + attributeProvider.apply( + ((TableDeviceInfo<IMemMNode>) deviceNode.getDeviceInfo()) + .getAttributePointer(), + k)); + next = result; + break; + } catch (PathNotExistException e) { + continue; + } catch (Throwable e) { + t = e; + return; + } + } + } + + @Override + public void close() throws Exception {} + }; + } + public ISchemaReader<ITimeSeriesSchemaInfo> getTimeSeriesReader( IShowTimeSeriesPlan showTimeSeriesPlan, Function<Long, Pair<Map<String, String>, Map<String, String>>> tagAndAttributeProvider) @@ -1252,5 +1414,50 @@ public class MTreeBelowSGMemoryImpl { } return result; } + + // endregion + + // region table device management + + public void createTableDevice( + PartialPath devicePath, IntSupplier attributePointerGetter, IntConsumer attributeUppdater) + throws MetadataException { + String[] nodeNames = devicePath.getNodes(); + IMemMNode cur = storageGroupMNode; + IMemMNode child; + for (int i = levelOfSG + 1; i < nodeNames.length; i++) { + child = cur.getChild(nodeNames[i]); + if (child == null) { + child = + store.addChild(cur, nodeNames[i], nodeFactory.createInternalMNode(cur, nodeNames[i])); + } + 
cur = child; + } + + IDeviceMNode<IMemMNode> entityMNode; + + synchronized (this) { + if (cur.isDevice()) { + entityMNode = cur.getAsDeviceMNode(); + if (!(entityMNode.getDeviceInfo() instanceof TableDeviceInfo)) { + throw new MetadataException("Table device shall not create under tree model"); + } + TableDeviceInfo<IMemMNode> deviceInfo = + (TableDeviceInfo<IMemMNode>) entityMNode.getDeviceInfo(); + attributeUppdater.accept(deviceInfo.getAttributePointer()); + } else { + entityMNode = store.setToEntity(cur); + TableDeviceInfo<IMemMNode> deviceInfo = new TableDeviceInfo<>(); + deviceInfo.setAttributePointer(attributePointerGetter.getAsInt()); + entityMNode.getAsInternalMNode().setDeviceInfo(deviceInfo); + regionStatistics.addDevice(); + } + } + } + + public void deleteTableDevice(String tableName) { + storageGroupMNode.deleteChild(tableName); + } + // endregion } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/basic/BasicMNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/basic/BasicMNode.java index ccacef6c7eb..ea1ff409ecb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/basic/BasicMNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/basic/BasicMNode.java @@ -84,7 +84,8 @@ public class BasicMNode implements IMemMNode { } String concatFullPath() { - StringBuilder builder = new StringBuilder(getName()); + StringBuilder builder = new StringBuilder(); + builder.insert(0, getName()); IMemMNode curr = this; while (curr.getParent() != null) { curr = curr.getParent(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/container/KeyNullableConcurrentHashMap.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/container/KeyNullableConcurrentHashMap.java new file mode 100644 index 00000000000..d5bfaa4717a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/container/KeyNullableConcurrentHashMap.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.container; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +// The value in this map shall not be null. +// Therefore, when using compute method, use v==null to judge if there's existing value. 
/**
 * A {@link Map} backed by a {@link ConcurrentHashMap} that additionally accepts a {@code null}
 * key.
 *
 * <p>Every key is wrapped in an {@link Optional} before it reaches the backing map, so a {@code
 * null} key is stored as {@link Optional#empty()}. Values must never be {@code null}; the backing
 * {@link ConcurrentHashMap} rejects them. Consequently, inside {@code compute}-style callbacks a
 * {@code null} value argument means "no mapping present".
 *
 * <p>{@link #keySet()}, {@link #values()} and {@link #entrySet()} are live views of this map, as
 * the {@link Map} contract requires: removing through a view (or through its iterator) removes
 * the mapping, and {@code Entry.setValue} writes through. Iteration is only as consistent as the
 * backing {@link ConcurrentHashMap} iterators are.
 *
 * <p>{@link #equals(Object)}, {@link #hashCode()} and {@link #toString()} follow the general
 * {@link Map} contract, so instances compare equal to any other map with the same mappings.
 */
public class KeyNullableConcurrentHashMap<K, V> implements Map<K, V> {

  private final Map<Optional<K>, V> map = new ConcurrentHashMap<>();

  @Override
  public int size() {
    return map.size();
  }

  @Override
  public boolean isEmpty() {
    return map.isEmpty();
  }

  @Override
  public boolean containsKey(Object key) {
    return map.containsKey(Optional.ofNullable(key));
  }

  @Override
  public boolean containsValue(Object value) {
    return map.containsValue(value);
  }

  @Override
  public V get(Object key) {
    return map.get(Optional.ofNullable(key));
  }

  @Override
  public V put(K key, V value) {
    return map.put(Optional.ofNullable(key), value);
  }

  @Override
  public V remove(Object key) {
    return map.remove(Optional.ofNullable(key));
  }

  @Override
  public void putAll(Map<? extends K, ? extends V> m) {
    m.forEach((k, v) -> map.put(Optional.ofNullable(k), v));
  }

  @Override
  public void clear() {
    map.clear();
  }

  /** Returns a live view of the keys; a {@code null} element stands for the null key. */
  @Override
  public Set<K> keySet() {
    return new KeyView();
  }

  @Override
  public Collection<V> values() {
    return map.values();
  }

  /** Returns a live view of the mappings; entry keys are unwrapped and may be {@code null}. */
  @Override
  public Set<Entry<K, V>> entrySet() {
    return new EntryView();
  }

  @Override
  public V getOrDefault(Object key, V defaultValue) {
    return map.getOrDefault(Optional.ofNullable(key), defaultValue);
  }

  @Override
  public void forEach(BiConsumer<? super K, ? super V> action) {
    map.forEach((k, v) -> action.accept(k.orElse(null), v));
  }

  @Override
  public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
    map.replaceAll((k, v) -> function.apply(k.orElse(null), v));
  }

  @Override
  public V putIfAbsent(K key, V value) {
    return map.putIfAbsent(Optional.ofNullable(key), value);
  }

  @Override
  public boolean remove(Object key, Object value) {
    return map.remove(Optional.ofNullable(key), value);
  }

  @Override
  public boolean replace(K key, V oldValue, V newValue) {
    return map.replace(Optional.ofNullable(key), oldValue, newValue);
  }

  @Override
  public V replace(K key, V value) {
    return map.replace(Optional.ofNullable(key), value);
  }

  @Override
  public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
    return map.computeIfAbsent(
        Optional.ofNullable(key), k -> mappingFunction.apply(k.orElse(null)));
  }

  @Override
  public V computeIfPresent(
      K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
    return map.computeIfPresent(
        Optional.ofNullable(key), (k, v) -> remappingFunction.apply(k.orElse(null), v));
  }

  @Override
  public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
    return map.compute(
        Optional.ofNullable(key), (k, v) -> remappingFunction.apply(k.orElse(null), v));
  }

  @Override
  public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
    return map.merge(Optional.ofNullable(key), value, remappingFunction);
  }

  /** Two maps are equal when they hold the same mappings, whatever their implementation. */
  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (!(o instanceof Map)) {
      return false;
    }
    Map<?, ?> other = (Map<?, ?>) o;
    if (other.size() != map.size()) {
      return false;
    }
    try {
      for (Map.Entry<Optional<K>, V> entry : map.entrySet()) {
        // Values are never null here, so a null lookup result means "missing in other".
        if (!entry.getValue().equals(other.get(entry.getKey().orElse(null)))) {
          return false;
        }
      }
    } catch (ClassCastException | NullPointerException incompatible) {
      // The other map cannot hold such a key (wrong type or null-hostile): not equal.
      return false;
    }
    return true;
  }

  /** Sum of the entry hash codes, as specified by {@link Map#hashCode()}. */
  @Override
  public int hashCode() {
    int hash = 0;
    for (Map.Entry<Optional<K>, V> entry : map.entrySet()) {
      hash +=
          java.util.Objects.hashCode(entry.getKey().orElse(null))
              ^ java.util.Objects.hashCode(entry.getValue());
    }
    return hash;
  }

  @Override
  public String toString() {
    StringBuilder builder = new StringBuilder("{");
    boolean first = true;
    for (Map.Entry<Optional<K>, V> entry : map.entrySet()) {
      if (!first) {
        builder.append(", ");
      }
      first = false;
      Object key = entry.getKey().orElse(null);
      Object value = entry.getValue();
      // Guard against infinite recursion when the map contains itself.
      builder.append(key == this ? "(this Map)" : key);
      builder.append('=');
      builder.append(value == this ? "(this Map)" : value);
    }
    return builder.append('}').toString();
  }

  /** Live key view that unwraps the internal {@link Optional} keys. */
  private final class KeyView extends java.util.AbstractSet<K> {

    @Override
    public java.util.Iterator<K> iterator() {
      final java.util.Iterator<Optional<K>> backing = map.keySet().iterator();
      return new java.util.Iterator<K>() {
        @Override
        public boolean hasNext() {
          return backing.hasNext();
        }

        @Override
        public K next() {
          return backing.next().orElse(null);
        }

        @Override
        public void remove() {
          backing.remove();
        }
      };
    }

    @Override
    public int size() {
      return map.size();
    }

    @Override
    public boolean contains(Object o) {
      return map.containsKey(Optional.ofNullable(o));
    }

    @Override
    public boolean remove(Object o) {
      return map.remove(Optional.ofNullable(o)) != null;
    }

    @Override
    public void clear() {
      map.clear();
    }
  }

  /** Live entry view whose entries expose unwrapped (possibly {@code null}) keys. */
  private final class EntryView extends java.util.AbstractSet<Map.Entry<K, V>> {

    @Override
    public java.util.Iterator<Map.Entry<K, V>> iterator() {
      final java.util.Iterator<Map.Entry<Optional<K>, V>> backing = map.entrySet().iterator();
      return new java.util.Iterator<Map.Entry<K, V>>() {
        @Override
        public boolean hasNext() {
          return backing.hasNext();
        }

        @Override
        public Map.Entry<K, V> next() {
          return new NullableKeyEntry<K, V>(backing.next());
        }

        @Override
        public void remove() {
          backing.remove();
        }
      };
    }

    @Override
    public int size() {
      return map.size();
    }

    @Override
    public boolean contains(Object o) {
      if (!(o instanceof Map.Entry)) {
        return false;
      }
      Map.Entry<?, ?> candidate = (Map.Entry<?, ?>) o;
      V current = map.get(Optional.ofNullable(candidate.getKey()));
      return current != null && current.equals(candidate.getValue());
    }

    @Override
    public boolean remove(Object o) {
      if (!(o instanceof Map.Entry)) {
        return false;
      }
      Map.Entry<?, ?> candidate = (Map.Entry<?, ?>) o;
      // A null value can never be mapped, so there is nothing to remove for it.
      return candidate.getValue() != null
          && map.remove(Optional.ofNullable(candidate.getKey()), candidate.getValue());
    }

    @Override
    public void clear() {
      map.clear();
    }
  }

  /** Entry adapter that unwraps the key and follows the {@link Map.Entry} equality contract. */
  private static final class NullableKeyEntry<A, B> implements Map.Entry<A, B> {

    private final Map.Entry<Optional<A>, B> delegate;

    NullableKeyEntry(Map.Entry<Optional<A>, B> delegate) {
      this.delegate = delegate;
    }

    @Override
    public A getKey() {
      return delegate.getKey().orElse(null);
    }

    @Override
    public B getValue() {
      return delegate.getValue();
    }

    @Override
    public B setValue(B value) {
      return delegate.setValue(value);
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Map.Entry)) {
        return false;
      }
      Map.Entry<?, ?> other = (Map.Entry<?, ?>) o;
      return java.util.Objects.equals(getKey(), other.getKey())
          && java.util.Objects.equals(getValue(), other.getValue());
    }

    @Override
    public int hashCode() {
      return java.util.Objects.hashCode(getKey()) ^ java.util.Objects.hashCode(getValue());
    }

    @Override
    public String toString() {
      return getKey() + "=" + getValue();
    }
  }
}
== null ? 0 : name.length()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/info/TableDeviceInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/info/TableDeviceInfo.java new file mode 100644 index 00000000000..857cf20a40d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/mnode/info/TableDeviceInfo.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info; + +import org.apache.iotdb.commons.schema.node.IMNode; +import org.apache.iotdb.commons.schema.node.info.IDeviceInfo; +import org.apache.iotdb.commons.schema.node.role.IDeviceMNode; +import org.apache.iotdb.commons.schema.node.role.IMeasurementMNode; + +import java.util.Map; + +import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE; + +public class TableDeviceInfo<N extends IMNode<N>> implements IDeviceInfo<N> { + + private int attributePointer = -1; + + public int getAttributePointer() { + return attributePointer; + } + + public void setAttributePointer(int attributePointer) { + this.attributePointer = attributePointer; + } + + @Override + public void moveDataToNewMNode(IDeviceMNode<N> newMNode) {} + + @Override + public boolean addAlias(String alias, IMeasurementMNode<N> child) { + return false; + } + + @Override + public void deleteAliasChild(String alias) {} + + @Override + public Map<String, IMeasurementMNode<N>> getAliasChildren() { + return null; + } + + @Override + public void setAliasChildren(Map<String, IMeasurementMNode<N>> aliasChildren) {} + + @Override + public boolean hasAliasChild(String name) { + return false; + } + + @Override + public N getAliasChild(String name) { + return null; + } + + @Override + public boolean isUseTemplate() { + return false; + } + + @Override + public void setUseTemplate(boolean useTemplate) {} + + @Override + public void setSchemaTemplateId(int schemaTemplateId) {} + + @Override + public int getSchemaTemplateId() { + return NON_TEMPLATE; + } + + @Override + public int getSchemaTemplateIdWithState() { + return 0; + } + + @Override + public boolean isPreDeactivateTemplate() { + return false; + } + + @Override + public void preDeactivateTemplate() {} + + @Override + public void rollbackPreDeactivateTemplate() {} + + @Override + public void deactivateTemplate() {} + + @Override + public Boolean isAligned() { + return 
true; + } + + @Override + public void setAligned(Boolean isAligned) {} + + @Override + public int estimateSize() { + return 12; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/snapshot/MemMTreeSnapshotUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/snapshot/MemMTreeSnapshotUtil.java index 5b8dd82125f..360e4f2fde6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/snapshot/MemMTreeSnapshotUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/snapshot/MemMTreeSnapshotUtil.java @@ -34,6 +34,7 @@ import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.MemMTreeStore; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.IMemMNode; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.mem.mnode.info.TableDeviceInfo; import org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -59,6 +60,7 @@ import static org.apache.iotdb.commons.schema.SchemaConstant.LOGICAL_VIEW_MNODE_ import static org.apache.iotdb.commons.schema.SchemaConstant.MEASUREMENT_MNODE_TYPE; import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_ENTITY_MNODE_TYPE; import static org.apache.iotdb.commons.schema.SchemaConstant.STORAGE_GROUP_MNODE_TYPE; +import static org.apache.iotdb.commons.schema.SchemaConstant.TABLE_MNODE_TYPE; import static org.apache.iotdb.commons.schema.SchemaConstant.isStorageGroupType; public class MemMTreeSnapshotUtil { @@ -250,6 +252,11 @@ public class MemMTreeSnapshotUtil { node = deserializer.deserializeLogicalViewMNode(inputStream); measurementProcess.accept(node.getAsMeasurementMNode()); 
break; + case TABLE_MNODE_TYPE: + childrenNum = ReadWriteIOUtils.readInt(inputStream); + node = deserializer.deserializeTableDeviceMNode(inputStream); + deviceProcess.accept(node.getAsDeviceMNode()); + break; default: throw new IOException("Unrecognized MNode type " + type); } @@ -280,12 +287,20 @@ public class MemMTreeSnapshotUtil { public Boolean visitBasicMNode(IMNode<?> node, OutputStream outputStream) { try { if (node.isDevice()) { - ReadWriteIOUtils.write(ENTITY_MNODE_TYPE, outputStream); - serializeBasicMNode(node, outputStream); - IDeviceMNode<?> deviceMNode = node.getAsDeviceMNode(); - ReadWriteIOUtils.write(deviceMNode.getSchemaTemplateIdWithState(), outputStream); - ReadWriteIOUtils.write(deviceMNode.isUseTemplate(), outputStream); - ReadWriteIOUtils.write(deviceMNode.isAlignedNullable(), outputStream); + if (node.getAsDeviceMNode().getDeviceInfo() instanceof TableDeviceInfo) { + ReadWriteIOUtils.write(TABLE_MNODE_TYPE, outputStream); + TableDeviceInfo<IMemMNode> tableDeviceInfo = + (TableDeviceInfo<IMemMNode>) (node.getAsDeviceMNode().getDeviceInfo()); + serializeBasicMNode(node, outputStream); + ReadWriteIOUtils.write(tableDeviceInfo.getAttributePointer(), outputStream); + } else { + ReadWriteIOUtils.write(ENTITY_MNODE_TYPE, outputStream); + serializeBasicMNode(node, outputStream); + IDeviceMNode<?> deviceMNode = node.getAsDeviceMNode(); + ReadWriteIOUtils.write(deviceMNode.getSchemaTemplateIdWithState(), outputStream); + ReadWriteIOUtils.write(deviceMNode.isUseTemplate(), outputStream); + ReadWriteIOUtils.write(deviceMNode.isAlignedNullable(), outputStream); + } } else { ReadWriteIOUtils.write(INTERNAL_MNODE_TYPE, outputStream); serializeBasicMNode(node, outputStream); @@ -394,6 +409,15 @@ public class MemMTreeSnapshotUtil { return node.getAsMNode(); } + public IMemMNode deserializeTableDeviceMNode(InputStream inputStream) throws IOException { + String name = ReadWriteIOUtils.readString(inputStream); + IDeviceMNode<IMemMNode> node = 
nodeFactory.createDeviceMNode(null, name); + TableDeviceInfo<IMemMNode> tableDeviceInfo = new TableDeviceInfo<>(); + tableDeviceInfo.setAttributePointer(ReadWriteIOUtils.readInt(inputStream)); + node.getAsInternalMNode().setDeviceInfo(tableDeviceInfo); + return node.getAsMNode(); + } + public IMemMNode deserializeMeasurementMNode(InputStream inputStream) throws IOException { String name = ReadWriteIOUtils.readString(inputStream); MeasurementSchema schema = MeasurementSchema.deserializeFrom(inputStream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/read/req/impl/ShowTableDevicesPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/read/req/impl/ShowTableDevicesPlan.java new file mode 100644 index 00000000000..b328f28c2c2 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/read/req/impl/ShowTableDevicesPlan.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.iotdb.db.schemaengine.schemaregion.read.req.impl; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.filter.SchemaFilter; +import org.apache.iotdb.commons.schema.filter.impl.AndFilter; + +import java.util.List; + +public class ShowTableDevicesPlan { + + private PartialPath devicePattern; + + private SchemaFilter attributeFilter; + + public ShowTableDevicesPlan(PartialPath devicePattern, SchemaFilter attributeFilter) { + this.devicePattern = devicePattern; + this.attributeFilter = attributeFilter; + } + + private SchemaFilter getAttributeFilter(List<SchemaFilter> filterList) { + if (filterList.isEmpty()) { + return null; + } + AndFilter andFilter; + SchemaFilter latestFilter = filterList.get(0); + for (int i = 1; i < filterList.size(); i++) { + andFilter = new AndFilter(latestFilter, filterList.get(i)); + latestFilter = andFilter; + } + return latestFilter; + } + + public PartialPath getDevicePattern() { + return devicePattern; + } + + public SchemaFilter getAttributeFilter() { + return attributeFilter; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/read/resp/info/IDeviceSchemaInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/read/resp/info/IDeviceSchemaInfo.java index 9a670f9b44b..a144e10c9c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/read/resp/info/IDeviceSchemaInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/read/resp/info/IDeviceSchemaInfo.java @@ -24,4 +24,8 @@ public interface IDeviceSchemaInfo extends ISchemaInfo { Boolean isAligned(); int getTemplateId(); + + String getAttributeValue(String attributeKey); + + String[] getRawNodes(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/read/resp/info/impl/ShowDevicesResult.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/read/resp/info/impl/ShowDevicesResult.java index 19aa1971008..b71dab7121b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/read/resp/info/impl/ShowDevicesResult.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/read/resp/info/impl/ShowDevicesResult.java @@ -18,20 +18,33 @@ */ package org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.impl; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; import java.util.Objects; +import java.util.function.Function; public class ShowDevicesResult extends ShowSchemaResult implements IDeviceSchemaInfo { private Boolean isAligned; private int templateId; + private Function<String, String> attributeProvider; + + private String[] rawNodes = null; + public ShowDevicesResult(String name, Boolean isAligned, int templateId) { super(name); this.isAligned = isAligned; this.templateId = templateId; } + public ShowDevicesResult(String name, Boolean isAligned, int templateId, String[] rawNodes) { + super(name); + this.isAligned = isAligned; + this.templateId = templateId; + this.rawNodes = rawNodes; + } + public Boolean isAligned() { return isAligned; } @@ -40,6 +53,24 @@ public class ShowDevicesResult extends ShowSchemaResult implements IDeviceSchema return templateId; } + public void setAttributeProvider(Function<String, String> attributeProvider) { + this.attributeProvider = attributeProvider; + } + + @Override + public String getAttributeValue(String attributeKey) { + return attributeProvider.apply(attributeKey); + } + + public String[] getRawNodes() { + return rawNodes; + } + + @Override + public PartialPath getPartialPath() { + return rawNodes == null ? 
super.getPartialPath() : new PartialPath(rawNodes); + } + @Override public String toString() { return "ShowDevicesResult{" diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java index e1723193abf..cf1ab0a1288 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/filter/DeviceFilterVisitor.java @@ -21,11 +21,15 @@ package org.apache.iotdb.db.schemaengine.schemaregion.utils.filter; import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.schema.filter.SchemaFilterVisitor; +import org.apache.iotdb.commons.schema.filter.impl.DeviceAttributeFilter; +import org.apache.iotdb.commons.schema.filter.impl.DeviceIdFilter; import org.apache.iotdb.commons.schema.filter.impl.PathContainsFilter; import org.apache.iotdb.commons.schema.filter.impl.TemplateFilter; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager; +import java.util.Objects; + public class DeviceFilterVisitor extends SchemaFilterVisitor<IDeviceSchemaInfo> { @Override public boolean visitNode(SchemaFilter filter, IDeviceSchemaInfo info) { @@ -59,4 +63,19 @@ public class DeviceFilterVisitor extends SchemaFilterVisitor<IDeviceSchemaInfo> return false; } } + + @Override + public boolean visitDeviceIdFilter(DeviceIdFilter filter, IDeviceSchemaInfo info) { + String[] nodes = info.getPartialPath().getNodes(); + if (nodes.length < filter.getIndex() + 3) { + return false; + } else { + return Objects.equals(nodes[filter.getIndex() + 3], filter.getValue()); + } + } + + @Override + public boolean 
visitDeviceAttributeFilter(DeviceAttributeFilter filter, IDeviceSchemaInfo info) { + return Objects.equals(filter.getValue(), info.getAttributeValue(filter.getKey())); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTableDeviceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTableDeviceTest.java new file mode 100644 index 00000000000..7b7b2fc2db5 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTableDeviceTest.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.metadata.schemaRegion; + +import org.apache.iotdb.commons.schema.filter.impl.DeviceAttributeFilter; +import org.apache.iotdb.commons.schema.filter.impl.DeviceIdFilter; +import org.apache.iotdb.commons.schema.filter.impl.OrFilter; +import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion; +import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class SchemaRegionTableDeviceTest extends AbstractSchemaRegionTest { + + public SchemaRegionTableDeviceTest(SchemaRegionTestParams testParams) { + super(testParams); + } + + @Test + public void testDeviceCreation() throws Exception { + if (!testParams.getTestModeName().equals("MemoryMode")) { + return; + } + ISchemaRegion schemaRegion = getSchemaRegion("root.db", 0); + String tableName = "t"; + List<String[]> deviceIdList = + Arrays.asList( + new String[] {"hebei", "p_1", "d_0"}, + new String[] {"hebei", "p_1", "d_1"}, + new String[] {"shandong", "p_1", "d_1"}); + + for (String[] deviceId : deviceIdList) { + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, deviceId, Collections.emptyMap()); + } + List<IDeviceSchemaInfo> deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice(schemaRegion, tableName, deviceIdList); + Assert.assertEquals(3, deviceSchemaInfoList.size()); + List<String[]> result = + deviceSchemaInfoList.stream() + .map(IDeviceSchemaInfo::getRawNodes) + .collect(Collectors.toList()); + Assert.assertEquals( + deviceIdList.stream().map(Arrays::toString).sorted().collect(Collectors.toList()), + result.stream() + .map(o -> Arrays.copyOfRange(o, 3, o.length)) + .map(Arrays::toString) + .sorted() + .collect(Collectors.toList())); + + Map<String, String> attributeMap = new HashMap<>(); + 
attributeMap.put("type", "new"); + attributeMap.put("cycle", "monthly"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"hebei", "p_1", "d_0"}, attributeMap); + attributeMap.put("type", "old"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"hebei", "p_1", "d_1"}, attributeMap); + attributeMap.put("cycle", "daily"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"shandong", "p_1", "d_1"}, attributeMap); + + deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice( + schemaRegion, + tableName, + Collections.singletonList(new String[] {"hebei", "p_1", "d_0"})); + Assert.assertEquals("new", deviceSchemaInfoList.get(0).getAttributeValue("type")); + Assert.assertEquals("monthly", deviceSchemaInfoList.get(0).getAttributeValue("cycle")); + + deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice( + schemaRegion, + tableName, + Collections.singletonList(new String[] {"hebei", "p_1", "d_1"})); + Assert.assertEquals("old", deviceSchemaInfoList.get(0).getAttributeValue("type")); + Assert.assertEquals("monthly", deviceSchemaInfoList.get(0).getAttributeValue("cycle")); + + deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice( + schemaRegion, + tableName, + Collections.singletonList(new String[] {"shandong", "p_1", "d_1"})); + Assert.assertEquals("old", deviceSchemaInfoList.get(0).getAttributeValue("type")); + Assert.assertEquals("daily", deviceSchemaInfoList.get(0).getAttributeValue("cycle")); + } + + @Test + public void testDeviceQuery() throws Exception { + if (!testParams.getTestModeName().equals("MemoryMode")) { + return; + } + ISchemaRegion schemaRegion = getSchemaRegion("root.db", 0); + String tableName = "t"; + + Map<String, String> attributeMap = new HashMap<>(); + attributeMap.put("type", "new"); + attributeMap.put("cycle", "monthly"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"hebei", "p_1", "d_0"}, 
attributeMap); + attributeMap.put("type", "old"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"hebei", "p_1", "d_1"}, attributeMap); + attributeMap.put("cycle", "daily"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"shandong", "p_1", "d_1"}, attributeMap); + + List<IDeviceSchemaInfo> deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice( + schemaRegion, + tableName, + 3, + Arrays.asList(new DeviceIdFilter(0, "hebei"), new DeviceIdFilter(1, "p_1")), + null); + Assert.assertEquals(2, deviceSchemaInfoList.size()); + + deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice( + schemaRegion, + tableName, + 3, + Collections.singletonList(new DeviceIdFilter(1, "p_1")), + null); + Assert.assertEquals(3, deviceSchemaInfoList.size()); + + deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice( + schemaRegion, + tableName, + 3, + Collections.singletonList(new DeviceIdFilter(1, "p_1")), + new DeviceAttributeFilter("cycle", "daily")); + Assert.assertEquals(1, deviceSchemaInfoList.size()); + + deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice( + schemaRegion, + tableName, + 3, + Collections.emptyList(), + new OrFilter( + new DeviceIdFilter(1, "p_1"), new DeviceAttributeFilter("cycle", "daily"))); + Assert.assertEquals(3, deviceSchemaInfoList.size()); + } + + @Test + public void testDeviceIdWithNull() throws Exception { + if (!testParams.getTestModeName().equals("MemoryMode")) { + return; + } + ISchemaRegion schemaRegion = getSchemaRegion("root.db", 0); + String tableName = "t"; + + Map<String, String> attributeMap = new HashMap<>(); + attributeMap.put("type", "new"); + attributeMap.put("cycle", null); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"hebei", null, "d_0"}, attributeMap); + attributeMap.put("type", "old"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"hebei", "p_1", "d_1"}, attributeMap); 
+ attributeMap.put("cycle", "daily"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"shandong", "p_1", null}, attributeMap); + + List<String[]> deviceIdList = + Arrays.asList( + new String[] {"hebei", null, "d_0"}, + new String[] {"hebei", "p_1", "d_1"}, + new String[] {"shandong", "p_1", null}); + List<IDeviceSchemaInfo> deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice(schemaRegion, tableName, deviceIdList); + Assert.assertEquals(3, deviceSchemaInfoList.size()); + Assert.assertEquals( + deviceIdList.stream().map(Arrays::toString).sorted().collect(Collectors.toList()), + deviceSchemaInfoList.stream() + .map(o -> Arrays.copyOfRange(o.getRawNodes(), 3, o.getRawNodes().length)) + .map(Arrays::toString) + .sorted() + .collect(Collectors.toList())); + + deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice( + schemaRegion, + tableName, + 3, + Collections.singletonList(new DeviceIdFilter(1, null)), + null); + Assert.assertEquals(1, deviceSchemaInfoList.size()); + + deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice( + schemaRegion, + tableName, + 3, + Collections.emptyList(), + new OrFilter(new DeviceIdFilter(2, null), new DeviceAttributeFilter("cycle", null))); + Assert.assertEquals(3, deviceSchemaInfoList.size()); + } + + @Test + public void testDeviceWithDifferentIdLength() throws Exception { + if (!testParams.getTestModeName().equals("MemoryMode")) { + return; + } + ISchemaRegion schemaRegion = getSchemaRegion("root.db", 0); + String tableName = "t"; + + Map<String, String> attributeMap = new HashMap<>(); + attributeMap.put("type", "new"); + attributeMap.put("cycle", "monthly"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"hebei", "p_1", "d_0"}, attributeMap); + attributeMap.put("type", "old"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"hebei", "p_1", "d_1"}, attributeMap); + attributeMap.put("cycle", "daily"); + 
SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName, new String[] {"shandong", "p_1", "d_1", "r_1"}, attributeMap); + + List<String[]> deviceIdList = + Arrays.asList( + new String[] {"hebei", "p_1", "d_0"}, + new String[] {"hebei", "p_1", "d_1"}, + new String[] {"shandong", "p_1", "d_1", "r_1"}); + List<IDeviceSchemaInfo> deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice(schemaRegion, tableName, deviceIdList); + Assert.assertEquals(3, deviceSchemaInfoList.size()); + Assert.assertEquals( + deviceIdList.stream().map(Arrays::toString).sorted().collect(Collectors.toList()), + deviceSchemaInfoList.stream() + .map(o -> Arrays.copyOfRange(o.getRawNodes(), 3, o.getRawNodes().length)) + .map(Arrays::toString) + .sorted() + .collect(Collectors.toList())); + + deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice( + schemaRegion, + tableName, + 4, + Collections.singletonList(new DeviceIdFilter(3, "r_1")), + null); + Assert.assertEquals(1, deviceSchemaInfoList.size()); + + // todo implement device query after table column extension + // deviceSchemaInfoList = + // SchemaRegionTestUtil.getTableDevice( + // schemaRegion, + // tableName, + // 4, + // Collections.singletonList(new DeviceIdFilter(3, null)), + // null); + // Assert.assertEquals(2, deviceSchemaInfoList.size()); + } + + @Test + public void testMultiTableDevice() throws Exception { + if (!testParams.getTestModeName().equals("MemoryMode")) { + return; + } + ISchemaRegion schemaRegion = getSchemaRegion("root.db", 0); + String tableName1 = "t1"; + + Map<String, String> attributeMap = new HashMap<>(); + attributeMap.put("type", "new"); + attributeMap.put("cycle", "monthly"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName1, new String[] {"hebei", "p_1", "d_0"}, attributeMap); + attributeMap.put("type", "old"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName1, new String[] {"hebei", "p_1", "d_1"}, attributeMap); + attributeMap.put("cycle", "daily"); + 
SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName1, new String[] {"shandong", "p_1", "d_1", "r_1"}, attributeMap); + + String tableName2 = "t2"; + + attributeMap.put("type", "new"); + attributeMap.put("cycle", "monthly"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName2, new String[] {"hebei", "p_1", "d_0"}, attributeMap); + attributeMap.put("type", "old"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName2, new String[] {"hebei", "p_1", "d_1"}, attributeMap); + attributeMap.put("cycle", "daily"); + SchemaRegionTestUtil.createTableDevice( + schemaRegion, tableName2, new String[] {"shandong", "p_1", "d_1", "r_1"}, attributeMap); + + List<String[]> deviceIdList = + Arrays.asList( + new String[] {"hebei", "p_1", "d_0"}, + new String[] {"hebei", "p_1", "d_1"}, + new String[] {"shandong", "p_1", "d_1", "r_1"}); + List<IDeviceSchemaInfo> deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice(schemaRegion, tableName1, deviceIdList); + Assert.assertEquals(3, deviceSchemaInfoList.size()); + + deviceSchemaInfoList = + SchemaRegionTestUtil.getTableDevice(schemaRegion, tableName2, deviceIdList); + Assert.assertEquals(3, deviceSchemaInfoList.size()); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java index 53a567c1927..e5c6382a44d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTestUtil.java @@ -24,8 +24,10 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.schema.filter.SchemaFilterFactory; +import 
org.apache.iotdb.commons.schema.filter.impl.DeviceFilterUtil; import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion; import org.apache.iotdb.db.schemaengine.schemaregion.read.req.SchemaRegionReadPlanFactory; +import org.apache.iotdb.db.schemaengine.schemaregion.read.req.impl.ShowTableDevicesPlan; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.INodeSchemaInfo; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ITimeSeriesSchemaInfo; @@ -49,6 +51,7 @@ import java.util.Set; import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD; import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_SCOPE; +import static org.apache.iotdb.commons.schema.SchemaConstant.ROOT; public class SchemaRegionTestUtil { @@ -377,4 +380,68 @@ public class SchemaRegionTestUtil { schemaRegion.deleteTimeseriesInBlackList(patternTree); return num; } + + public static void createTableDevice( + ISchemaRegion schemaRegion, String table, String[] deviceIds, Map<String, String> attributes) + throws MetadataException { + String[] fullId = new String[deviceIds.length + 3]; + fullId[0] = ROOT; + fullId[1] = schemaRegion.getDatabaseFullPath().substring(ROOT.length() + 1); + fullId[2] = table; + System.arraycopy(deviceIds, 0, fullId, 3, deviceIds.length); + schemaRegion.createTableDevice( + Collections.singletonList(new PartialPath(fullId)), + new ArrayList<>(attributes.keySet()), + Collections.singletonList(new ArrayList<>(attributes.values()))); + } + + public static List<IDeviceSchemaInfo> getTableDevice( + ISchemaRegion schemaRegion, String table, List<String[]> deviceIdList) + throws MetadataException { + List<PartialPath> pathList = new ArrayList<>(); + for (String[] deviceId : deviceIdList) { + String[] fullId = new String[deviceId.length + 3]; + fullId[0] = ROOT; + fullId[1] = 
schemaRegion.getDatabaseFullPath().substring(ROOT.length() + 1); + fullId[2] = table; + System.arraycopy(deviceId, 0, fullId, 3, deviceId.length); + pathList.add(new PartialPath(fullId)); + } + List<IDeviceSchemaInfo> result = new ArrayList<>(); + try (ISchemaReader<IDeviceSchemaInfo> reader = schemaRegion.getTableDeviceReader(pathList)) { + while (reader.hasNext()) { + result.add(reader.next()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return result; + } + + public static List<IDeviceSchemaInfo> getTableDevice( + ISchemaRegion schemaRegion, + String table, + int idColumnNum, + List<SchemaFilter> idDeterminedFilterList, + SchemaFilter idFuzzyFilter) + throws MetadataException { + List<PartialPath> patternList = + DeviceFilterUtil.convertToDevicePattern( + schemaRegion.getDatabaseFullPath().substring(ROOT.length() + 1), + table, + idColumnNum, + Collections.singletonList(idDeterminedFilterList)); + List<IDeviceSchemaInfo> result = new ArrayList<>(); + for (PartialPath pattern : patternList) { + try (ISchemaReader<IDeviceSchemaInfo> reader = + schemaRegion.getTableDeviceReader(new ShowTableDevicesPlan(pattern, idFuzzyFilter))) { + while (reader.hasNext()) { + result.add(reader.next()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return result; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java index 5aeebd32ef6..d5ba3c159bd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java @@ -43,6 +43,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; @@ -713,7 +714,7 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable { return false; } else { for (int i = 0; i < this.nodes.length; i++) { - if (!nodes[i].equals(otherNodes[i])) { + if (!Objects.equals(nodes[i], otherNodes[i])) { return false; } } @@ -730,7 +731,7 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable { public int hashCode() { int h = 0; for (String node : nodes) { - h += 31 * h + node.hashCode(); + h += 31 * h + Objects.hashCode(node); } return h; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternUtil.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternUtil.java index a8ba920813b..6ee73645350 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternUtil.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternUtil.java @@ -33,7 +33,8 @@ public class PathPatternUtil { * patternNode that can match batch explicit node names. e.g. *, e.g. *, **, d*, *d*. 
*/ public static boolean hasWildcard(String node) { - return node.startsWith(ONE_LEVEL_PATH_WILDCARD) || node.endsWith(ONE_LEVEL_PATH_WILDCARD); + return node != null + && (node.startsWith(ONE_LEVEL_PATH_WILDCARD) || node.endsWith(ONE_LEVEL_PATH_WILDCARD)); } public static boolean isMultiLevelMatchWildcard(String node) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/fa/nfa/SimpleNFA.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/fa/nfa/SimpleNFA.java index 698f27b37c8..4a68861af1d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/fa/nfa/SimpleNFA.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/fa/nfa/SimpleNFA.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.regex.Pattern; import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; @@ -123,6 +124,8 @@ public class SimpleNFA implements IPatternFA { if (patternNodes[nextIndex] == null) { if (nextIndex == rawNodes.length) { patternNodes[nextIndex] = new PrefixMatchNode(nextIndex, currentNode.getTracebackNode()); + } else if (rawNodes[nextIndex] == null) { + patternNodes[nextIndex] = new NameMatchNode(nextIndex, currentNode.getTracebackNode()); } else if (rawNodes[nextIndex].equals(MULTI_LEVEL_PATH_WILDCARD)) { patternNodes[nextIndex] = new MultiLevelWildcardMatchNode(nextIndex); } else if (rawNodes[nextIndex].equals(ONE_LEVEL_PATH_WILDCARD)) { @@ -439,7 +442,7 @@ public class SimpleNFA implements IPatternFA { @Override public boolean isMatch(String event) { - return rawNodes[patternIndex].equals(event); + return Objects.equals(rawNodes[patternIndex], event); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/MemUsageUtil.java 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/MemUsageUtil.java new file mode 100644 index 00000000000..530b9a790c2 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/MemUsageUtil.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.schema; + +public class MemUsageUtil { + + private MemUsageUtil() { + // do nothing + } + + public static long computeStringMemUsage(String value) { + return estimateStringSize(value); + } + + /** + * Estimated memory of one String key/value entry held in a map: 40B fixed cost plus both strings. + * + * <ol> + * <li>MapEntry in parent + * <ol> + * <li>key reference, 8B + * <li>value reference, 8B + * <li>entry size, see ConcurrentHashMap.Node, 28 + * </ol> + * </ol> + */ + public static long computeKVMemUsageInMap(String key, String value) { + return 40L + estimateStringSize(key) + estimateStringSize(value); + } + + /** + * String basic total, 32B + * + * <ul> + * <li>Object header, 8B + * <li>char[] reference + header + length, 8 + 4 + 8= 20B + * <li>hash code, 4B + * </ul> + */ + private static long estimateStringSize(String string) { + // each char takes 2B in Java + return string == null ? 
0 : 32 + 2 * string.length(); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/SchemaConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/SchemaConstant.java index 7c67375d31d..95e95c28af0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/SchemaConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/SchemaConstant.java @@ -56,6 +56,11 @@ public class SchemaConstant { public static final String TAG_LOG_SNAPSHOT_TMP = "tlog.txt.snapshot.tmp"; public static final String MTREE_SNAPSHOT = "mtree.snapshot"; public static final String MTREE_SNAPSHOT_TMP = "mtree.snapshot.tmp"; + + public static final String DEVICE_ATTRIBUTE_SNAPSHOT = "device_attribute.snapshot"; + + public static final String DEVICE_ATTRIBUTE_SNAPSHOT_TMP = "device_attribute.snapshot.tmp"; + public static final String SYSTEM_DATABASE = "root.__system"; public static final String[] ALL_RESULT_NODES = new String[] {"root", "**"}; public static final PartialPath ALL_MATCH_PATTERN = new PartialPath(ALL_RESULT_NODES); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java index a9e5fea0b73..ad3c4679a39 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilter.java @@ -21,6 +21,9 @@ package org.apache.iotdb.commons.schema.filter; import org.apache.iotdb.commons.schema.filter.impl.AndFilter; import org.apache.iotdb.commons.schema.filter.impl.DataTypeFilter; +import org.apache.iotdb.commons.schema.filter.impl.DeviceAttributeFilter; +import org.apache.iotdb.commons.schema.filter.impl.DeviceIdFilter; +import org.apache.iotdb.commons.schema.filter.impl.OrFilter; import 
org.apache.iotdb.commons.schema.filter.impl.PathContainsFilter; import org.apache.iotdb.commons.schema.filter.impl.TagFilter; import org.apache.iotdb.commons.schema.filter.impl.TemplateFilter; @@ -73,6 +76,12 @@ public abstract class SchemaFilter { return new AndFilter(byteBuffer); case TEMPLATE_FILTER: return new TemplateFilter(byteBuffer); + case OR: + return new OrFilter(byteBuffer); + case DEVICE_ID: + return new DeviceIdFilter(byteBuffer); + case DEVICE_ATTRIBUTE: + return new DeviceAttributeFilter(byteBuffer); default: throw new IllegalArgumentException("Unsupported schema filter type: " + type); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilterType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilterType.java index 285130f778e..eb79511e5b6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilterType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilterType.java @@ -26,6 +26,10 @@ public enum SchemaFilterType { VIEW_TYPE((short) 4), AND((short) 5), TEMPLATE_FILTER((short) 6), + DEVICE_ID((short) 7), + DEVICE_ATTRIBUTE((short) 8), + + OR((short) 9), ; private final short code; @@ -54,6 +58,12 @@ public enum SchemaFilterType { return AND; case 6: return TEMPLATE_FILTER; + case 7: + return DEVICE_ID; + case 8: + return DEVICE_ATTRIBUTE; + case 9: + return OR; default: throw new IllegalArgumentException("Invalid input: " + code); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilterVisitor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilterVisitor.java index d6861baab68..f503da64b15 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilterVisitor.java +++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/SchemaFilterVisitor.java @@ -20,6 +20,9 @@ package org.apache.iotdb.commons.schema.filter; import org.apache.iotdb.commons.schema.filter.impl.AndFilter; import org.apache.iotdb.commons.schema.filter.impl.DataTypeFilter; +import org.apache.iotdb.commons.schema.filter.impl.DeviceAttributeFilter; +import org.apache.iotdb.commons.schema.filter.impl.DeviceIdFilter; +import org.apache.iotdb.commons.schema.filter.impl.OrFilter; import org.apache.iotdb.commons.schema.filter.impl.PathContainsFilter; import org.apache.iotdb.commons.schema.filter.impl.TagFilter; import org.apache.iotdb.commons.schema.filter.impl.TemplateFilter; @@ -69,4 +72,16 @@ public abstract class SchemaFilterVisitor<C> { public boolean visitAndFilter(AndFilter andFilter, C context) { return andFilter.getLeft().accept(this, context) && andFilter.getRight().accept(this, context); } + + public boolean visitOrFilter(OrFilter orFilter, C context) { + return orFilter.getLeft().accept(this, context) || orFilter.getRight().accept(this, context); + } + + public boolean visitDeviceIdFilter(DeviceIdFilter filter, C context) { + return visitFilter(filter, context); + } + + public boolean visitDeviceAttributeFilter(DeviceAttributeFilter filter, C context) { + return visitFilter(filter, context); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java new file mode 100644 index 00000000000..d85b437e9c4 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceAttributeFilter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.commons.schema.filter.impl; + +import org.apache.iotdb.commons.schema.filter.SchemaFilter; +import org.apache.iotdb.commons.schema.filter.SchemaFilterType; +import org.apache.iotdb.commons.schema.filter.SchemaFilterVisitor; + +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class DeviceAttributeFilter extends SchemaFilter { + + private final String key; + + private final String value; + + public DeviceAttributeFilter(String key, String value) { + this.key = key; + this.value = value; + } + + public DeviceAttributeFilter(ByteBuffer byteBuffer) { + this.key = ReadWriteIOUtils.readString(byteBuffer); + this.value = ReadWriteIOUtils.readString(byteBuffer); + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } + + @Override + public <C> boolean accept(SchemaFilterVisitor<C> visitor, C node) { + return visitor.visitDeviceAttributeFilter(this, node); + } + + @Override + public SchemaFilterType getSchemaFilterType() { + return SchemaFilterType.DEVICE_ATTRIBUTE; + } + + @Override + public void serialize(ByteBuffer byteBuffer) { + ReadWriteIOUtils.write(key, byteBuffer); + ReadWriteIOUtils.write(value, 
byteBuffer); + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + ReadWriteIOUtils.write(key, stream); + ReadWriteIOUtils.write(value, stream); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceFilterUtil.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceFilterUtil.java new file mode 100644 index 00000000000..1b43fe1feb8 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceFilterUtil.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.schema.filter.impl; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.filter.SchemaFilter; +import org.apache.iotdb.commons.schema.filter.SchemaFilterType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD; +import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT; + +public class DeviceFilterUtil { + + private DeviceFilterUtil() { + // do nothing + } + + // if the element in idDeterminedFilterList isEmpty, the corresponding pattern is + // root.db.table.*.*.. + // e.g. input (db, table[c1, c2], [[]]), return [root.db.table.*.*] + public static List<PartialPath> convertToDevicePattern( + String database, + String tableName, + int idColumnNum, + List<List<SchemaFilter>> idDeterminedFilterList) { + List<PartialPath> pathList = new ArrayList<>(); + int length = idColumnNum + 3; + for (List<SchemaFilter> idFilterList : idDeterminedFilterList) { + String[] nodes = new String[length]; + Arrays.fill(nodes, ONE_LEVEL_PATH_WILDCARD); + nodes[0] = PATH_ROOT; + nodes[1] = database; + nodes[2] = tableName; + for (SchemaFilter schemaFilter : idFilterList) { + if (schemaFilter.getSchemaFilterType().equals(SchemaFilterType.DEVICE_ID)) { + DeviceIdFilter deviceIdFilter = (DeviceIdFilter) schemaFilter; + nodes[deviceIdFilter.getIndex() + 3] = deviceIdFilter.getValue(); + } else { + throw new IllegalStateException("Input single filter must be DeviceIdFilter"); + } + } + pathList.add(new PartialPath(nodes)); + } + + return pathList; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java new file mode 100644 index 00000000000..6ba5cd71f54 --- /dev/null +++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/DeviceIdFilter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.commons.schema.filter.impl; + +import org.apache.iotdb.commons.schema.filter.SchemaFilter; +import org.apache.iotdb.commons.schema.filter.SchemaFilterType; +import org.apache.iotdb.commons.schema.filter.SchemaFilterVisitor; + +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class DeviceIdFilter extends SchemaFilter { + + // id column index + // when used in partialPath, the index of node in path shall be [this.index + 3] + // since a partialPath start with {root, db, table} + private final int index; + + private final String value; + + public DeviceIdFilter(int index, String value) { + this.index = index; + this.value = value; + } + + public DeviceIdFilter(ByteBuffer byteBuffer) { + this.index = ReadWriteIOUtils.readInt(byteBuffer); + this.value = ReadWriteIOUtils.readString(byteBuffer); + } + + public int getIndex() { + return index; + } + + public String getValue() { + return value; + } + + @Override + public 
<C> boolean accept(SchemaFilterVisitor<C> visitor, C node) { + return visitor.visitDeviceIdFilter(this, node); + } + + @Override + public SchemaFilterType getSchemaFilterType() { + return SchemaFilterType.DEVICE_ID; + } + + @Override + public void serialize(ByteBuffer byteBuffer) { + ReadWriteIOUtils.write(index, byteBuffer); + ReadWriteIOUtils.write(value, byteBuffer); + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + ReadWriteIOUtils.write(index, stream); + ReadWriteIOUtils.write(value, stream); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/OrFilter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/OrFilter.java new file mode 100644 index 00000000000..e740c23b2c9 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/filter/impl/OrFilter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.schema.filter.impl; + +import org.apache.iotdb.commons.schema.filter.SchemaFilter; +import org.apache.iotdb.commons.schema.filter.SchemaFilterType; +import org.apache.iotdb.commons.schema.filter.SchemaFilterVisitor; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class OrFilter extends SchemaFilter { + private final SchemaFilter left; + private final SchemaFilter right; + + public OrFilter(SchemaFilter left, SchemaFilter right) { + // left and right should not be null + this.left = left; + this.right = right; + } + + public OrFilter(ByteBuffer byteBuffer) { + this.left = SchemaFilter.deserialize(byteBuffer); + this.right = SchemaFilter.deserialize(byteBuffer); + } + + public SchemaFilter getLeft() { + return left; + } + + public SchemaFilter getRight() { + return right; + } + + @Override + public <C> boolean accept(SchemaFilterVisitor<C> visitor, C node) { + return visitor.visitOrFilter(this, node); + } + + @Override + public SchemaFilterType getSchemaFilterType() { + return SchemaFilterType.OR; + } + + @Override + public void serialize(ByteBuffer byteBuffer) { + left.serialize(byteBuffer); + right.serialize(byteBuffer); + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + left.serialize(stream); + right.serialize(stream); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/tree/AbstractTreeVisitor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/tree/AbstractTreeVisitor.java index caa8019ef87..606d1b00daf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/tree/AbstractTreeVisitor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/tree/AbstractTreeVisitor.java @@ -121,7 +121,9 @@ public abstract class AbstractTreeVisitor<N extends ITreeNode, R> implements Sch boolean usingDFA = false; // Use DFA if there are 
** and no regex node in pathPattern for (String pathNode : pathPattern.getNodes()) { - if (IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD.equals(pathNode)) { + if (pathNode == null) { + continue; + } else if (IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD.equals(pathNode)) { // ** node usingDFA = true; } else if (pathNode.length() > 1 && PathPatternUtil.hasWildcard(pathNode)) {