This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel_0.12_debug_compaction_stop
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel_0.12_debug_compaction_stop 
by this push:
     new b7b5c4c  fix finally
b7b5c4c is described below

commit b7b5c4c11ac5cc1a499b6cee2cf492f917393f56
Author: LebronAl <[email protected]>
AuthorDate: Thu Nov 11 19:48:38 2021 +0800

    fix finally
---
 .../iotdb/cluster/query/LocalQueryExecutor.java    |  2 +-
 .../iotdb/cluster/query/reader/DataSourceInfo.java |  5 +++--
 .../query/reader/mult/MultDataSourceInfo.java      |  5 +++--
 .../java/org/apache/iotdb/udf/UDTFExample.java     | 22 +++++++++++++++++-----
 4 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 1fefc79..33a14e5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -504,7 +504,7 @@ public class LocalQueryExecutor {
           readerId);
       return readerId;
     } else {
-      logger.debug(
+      logger.warn(
           "{}: There is no data {} for {}#{}",
           name,
           path,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 4ba11e4..e609816 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -97,8 +97,6 @@ public class DataSourceInfo {
         if (newReaderId != null) {
           logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
           if (newReaderId != -1) {
-            // register the node so the remote resources can be released
-            context.registerRemoteNode(node, partitionGroup.getHeader());
             this.readerId = newReaderId;
             this.curSource = node;
             this.curPos = nextNodePos;
@@ -116,6 +114,9 @@ public class DataSourceInfo {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.error("Cannot query {} from {}", this.request.path, node, e);
+      } finally {
+        // register the node so the remote resources can be released
+        context.registerRemoteNode(node, partitionGroup.getHeader());
       }
       nextNodePos = (nextNodePos + 1) % this.nodes.size();
       if (nextNodePos == this.curPos) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
index a4488aa..0b35a4c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
@@ -102,8 +102,6 @@ public class MultDataSourceInfo {
         if (newReaderId != null) {
           logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
           if (newReaderId != -1) {
-            // register the node so the remote resources can be released
-            context.registerRemoteNode(node, partitionGroup.getHeader());
             this.readerId = newReaderId;
             this.curSource = node;
             this.curPos = nextNodePos;
@@ -121,6 +119,9 @@ public class MultDataSourceInfo {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.error("Cannot query {} from {}", this.request.path, node, e);
+      } finally {
+        // register the node so the remote resources can be released
+        context.registerRemoteNode(node, partitionGroup.getHeader());
       }
       nextNodePos = (nextNodePos + 1) % this.nodes.size();
       if (nextNodePos == this.curPos) {
diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/UDTFExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/UDTFExample.java
index 17a534d..b6a2c69 100644
--- a/example/udf/src/main/java/org/apache/iotdb/udf/UDTFExample.java
+++ b/example/udf/src/main/java/org/apache/iotdb/udf/UDTFExample.java
@@ -19,15 +19,17 @@
 
 package org.apache.iotdb.udf;
 
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.udf.api.UDTF;
-import org.apache.iotdb.db.query.udf.api.access.Row;
 import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
 import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
 import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Set;
 
 /** This is an internal example of the UDTF implementation. */
 public class UDTFExample implements UDTF {
@@ -47,11 +49,21 @@ public class UDTFExample implements UDTF {
   public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) {
     configurations
         .setAccessStrategy(new RowByRowAccessStrategy())
-        .setOutputDataType(TSDataType.INT32);
+        .setOutputDataType(TSDataType.TEXT);
   }
 
   @Override
-  public void transform(Row row, PointCollector collector) throws IOException {
-    collector.putInt(row.getTime(), -row.getInt(0));
+  public void terminate(PointCollector collector) throws Exception {
+    int i = 0;
+    FileReaderManager manager = FileReaderManager.getInstance();
+    Class managerClass = manager.getClass();
+    Field readerMapField = managerClass.getDeclaredField("closedReferenceMap");
+    readerMapField.setAccessible(true);
+    Map<String, Set<Long>> readerMap = (Map) readerMapField.get(manager);
+    for (Map.Entry<String, Set<Long>> entry : readerMap.entrySet()) {
+      if (entry.getValue().size() != 0) {
+        collector.putString(i++, entry.getKey() + " _ " + entry.getValue().toString());
+      }
+    }
   }
 }

Reply via email to