This is an automated email from the ASF dual-hosted git repository.

spricoder pushed a commit to branch object_type_tiff
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/object_type_tiff by this push:
     new 55723c736a2 更新优化版
55723c736a2 is described below

commit 55723c736a2b6b136a3699adf34936d0993ed49a
Author: spricoder <[email protected]>
AuthorDate: Sun Sep 7 15:14:52 2025 +0800

    更新优化版
---
 .../rpc/model/CompressedTiffModelProcessor.java    | 106 +++++++----
 .../db/utils/model/CompressedTiffModelReader.java  | 209 ++++++++++++++++++---
 2 files changed, 255 insertions(+), 60 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java
index 11c937e1160..e6f351d5bcf 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java
@@ -26,19 +26,25 @@ import org.gdal.gdal.Driver;
 import org.gdal.gdal.gdal;
 import org.gdal.gdalconst.gdalconstConstants;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class CompressedTiffModelProcessor extends ModelProcessor {
   private static final Driver DRIVER;
   private static final String VIRTUAL_FILE_PATH_PREFIX = "/vsimem";
   private static final String VIRTUAL_FILE_PATH_SUFFIX = ".tif";
-  // Specifying compression options
-  private static String compressOption = "COMPRESS=LZW";
-  // Specifying block x size options
-  private static String blockXSize = "BLOCKXSIZE=256";
-  // Specifying block y size options
-  private static String blockYSize = "BLOCKYSIZE=256";
+  // ---- 写入压缩配置(核心配置)----
+  private static final String COMPRESS = "ZSTD"; // 备选:"DEFLATE" / "LZW"
+  private static final int ZSTD_LEVEL = 3; // 仅 ZSTD 有效,可调 1~22
+  private static final String PREDICTOR = "2"; // 浮点差分
+  private static final int ROWS_PER_STRIP = 12; // 行随机读最佳;若 strip 过多可设 8/16/32
+
+  // 固定哨兵值作为 NoData(兼容面好于直接写 NaN)
+  private static final float NODATA_SENTINEL = -3.4028235e38f;
 
   static {
     gdal.AllRegister();
+    gdal.SetConfigOption("GDAL_NUM_THREADS", "ALL_CPUS");
     DRIVER = gdal.GetDriverByName("GTiff");
     if (DRIVER == null) {
       throw new RuntimeException("Failed to get GTiff driver: " + gdal.GetLastErrorMsg());
@@ -56,31 +62,51 @@ public class CompressedTiffModelProcessor extends ModelProcessor {
   }
 
   private byte[] write(String filePath, float[] values, int width, int height) {
-    // floating point data should use predictor 2 (for difference prediction), and use block storage
-    // (recommended for LZW)
-    String[] options =
-        new String[] {compressOption, "PREDICTOR=2", "TILED=YES", blockXSize, blockYSize};
+    if (values == null || values.length != (long) width * height) {
+      throw new IllegalArgumentException("values length must be width*height");
+    }
 
-    Dataset dataset = null;
-    try {
-      // Create dataset with specified options
-      dataset = DRIVER.Create(filePath, width, height, 1, gdalconstConstants.GDT_Float32, options);
+    List<String> opts = new ArrayList<>();
+    opts.add("BIGTIFF=IF_SAFER");
+    opts.add("TILED=NO"); // 明确 strip 组织
+    opts.add("BLOCKYSIZE=" + ROWS_PER_STRIP); // rows-per-strip
+    opts.add("PREDICTOR=" + PREDICTOR);
+    opts.add("NUM_THREADS=ALL_CPUS");
+    if ("ZSTD".equalsIgnoreCase(COMPRESS)) {
+      opts.add("COMPRESS=ZSTD");
+      opts.add("ZSTD_LEVEL=" + ZSTD_LEVEL);
+    } else if ("DEFLATE".equalsIgnoreCase(COMPRESS)) {
+      opts.add("COMPRESS=DEFLATE");
+    } else if ("LZW".equalsIgnoreCase(COMPRESS)) {
+      opts.add("COMPRESS=LZW");
+    } else {
+      throw new IllegalArgumentException("Unsupported COMPRESS=" + COMPRESS);
+    }
+    String[] options = opts.toArray(new String[0]);
 
-      if (dataset == null) {
+    Dataset ds = null;
+    try {
+      ds = DRIVER.Create(filePath, width, height, 1, gdalconstConstants.GDT_Float32, options);
+      if (ds == null) {
         throw new RuntimeException("Failed to create dataset: " + gdal.GetLastErrorMsg());
       }
+      Band band = ds.GetRasterBand(1);
+
+      // 统一设置 NoData(固定哨兵值)
+      band.SetNoDataValue(NODATA_SENTINEL);
 
-      Band band = dataset.GetRasterBand(1);
-      int result = band.WriteRaster(0, 0, width, height, values);
-      if (result != gdalconstConstants.CE_None) {
-        throw new RuntimeException("Failed to write data to tiff file: " + gdal.GetLastErrorMsg());
+      // 顺序写整幅数据(strip 组织下吞吐较好)
+      int err = band.WriteRaster(0, 0, width, height, values);
+      if (err != gdalconstConstants.CE_None) {
+        throw new RuntimeException("Failed to write data: " + gdal.GetLastErrorMsg());
       }
+
       band.FlushCache();
-      dataset.FlushCache();
+      ds.FlushCache();
       return VsiGdalNative.vsiGetMemFileBuffer(filePath, true);
     } finally {
-      if (dataset != null) {
-        dataset.delete();
+      if (ds != null) {
+        ds.delete();
       }
     }
   }
@@ -98,23 +124,33 @@ public class CompressedTiffModelProcessor extends ModelProcessor {
 
   @Override
   public float[] readAll(String filePath) {
-    Dataset dataset = gdal.Open(filePath, gdalconstConstants.GA_ReadOnly);
-    if (dataset == null) {
-      throw new RuntimeException("Failed to open tiff file: " + gdal.GetLastErrorMsg());
+    Dataset ds = gdal.OpenShared(filePath, gdalconstConstants.GA_ReadOnly);
+    if (ds == null) {
+      throw new RuntimeException("Failed to open: " + gdal.GetLastErrorMsg());
     }
     try {
-      Band band = dataset.GetRasterBand(1);
-      if (band == null) {
-        throw new RuntimeException(
-            "Failed to get raster band from dataset" + gdal.GetLastErrorMsg());
+      Band band = ds.GetRasterBand(1);
+      int w = band.getXSize(), h = band.getYSize();
+      float[] out = new float[(int) ((long) w * h)];
+      int err = band.ReadRaster(0, 0, w, h, gdalconstConstants.GDT_Float32, out);
+      if (err != gdalconstConstants.CE_None) {
+        throw new RuntimeException("ReadRaster(all) failed: " + gdal.GetLastErrorMsg());
+      }
+
+      // NoData -> NaN
+      Double[] nd = new Double[1];
+      band.GetNoDataValue(nd);
+      float nodata = (nd[0] == null || Double.isNaN(nd[0])) ? Float.NaN : nd[0].floatValue();
+      if (!Float.isNaN(nodata)) {
+        for (int i = 0; i < out.length; i++) {
+          if (out[i] == nodata) {
+            out[i] = Float.NaN;
+          }
+        }
       }
-      int width = band.getXSize();
-      int height = band.getYSize();
-      float[] result = new float[width * height];
-      band.ReadRaster(0, 0, width, height, gdalconstConstants.GDT_Float32, result);
-      return result;
+      return out;
     } finally {
-      dataset.delete();
+      ds.delete();
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java
index 19ede8e44bf..9cbd42b130f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java
@@ -27,48 +27,207 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 public class CompressedTiffModelReader extends ModelReader {
   private static final Logger LOGGER = LoggerFactory.getLogger(CompressedTiffModelReader.class);
 
   static {
     gdal.AllRegister();
-    int cacheMax = gdal.GetCacheMax();
-    LOGGER.info("GDAL Cache Max: {}", cacheMax);
-    gdal.SetCacheMax(cacheMax * 2);
+    gdal.SetConfigOption("GDAL_NUM_THREADS", "ALL_CPUS");
+  }
+
+  // 线程本地复用的读缓冲,避免频繁分配
+  private static final ThreadLocal<float[]> TL_SCRATCH =
+      ThreadLocal.withInitial(() -> new float[0]);
+
+  private static float[] ensureCapacity(float[] buf, int need) {
+    if (buf.length >= need) return buf;
+    int cap = Math.max(buf.length * 2, need);
+    float[] n = new float[cap];
+    TL_SCRATCH.set(n);
+    return n;
+  }
+
+  /** 合并相邻/重叠区间;mergeGap=0 表示仅相邻或重叠才合并 */
+  private static List<int[]> mergeRanges(List<int[]> ranges, int mergeGap) {
+    if (ranges.isEmpty()) return ranges;
+    ranges.sort(Comparator.comparingInt(a -> a[0]));
+    List<int[]> out = new ArrayList<>();
+    int s = ranges.get(0)[0], e = ranges.get(0)[1];
+    for (int i = 1; i < ranges.size(); i++) {
+      int ns = ranges.get(i)[0], ne = ranges.get(i)[1];
+      if (ns <= e + 1 + mergeGap) {
+        e = Math.max(e, ne);
+      } else {
+        out.add(new int[] {s, e});
+        s = ns;
+        e = ne;
+      }
+    }
+    out.add(new int[] {s, e});
+    return out;
   }
 
   @Override
   public List<float[]> penetrate(String filePath, List<List<Integer>> startAndEndTimeArray) {
-    Dataset dataset = gdal.Open(filePath, gdalconstConstants.GA_ReadOnly);
-    if (dataset == null) {
-      LOGGER.error("Failed to open tiff file: {}", gdal.GetLastErrorMsg());
-      throw new RuntimeException("Failed to open tiff file: " + gdal.GetLastErrorMsg());
+    if (startAndEndTimeArray == null || startAndEndTimeArray.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    // 为保证返回顺序与输入一致:先把输入解析成条目列表(带原始 index)
+    class Req {
+      final int idx; // 原始顺序
+      final int startPix; // 像元索引(0..total-1)
+      final int endPix; // 像元索引(>=startPix)
+      int row, col0, col1; // 解析后的行号与列范围
+
+      Req(int idx, int s, int e) {
+        this.idx = idx;
+        this.startPix = Math.min(s, e);
+        this.endPix = Math.max(s, e);
+      }
     }
+    List<Req> reqs = new ArrayList<>(startAndEndTimeArray.size());
+    for (int i = 0; i < startAndEndTimeArray.size(); i++) {
+      List<Integer> r = startAndEndTimeArray.get(i);
+      if (r == null || r.size() < 2) {
+        throw new IllegalArgumentException("Each range must be [start, end].");
+      }
+      reqs.add(new Req(i, r.get(0), r.get(1)));
+    }
+
+    Dataset ds = gdal.OpenShared(filePath, gdalconstConstants.GA_ReadOnly);
+    if (ds == null) {
+      throw new RuntimeException("Failed to open: " + gdal.GetLastErrorMsg());
+    }
+
     try {
-      Band band = dataset.GetRasterBand(1);
-      if (band == null) {
-        LOGGER.error("Failed to get raster band from dataset: {}", gdal.GetLastErrorMsg());
-        throw new RuntimeException(
-            "Failed to get raster band from dataset" + gdal.GetLastErrorMsg());
+      Band band = ds.GetRasterBand(1);
+      final int width = band.getXSize();
+      final int height = band.getYSize();
+      final long total = (long) width * (long) height;
+
+      // 解析行/列 & 校验仅同一行
+      for (Req q : reqs) {
+        if (q.startPix < 0 || (long) q.endPix >= total) {
+          throw new IndexOutOfBoundsException(
+              String.format("Range [%d,%d] out of bounds [0,%d).", q.startPix, q.endPix, total));
+        }
+        int sRow = q.startPix / width, sCol = q.startPix % width;
+        int eRow = q.endPix / width, eCol = q.endPix % width;
+        if (sRow != eRow) {
+          throw new IllegalArgumentException(
+              "Range crosses rows: [" + q.startPix + "," + q.endPix + "]");
+        }
+        q.row = sRow;
+        q.col0 = sCol;
+        q.col1 = eCol;
+      }
+
+      // NoData 配置
+      Double[] nd = new Double[1];
+      band.GetNoDataValue(nd);
+      final boolean needMapNoData = nd[0] != null && !Double.isNaN(nd[0]);
+      final float nodata = needMapNoData ? nd[0].floatValue() : Float.NaN;
+
+      // 行 -> (合并前的列区间列表)
+      Map<Integer, List<int[]>> perRow = new LinkedHashMap<>();
+      for (Req q : reqs) {
+        perRow.computeIfAbsent(q.row, k -> new ArrayList<>()).add(new int[] {q.col0, q.col1});
+      }
+
+      // 为每个请求预先分配目标数组,最后按 idx 顺序收集
+      float[][] outputs = new float[reqs.size()][];
+      for (Req q : reqs) {
+        outputs[q.idx] = new float[q.col1 - q.col0 + 1];
+      }
+
+      // mergeGap 可按需要调大,如 4/8(允许读取少量“间隙像元”换更少的 ReadRaster 次数)
+      final int mergeGap = 0;
+
+      // 行内再建立“原始区间列表”(保持输入顺序),用于把窗口数据拆回去
+      Map<Integer, List<Req>> rowReqs = new LinkedHashMap<>();
+      for (Req q : reqs) rowReqs.computeIfAbsent(q.row, k -> new ArrayList<>()).add(q);
+
+      // 逐行处理
+      for (Map.Entry<Integer, List<int[]>> e : perRow.entrySet()) {
+        int row = e.getKey();
+        List<int[]> ranges = e.getValue();
+
+        // 合并区间 -> 更少的读取窗口
+        List<int[]> merged = mergeRanges(ranges, mergeGap);
+
+        // 按起点排序,便于窗口覆盖
+        List<Req> rowList = rowReqs.get(row);
+        rowList.sort(Comparator.comparingInt(a -> a.col0));
+        int p = 0;
+
+        for (int[] win : merged) {
+          int c0 = win[0], c1 = win[1];
+          int winLen = c1 - c0 + 1;
+
+          float[] scratch = ensureCapacity(TL_SCRATCH.get(), winLen);
+          int err = band.ReadRaster(c0, row, winLen, 1, gdalconstConstants.GDT_Float32, scratch);
+          if (err != gdalconstConstants.CE_None) {
+            throw new RuntimeException(
+                "ReadRaster(row="
+                    + row
+                    + ", "
+                    + c0
+                    + ":"
+                    + c1
+                    + ") failed: "
+                    + gdal.GetLastErrorMsg());
+          }
+
+          // 把窗口分发给所有落在其中的原始区间
+          while (p < rowList.size()) {
+            Req q = rowList.get(p);
+            if (q.col1 < c0) {
+              p++;
+              continue;
+            } // 在窗口左侧,跳过
+            if (q.col0 > c1) {
+              break;
+            } // 窗口右侧,进入下一个窗口
+
+            int from = Math.max(q.col0, c0);
+            int to = Math.min(q.col1, c1);
+            int len = to - from + 1;
+
+            float[] dst = outputs[q.idx];
+            System.arraycopy(scratch, from - c0, dst, from - q.col0, len);
+
+            // 注意:若 mergeGap>0,极端情况下一个 req 可能跨两个合并窗口;
+            // 这里不 p++,而是仅当整个 req 覆盖完才前移指针
+            if (to == q.col1) {
+              p++;
+            } // 该 req 已经完全覆盖
+            else {
+              break;
+            } // 仍有剩余,等待下一窗口补齐
+          }
+        }
       }
-      int width = band.getXSize();
-      List<float[]> result = new ArrayList<>();
-      for (List<Integer> startAndEndTime : startAndEndTimeArray) {
-        int xIndex = startAndEndTime.get(0);
-        int yIndex = startAndEndTime.get(1);
-        float[] tmp = new float[yIndex - xIndex + 1];
-        int xOff = xIndex % width;
-        int yOff = xIndex / width;
-        int xSize = yIndex - xIndex + 1;
-        int ySize = 1;
-        band.ReadRaster(xOff, yOff, xSize, ySize, gdalconstConstants.GDT_Float32, tmp);
-        result.add(tmp);
+
+      // NoData -> NaN
+      if (needMapNoData) {
+        for (float[] arr : outputs) {
+          for (int i = 0; i < arr.length; i++) if (arr[i] == nodata) arr[i] = Float.NaN;
+        }
       }
+
+      // 按原始顺序返回
+      List<float[]> result = new ArrayList<>(outputs.length);
+      for (int i = 0; i < outputs.length; i++) result.add(outputs[i]);
       return result;
     } finally {
-      dataset.delete();
+      ds.delete();
     }
   }
 }

Reply via email to