This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel_0.12_debug_compaction_stop
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel_0.12_debug_compaction_stop
by this push:
new b7b5c4c fix finally
b7b5c4c is described below
commit b7b5c4c11ac5cc1a499b6cee2cf492f917393f56
Author: LebronAl <[email protected]>
AuthorDate: Thu Nov 11 19:48:38 2021 +0800
fix finally
---
.../iotdb/cluster/query/LocalQueryExecutor.java | 2 +-
.../iotdb/cluster/query/reader/DataSourceInfo.java | 5 +++--
.../query/reader/mult/MultDataSourceInfo.java | 5 +++--
.../java/org/apache/iotdb/udf/UDTFExample.java | 22 +++++++++++++++++-----
4 files changed, 24 insertions(+), 10 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 1fefc79..33a14e5 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -504,7 +504,7 @@ public class LocalQueryExecutor {
readerId);
return readerId;
} else {
- logger.debug(
+ logger.warn(
"{}: There is no data {} for {}#{}",
name,
path,
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 4ba11e4..e609816 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -97,8 +97,6 @@ public class DataSourceInfo {
if (newReaderId != null) {
logger.debug("get a readerId {} for {} from {}", newReaderId,
request.path, node);
if (newReaderId != -1) {
- // register the node so the remote resources can be released
- context.registerRemoteNode(node, partitionGroup.getHeader());
this.readerId = newReaderId;
this.curSource = node;
this.curPos = nextNodePos;
@@ -116,6 +114,9 @@ public class DataSourceInfo {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Cannot query {} from {}", this.request.path, node, e);
+ } finally {
+ // register the node so the remote resources can be released
+ context.registerRemoteNode(node, partitionGroup.getHeader());
}
nextNodePos = (nextNodePos + 1) % this.nodes.size();
if (nextNodePos == this.curPos) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
index a4488aa..0b35a4c 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
@@ -102,8 +102,6 @@ public class MultDataSourceInfo {
if (newReaderId != null) {
logger.debug("get a readerId {} for {} from {}", newReaderId,
request.path, node);
if (newReaderId != -1) {
- // register the node so the remote resources can be released
- context.registerRemoteNode(node, partitionGroup.getHeader());
this.readerId = newReaderId;
this.curSource = node;
this.curPos = nextNodePos;
@@ -121,6 +119,9 @@ public class MultDataSourceInfo {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Cannot query {} from {}", this.request.path, node, e);
+ } finally {
+ // register the node so the remote resources can be released
+ context.registerRemoteNode(node, partitionGroup.getHeader());
}
nextNodePos = (nextNodePos + 1) % this.nodes.size();
if (nextNodePos == this.curPos) {
diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/UDTFExample.java
b/example/udf/src/main/java/org/apache/iotdb/udf/UDTFExample.java
index 17a534d..b6a2c69 100644
--- a/example/udf/src/main/java/org/apache/iotdb/udf/UDTFExample.java
+++ b/example/udf/src/main/java/org/apache/iotdb/udf/UDTFExample.java
@@ -19,15 +19,17 @@
package org.apache.iotdb.udf;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.udf.api.UDTF;
-import org.apache.iotdb.db.query.udf.api.access.Row;
import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
import
org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Set;
/** This is an internal example of the UDTF implementation. */
public class UDTFExample implements UDTF {
@@ -47,11 +49,21 @@ public class UDTFExample implements UDTF {
public void beforeStart(UDFParameters parameters, UDTFConfigurations
configurations) {
configurations
.setAccessStrategy(new RowByRowAccessStrategy())
- .setOutputDataType(TSDataType.INT32);
+ .setOutputDataType(TSDataType.TEXT);
}
@Override
- public void transform(Row row, PointCollector collector) throws IOException {
- collector.putInt(row.getTime(), -row.getInt(0));
+ public void terminate(PointCollector collector) throws Exception {
+ int i = 0;
+ FileReaderManager manager = FileReaderManager.getInstance();
+ Class managerClass = manager.getClass();
+ Field readerMapField = managerClass.getDeclaredField("closedReferenceMap");
+ readerMapField.setAccessible(true);
+ Map<String, Set<Long>> readerMap = (Map) readerMapField.get(manager);
+ for (Map.Entry<String, Set<Long>> entry : readerMap.entrySet()) {
+ if (entry.getValue().size() != 0) {
+ collector.putString(i++, entry.getKey() + " _ " +
entry.getValue().toString());
+ }
+ }
}
}