This is an automated email from the ASF dual-hosted git repository.
spricoder pushed a commit to branch object_type_tiff
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type_tiff by this push:
new 55723c736a2 更新优化版
55723c736a2 is described below
commit 55723c736a2b6b136a3699adf34936d0993ed49a
Author: spricoder <[email protected]>
AuthorDate: Sun Sep 7 15:14:52 2025 +0800
更新优化版
---
.../rpc/model/CompressedTiffModelProcessor.java | 106 +++++++----
.../db/utils/model/CompressedTiffModelReader.java | 209 ++++++++++++++++++---
2 files changed, 255 insertions(+), 60 deletions(-)
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java
index 11c937e1160..e6f351d5bcf 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java
@@ -26,19 +26,25 @@ import org.gdal.gdal.Driver;
import org.gdal.gdal.gdal;
import org.gdal.gdalconst.gdalconstConstants;
+import java.util.ArrayList;
+import java.util.List;
+
public class CompressedTiffModelProcessor extends ModelProcessor {
private static final Driver DRIVER;
private static final String VIRTUAL_FILE_PATH_PREFIX = "/vsimem";
private static final String VIRTUAL_FILE_PATH_SUFFIX = ".tif";
- // Specifying compression options
- private static String compressOption = "COMPRESS=LZW";
- // Specifying block x size options
- private static String blockXSize = "BLOCKXSIZE=256";
- // Specifying block y size options
- private static String blockYSize = "BLOCKYSIZE=256";
+ // ---- GeoTIFF creation/compression settings used by write() (core configuration) ----
+ private static final String COMPRESS = "ZSTD"; // Alternatives: "DEFLATE" / "LZW"; any other value makes write() throw IllegalArgumentException
+ private static final int ZSTD_LEVEL = 3; // Only passed to GDAL when COMPRESS is ZSTD; tunable within 1~22
+ private static final String PREDICTOR = "2"; // TIFF predictor 2 = horizontal differencing. NOTE(review): GDAL's floating-point predictor is PREDICTOR=3, which may suit Float32 data better — worth benchmarking
+ private static final int ROWS_PER_STRIP = 12; // Rows per strip; intended to favor row-wise random reads. If this yields too many strips, 8/16/32 can be used instead
+
+ // Fixed sentinel declared as the band's NoData value (intended to be more widely compatible than storing NaN). It equals -Float.MAX_VALUE, and readAll() maps it back to NaN. NOTE(review): write() passes NaN inputs through unchanged instead of replacing them with this sentinel — confirm intent
+ private static final float NODATA_SENTINEL = -3.4028235e38f;
static {
gdal.AllRegister();
+ gdal.SetConfigOption("GDAL_NUM_THREADS", "ALL_CPUS"); // NOTE(review): appears to set a process-wide GDAL option that affects every GDAL user in this JVM — confirm intended
DRIVER = gdal.GetDriverByName("GTiff");
if (DRIVER == null) {
throw new RuntimeException("Failed to get GTiff driver: " +
gdal.GetLastErrorMsg());
@@ -56,31 +62,51 @@ public class CompressedTiffModelProcessor extends
ModelProcessor {
}
private byte[] write(String filePath, float[] values, int width, int height)
{
- // floating point data should use predictor 2 (for difference prediction),
and use block storage
- // (recommended for LZW)
- String[] options =
- new String[] {compressOption, "PREDICTOR=2", "TILED=YES", blockXSize,
blockYSize};
+ if (values == null || values.length != (long) width * height) { // Encodes a width x height Float32 raster via DRIVER.Create and returns the file buffer; null/mismatched input is rejected first
+ throw new IllegalArgumentException("values length must be width*height");
+ }
- Dataset dataset = null;
- try {
- // Create dataset with specified options
- dataset = DRIVER.Create(filePath, width, height, 1,
gdalconstConstants.GDT_Float32, options);
+ List<String> opts = new ArrayList<>();
+ opts.add("BIGTIFF=IF_SAFER");
+ opts.add("TILED=NO"); // Explicitly use strip (untiled) layout
+ opts.add("BLOCKYSIZE=" + ROWS_PER_STRIP); // rows-per-strip (with TILED=NO, BLOCKYSIZE is the strip height)
+ opts.add("PREDICTOR=" + PREDICTOR);
+ opts.add("NUM_THREADS=ALL_CPUS");
+ if ("ZSTD".equalsIgnoreCase(COMPRESS)) {
+ opts.add("COMPRESS=ZSTD");
+ opts.add("ZSTD_LEVEL=" + ZSTD_LEVEL);
+ } else if ("DEFLATE".equalsIgnoreCase(COMPRESS)) {
+ opts.add("COMPRESS=DEFLATE");
+ } else if ("LZW".equalsIgnoreCase(COMPRESS)) {
+ opts.add("COMPRESS=LZW");
+ } else {
+ throw new IllegalArgumentException("Unsupported COMPRESS=" + COMPRESS);
+ }
+ String[] options = opts.toArray(new String[0]);
- if (dataset == null) {
+ Dataset ds = null;
+ try {
+ ds = DRIVER.Create(filePath, width, height, 1,
gdalconstConstants.GDT_Float32, options);
+ if (ds == null) {
throw new RuntimeException("Failed to create dataset: " +
gdal.GetLastErrorMsg());
}
+ Band band = ds.GetRasterBand(1);
+
+ // Always declare the fixed sentinel as NoData. NOTE(review): NaN entries in values are written as NaN, not replaced by NODATA_SENTINEL
+ band.SetNoDataValue(NODATA_SENTINEL);
- Band band = dataset.GetRasterBand(1);
- int result = band.WriteRaster(0, 0, width, height, values);
- if (result != gdalconstConstants.CE_None) {
- throw new RuntimeException("Failed to write data to tiff file: " +
gdal.GetLastErrorMsg());
+ // Write the whole raster sequentially in one call (expected to give good throughput with strip layout)
+ int err = band.WriteRaster(0, 0, width, height, values);
+ if (err != gdalconstConstants.CE_None) {
+ throw new RuntimeException("Failed to write data: " +
gdal.GetLastErrorMsg());
}
+
band.FlushCache();
- dataset.FlushCache();
+ ds.FlushCache();
return VsiGdalNative.vsiGetMemFileBuffer(filePath, true);
} finally {
- if (dataset != null) {
- dataset.delete();
+ if (ds != null) {
+ ds.delete();
}
}
}
@@ -98,23 +124,33 @@ public class CompressedTiffModelProcessor extends
ModelProcessor {
@Override
public float[] readAll(String filePath) {
- Dataset dataset = gdal.Open(filePath, gdalconstConstants.GA_ReadOnly);
- if (dataset == null) {
- throw new RuntimeException("Failed to open tiff file: " +
gdal.GetLastErrorMsg());
+ Dataset ds = gdal.OpenShared(filePath, gdalconstConstants.GA_ReadOnly); // NOTE(review): OpenShared may return a dataset shared with other callers; confirm ds.delete() below only releases this caller's reference
+ if (ds == null) {
+ throw new RuntimeException("Failed to open: " + gdal.GetLastErrorMsg());
}
try {
- Band band = dataset.GetRasterBand(1);
- if (band == null) {
- throw new RuntimeException(
- "Failed to get raster band from dataset" + gdal.GetLastErrorMsg());
+ Band band = ds.GetRasterBand(1); // NOTE(review): the previous null check on band was removed
+ int w = band.getXSize(), h = band.getYSize();
+ float[] out = new float[(int) ((long) w * h)]; // NOTE(review): the (int) cast silently truncates if w*h exceeds Integer.MAX_VALUE
+ int err = band.ReadRaster(0, 0, w, h, gdalconstConstants.GDT_Float32,
out);
+ if (err != gdalconstConstants.CE_None) {
+ throw new RuntimeException("ReadRaster(all) failed: " +
gdal.GetLastErrorMsg());
+ }
+
+ // Map the band's NoData value (when one is set and is not NaN) to NaN in the result
+ Double[] nd = new Double[1];
+ band.GetNoDataValue(nd);
+ float nodata = (nd[0] == null || Double.isNaN(nd[0])) ? Float.NaN :
nd[0].floatValue();
+ if (!Float.isNaN(nodata)) {
+ for (int i = 0; i < out.length; i++) {
+ if (out[i] == nodata) {
+ out[i] = Float.NaN;
+ }
+ }
}
- int width = band.getXSize();
- int height = band.getYSize();
- float[] result = new float[width * height];
- band.ReadRaster(0, 0, width, height, gdalconstConstants.GDT_Float32,
result);
- return result;
+ return out;
} finally {
- dataset.delete();
+ ds.delete();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java
index 19ede8e44bf..9cbd42b130f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java
@@ -27,48 +27,207 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
public class CompressedTiffModelReader extends ModelReader {
private static final Logger LOGGER =
LoggerFactory.getLogger(CompressedTiffModelReader.class);
static {
gdal.AllRegister();
- int cacheMax = gdal.GetCacheMax();
- LOGGER.info("GDAL Cache Max: {}", cacheMax);
- gdal.SetCacheMax(cacheMax * 2);
+ gdal.SetConfigOption("GDAL_NUM_THREADS", "ALL_CPUS"); // NOTE(review): appears to set a process-wide GDAL option; also, with the cache logging removed, LOGGER is no longer used in this class
+ }
+
+ // Per-thread reusable read buffer, to avoid frequent allocation. NOTE(review): it only ever grows, so each thread keeps its largest window allocated
+ private static final ThreadLocal<float[]> TL_SCRATCH =
+ ThreadLocal.withInitial(() -> new float[0]);
+
+ private static float[] ensureCapacity(float[] buf, int need) { // Returns buf if it holds at least need floats; otherwise allocates max(2x, need), stores it in TL_SCRATCH and returns it
+ if (buf.length >= need) return buf;
+ int cap = Math.max(buf.length * 2, need);
+ float[] n = new float[cap];
+ TL_SCRATCH.set(n);
+ return n;
+ }
+
+ /** Merges inclusive [start, end] ranges that overlap, touch, or are separated by at most mergeGap elements (mergeGap=0 merges only adjacent/overlapping ranges). Sorts the input list in place by start; returns the input list itself when it is empty. */
+ private static List<int[]> mergeRanges(List<int[]> ranges, int mergeGap) {
+ if (ranges.isEmpty()) return ranges;
+ ranges.sort(Comparator.comparingInt(a -> a[0]));
+ List<int[]> out = new ArrayList<>();
+ int s = ranges.get(0)[0], e = ranges.get(0)[1];
+ for (int i = 1; i < ranges.size(); i++) {
+ int ns = ranges.get(i)[0], ne = ranges.get(i)[1];
+ if (ns <= e + 1 + mergeGap) {
+ e = Math.max(e, ne);
+ } else {
+ out.add(new int[] {s, e});
+ s = ns;
+ e = ne;
+ }
+ }
+ out.add(new int[] {s, e});
+ return out;
}
@Override
public List<float[]> penetrate(String filePath, List<List<Integer>>
startAndEndTimeArray) {
- Dataset dataset = gdal.Open(filePath, gdalconstConstants.GA_ReadOnly);
- if (dataset == null) {
- LOGGER.error("Failed to open tiff file: {}", gdal.GetLastErrorMsg());
- throw new RuntimeException("Failed to open tiff file: " +
gdal.GetLastErrorMsg());
+ if (startAndEndTimeArray == null || startAndEndTimeArray.isEmpty()) { // Reads band-1 Float32 values for each inclusive [start, end] flat pixel range (row = index / width, so a range must stay within one row); results follow input order, and null/empty input yields an empty list
+ return Collections.emptyList();
+ }
+
+ // To keep results in the same order as the input, first parse the input into a list of entries that remember their original index
+ class Req {
+ final int idx; // original position in the input
+ final int startPix; // pixel index (0..total-1)
+ final int endPix; // pixel index (>= startPix)
+ int row, col0, col1; // resolved row number and inclusive column range
+
+ Req(int idx, int s, int e) {
+ this.idx = idx;
+ this.startPix = Math.min(s, e); // NOTE(review): reversed [start, end] pairs are silently swapped here rather than rejected
+ this.endPix = Math.max(s, e);
+ }
}
+ List<Req> reqs = new ArrayList<>(startAndEndTimeArray.size());
+ for (int i = 0; i < startAndEndTimeArray.size(); i++) {
+ List<Integer> r = startAndEndTimeArray.get(i);
+ if (r == null || r.size() < 2) {
+ throw new IllegalArgumentException("Each range must be [start, end].");
+ }
+ reqs.add(new Req(i, r.get(0), r.get(1)));
+ }
+
+ Dataset ds = gdal.OpenShared(filePath, gdalconstConstants.GA_ReadOnly);
+ if (ds == null) {
+ throw new RuntimeException("Failed to open: " + gdal.GetLastErrorMsg());
+ }
+
try {
- Band band = dataset.GetRasterBand(1);
- if (band == null) {
- LOGGER.error("Failed to get raster band from dataset: {}",
gdal.GetLastErrorMsg());
- throw new RuntimeException(
- "Failed to get raster band from dataset" + gdal.GetLastErrorMsg());
+ Band band = ds.GetRasterBand(1);
+ final int width = band.getXSize();
+ final int height = band.getYSize();
+ final long total = (long) width * (long) height;
+
+ // Resolve row/columns for each request and check that it stays within a single row
+ for (Req q : reqs) {
+ if (q.startPix < 0 || (long) q.endPix >= total) {
+ throw new IndexOutOfBoundsException(
+ String.format("Range [%d,%d] out of bounds [0,%d).", q.startPix,
q.endPix, total));
+ }
+ int sRow = q.startPix / width, sCol = q.startPix % width;
+ int eRow = q.endPix / width, eCol = q.endPix % width;
+ if (sRow != eRow) {
+ throw new IllegalArgumentException(
+ "Range crosses rows: [" + q.startPix + "," + q.endPix + "]");
+ }
+ q.row = sRow;
+ q.col0 = sCol;
+ q.col1 = eCol;
+ }
+
+ // NoData configuration
+ Double[] nd = new Double[1];
+ band.GetNoDataValue(nd);
+ final boolean needMapNoData = nd[0] != null && !Double.isNaN(nd[0]);
+ final float nodata = needMapNoData ? nd[0].floatValue() : Float.NaN;
+
+ // row -> list of column ranges (before merging)
+ Map<Integer, List<int[]>> perRow = new LinkedHashMap<>();
+ for (Req q : reqs) {
+ perRow.computeIfAbsent(q.row, k -> new ArrayList<>()).add(new int[]
{q.col0, q.col1});
+ }
+
+ // Pre-allocate an output array per request; results are collected in idx order at the end
+ float[][] outputs = new float[reqs.size()][];
+ for (Req q : reqs) {
+ outputs[q.idx] = new float[q.col1 - q.col0 + 1];
+ }
+
+ // mergeGap can be raised (e.g. 4/8) to read a few extra "gap" pixels in exchange for fewer ReadRaster calls
+ final int mergeGap = 0;
+
+ // Per row, also keep the original requests (initially in input order) so window data can be split back out
+ Map<Integer, List<Req>> rowReqs = new LinkedHashMap<>();
+ for (Req q : reqs) rowReqs.computeIfAbsent(q.row, k -> new
ArrayList<>()).add(q);
+
+ // Process row by row
+ for (Map.Entry<Integer, List<int[]>> e : perRow.entrySet()) {
+ int row = e.getKey();
+ List<int[]> ranges = e.getValue();
+
+ // Merge ranges -> fewer read windows
+ List<int[]> merged = mergeRanges(ranges, mergeGap);
+
+ // Sort requests by start column so the windows can be swept in order
+ List<Req> rowList = rowReqs.get(row);
+ rowList.sort(Comparator.comparingInt(a -> a.col0));
+ int p = 0;
+
+ for (int[] win : merged) {
+ int c0 = win[0], c1 = win[1];
+ int winLen = c1 - c0 + 1;
+
+ float[] scratch = ensureCapacity(TL_SCRATCH.get(), winLen);
+ int err = band.ReadRaster(c0, row, winLen, 1,
gdalconstConstants.GDT_Float32, scratch);
+ if (err != gdalconstConstants.CE_None) {
+ throw new RuntimeException(
+ "ReadRaster(row="
+ + row
+ + ", "
+ + c0
+ + ":"
+ + c1
+ + ") failed: "
+ + gdal.GetLastErrorMsg());
+ }
+
+ // Distribute this window to every original request that falls inside it
+ while (p < rowList.size()) {
+ Req q = rowList.get(p);
+ if (q.col1 < c0) {
+ p++;
+ continue;
+ } // lies entirely left of this window: skip
+ if (q.col0 > c1) {
+ break;
+ } // lies right of this window: move on to the next window
+
+ int from = Math.max(q.col0, c0);
+ int to = Math.min(q.col1, c1);
+ int len = to - from + 1;
+
+ float[] dst = outputs[q.idx];
+ System.arraycopy(scratch, from - c0, dst, from - q.col0, len);
+
+ // Note: the author expected that with mergeGap>0 a req might, in extreme cases, span two merged windows;
+ // so p only advances once the whole req is covered. NOTE(review): mergeRanges only unions request ranges, so each req appears to lie inside a single window and the else-branch below looks unreachable — confirm
+ if (to == q.col1) {
+ p++;
+ } // this req is now fully covered
+ else {
+ break;
+ } // part still missing: wait for the next window to fill it in
+ }
+ }
}
- int width = band.getXSize();
- List<float[]> result = new ArrayList<>();
- for (List<Integer> startAndEndTime : startAndEndTimeArray) {
- int xIndex = startAndEndTime.get(0);
- int yIndex = startAndEndTime.get(1);
- float[] tmp = new float[yIndex - xIndex + 1];
- int xOff = xIndex % width;
- int yOff = xIndex / width;
- int xSize = yIndex - xIndex + 1;
- int ySize = 1;
- band.ReadRaster(xOff, yOff, xSize, ySize,
gdalconstConstants.GDT_Float32, tmp);
- result.add(tmp);
+
+ // Map the band's NoData value (when set and not NaN) to NaN in every output
+ if (needMapNoData) {
+ for (float[] arr : outputs) {
+ for (int i = 0; i < arr.length; i++) if (arr[i] == nodata) arr[i] =
Float.NaN;
+ }
}
+
+ // Return results in the original input order
+ List<float[]> result = new ArrayList<>(outputs.length);
+ for (int i = 0; i < outputs.length; i++) result.add(outputs[i]);
return result;
} finally {
- dataset.delete();
+ ds.delete();
}
}
}