Copilot commented on code in PR #2663:
URL: https://github.com/apache/sedona/pull/2663#discussion_r2826878543


##########
common/src/main/java/org/apache/sedona/common/raster/cog/CogWriter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.raster.cog;
+
+import java.awt.image.RenderedImage;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.imageio.ImageWriteParam;
+import javax.media.jai.Interpolation;
+import javax.media.jai.InterpolationBicubic;
+import javax.media.jai.InterpolationBilinear;
+import javax.media.jai.InterpolationNearest;
+import org.geotools.api.coverage.grid.GridCoverageWriter;
+import org.geotools.api.parameter.GeneralParameterValue;
+import org.geotools.api.parameter.ParameterValueGroup;
+import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
+import org.geotools.api.referencing.datum.PixelInCell;
+import org.geotools.coverage.grid.GridCoverage2D;
+import org.geotools.coverage.grid.GridEnvelope2D;
+import org.geotools.coverage.grid.GridGeometry2D;
+import org.geotools.coverage.grid.io.AbstractGridFormat;
+import org.geotools.coverage.processing.Operations;
+import org.geotools.gce.geotiff.GeoTiffWriteParams;
+import org.geotools.gce.geotiff.GeoTiffWriter;
+import org.geotools.referencing.operation.transform.AffineTransform2D;
+
+/**
+ * Creates Cloud Optimized GeoTIFF (COG) files from GeoTools GridCoverage2D rasters.
+ *
+ * <p>The COG generation process:
+ *
+ * <ol>
+ *   <li>Compute overview decimation factors (power of 2: 2, 4, 8, ...)
+ *   <li>Generate overview images by downsampling
+ *   <li>Write each (full-res + overviews) as a separate tiled GeoTIFF via GeoTools
+ *   <li>Parse each TIFF's IFD structure
+ *   <li>Reassemble into COG byte order using {@link CogAssembler}
+ * </ol>
+ *
+ * <p>Overview decimation algorithm ported from GeoTrellis's {@code
+ * GeoTiff.defaultOverviewDecimations}.
+ */
+public class CogWriter {
+
+  /** Default tile size for COG output, matching GDAL's default */
+  public static final int DEFAULT_TILE_SIZE = 256;
+
+  /** Minimum image dimension to create an overview for */
+  private static final int MIN_OVERVIEW_SIZE = 2;
+
+  /**
+   * Write a GridCoverage2D as a Cloud Optimized GeoTIFF byte array using the given options.
+   *
+   * @param raster The input raster
+   * @param options COG generation options (compression, tileSize, resampling, overviewCount)
+   * @return COG file as byte array
+   * @throws IOException if writing fails
+   */
+  public static byte[] write(GridCoverage2D raster, CogOptions options) throws IOException {
+    String compressionType = options.getCompression();
+    double compressionQuality = options.getCompressionQuality();
+    int tileSize = options.getTileSize();
+    String resampling = options.getResampling();
+    int requestedOverviewCount = options.getOverviewCount();
+
+    RenderedImage image = raster.getRenderedImage();
+    int cols = image.getWidth();
+    int rows = image.getHeight();
+
+    // Step 1: Compute overview decimation factors
+    List<Integer> decimations;
+    if (requestedOverviewCount == 0) {
+      decimations = new ArrayList<>();
+    } else {
+      decimations = computeOverviewDecimations(cols, rows, tileSize);
+      if (requestedOverviewCount > 0 && requestedOverviewCount < decimations.size()) {
+        decimations = decimations.subList(0, requestedOverviewCount);
+      }
+    }
+
+    // Step 2: Generate overview coverages
+    Interpolation interpolation = getInterpolation(resampling);
+    List<GridCoverage2D> overviews = new ArrayList<>();
+    for (int decimation : decimations) {
+      GridCoverage2D overview = generateOverview(raster, decimation, interpolation);
+      overviews.add(overview);
+    }
+
+    // Step 3: Write each as a tiled GeoTIFF byte array
+    List<byte[]> tiffBytes = new ArrayList<>();
+    tiffBytes.add(writeAsTiledGeoTiff(raster, compressionType, compressionQuality, tileSize));
+    for (GridCoverage2D overview : overviews) {
+      tiffBytes.add(writeAsTiledGeoTiff(overview, compressionType, compressionQuality, tileSize));
+    }
+
+    // Step 4: Parse each TIFF's IFD structure
+    List<TiffIfdParser.ParsedTiff> parsedTiffs = new ArrayList<>();
+    for (byte[] bytes : tiffBytes) {
+      parsedTiffs.add(TiffIfdParser.parse(bytes));
+    }
+
+    // Step 5: Reassemble into COG byte order
+    return CogAssembler.assemble(parsedTiffs);
+  }
+
+  /**
+   * Write a GridCoverage2D as a Cloud Optimized GeoTIFF byte array.
+   *
+   * @param raster The input raster
+   * @param compressionType Compression type: "Deflate", "LZW", "JPEG", "PackBits", or null for
+   *     default (Deflate)
+   * @param compressionQuality Quality 0.0 (max compression) to 1.0 (no compression), or -1 for
+   *     default
+   * @param tileSize Tile width and height in pixels
+   * @return COG file as byte array
+   * @throws IOException if writing fails
+   */
+  public static byte[] write(
+      GridCoverage2D raster, String compressionType, double compressionQuality, int tileSize)
+      throws IOException {
+
+    CogOptions.Builder builder = CogOptions.builder().tileSize(tileSize);
+    if (compressionType != null) {
+      builder.compression(compressionType);
+    }
+    if (compressionQuality >= 0) {
+      builder.compressionQuality(compressionQuality);
+    }
+    return write(raster, builder.build());
+  }
+
+  /**
+   * Write a GridCoverage2D as COG with default settings (Deflate compression, 256x256 tiles).
+   *
+   * @param raster The input raster
+   * @return COG file as byte array
+   * @throws IOException if writing fails
+   */
+  public static byte[] write(GridCoverage2D raster) throws IOException {
+    return write(raster, "Deflate", 0.2, DEFAULT_TILE_SIZE);
+  }
+
+  /**
+   * Write a GridCoverage2D as COG with specified compression.
+   *
+   * @param raster The input raster
+   * @param compressionType Compression type
+   * @param compressionQuality Quality 0.0 to 1.0
+   * @return COG file as byte array
+   * @throws IOException if writing fails
+   */
+  public static byte[] write(
+      GridCoverage2D raster, String compressionType, double compressionQuality) throws IOException {
+    return write(raster, compressionType, compressionQuality, DEFAULT_TILE_SIZE);
+  }
+
+  /**
+   * Compute overview decimation factors. Each level is a power of 2.
+   *
+   * <p>Ported from GeoTrellis: {@code GeoTiff.defaultOverviewDecimations()}
+   *
+   * @param cols Image width in pixels
+   * @param rows Image height in pixels
+   * @param blockSize Tile size for the overview
+   * @return List of decimation factors [2, 4, 8, ...] or empty if image is too small
+   */
+  static List<Integer> computeOverviewDecimations(int cols, int rows, int blockSize) {
+    List<Integer> decimations = new ArrayList<>();
+    double pixels = Math.max(cols, rows);
+    double blocks = pixels / blockSize;
+    int overviewLevels = (int) Math.ceil(Math.log(blocks) / Math.log(2));
+
+    for (int level = 0; level < overviewLevels; level++) {
+      int decimation = (int) Math.pow(2, level + 1);
+      int overviewCols = (int) Math.ceil((double) cols / decimation);
+      int overviewRows = (int) Math.ceil((double) rows / decimation);
+      if (overviewCols < MIN_OVERVIEW_SIZE || overviewRows < MIN_OVERVIEW_SIZE) {
+        break;
+      }
+      decimations.add(decimation);
+    }
+    return decimations;
+  }
+
+  /**
+   * Generate an overview (reduced resolution) coverage by downsampling.
+   *
+   * @param raster The full resolution raster
+   * @param decimationFactor Factor to reduce by (2 = half size, 4 = quarter, etc.)
+   * @param interpolation The interpolation method to use for resampling
+   * @return A new GridCoverage2D at reduced resolution
+   */
+  static GridCoverage2D generateOverview(
+      GridCoverage2D raster, int decimationFactor, Interpolation interpolation) {
+    RenderedImage image = raster.getRenderedImage();
+    int newWidth = (int) Math.ceil((double) image.getWidth() / decimationFactor);
+    int newHeight = (int) Math.ceil((double) image.getHeight() / decimationFactor);
+
+    // Use GeoTools Operations.DEFAULT.resample to downsample
+    CoordinateReferenceSystem crs = raster.getCoordinateReferenceSystem2D();
+
+    AffineTransform2D originalTransform =
+        (AffineTransform2D) raster.getGridGeometry().getGridToCRS2D();
+    double newScaleX = originalTransform.getScaleX() * decimationFactor;
+    double newScaleY = originalTransform.getScaleY() * decimationFactor;
+
+    AffineTransform2D newTransform =
+        new AffineTransform2D(
+            newScaleX,
+            originalTransform.getShearY(),
+            originalTransform.getShearX(),
+            newScaleY,
+            originalTransform.getTranslateX(),
+            originalTransform.getTranslateY());
+
+    GridGeometry2D gridGeometry =
+        new GridGeometry2D(
+            new GridEnvelope2D(0, 0, newWidth, newHeight),
+            PixelInCell.CELL_CORNER,
+            newTransform,
+            crs,
+            null);

Review Comment:
   `generateOverview` hard-codes `PixelInCell.CELL_CORNER` when constructing 
the target `GridGeometry2D`. Sedona can create rasters with 
`PixelInCell.CELL_CENTER` (e.g., NetCDF reader / some constructors), so this 
can shift the georeferencing of overviews by half a pixel. Preserve the input 
raster’s pixel anchor when building the overview grid geometry (or derive the 
target transform via the existing GridGeometry APIs rather than forcing 
CELL_CORNER).
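   A minimal sketch of the suggested fix (untested, and `overviewGridGeometry` is a hypothetical helper): request the source grid-to-CRS transform in an explicit anchor and pass that same anchor to the `GridGeometry2D` constructor, so the anchor used to read and the anchor used to declare always agree. It assumes the source transform is affine, as the rest of `CogWriter` already does.

```java
// Hedged sketch, not the PR's implementation. The key point is that the anchor passed to
// getGridToCRS(...) and the anchor passed to the GridGeometry2D constructor are the same value.
static GridGeometry2D overviewGridGeometry(
    GridCoverage2D raster, int decimationFactor, int newWidth, int newHeight) {
  PixelInCell anchor = PixelInCell.CELL_CORNER;
  // Assumption: the source grid-to-CRS transform is affine, so the corner-anchored
  // transform returned here can be treated as an AffineTransform2D.
  AffineTransform2D src =
      (AffineTransform2D) raster.getGridGeometry().getGridToCRS(anchor);

  AffineTransform2D overviewTransform =
      new AffineTransform2D(
          src.getScaleX() * decimationFactor,
          src.getShearY(),
          src.getShearX(),
          src.getScaleY() * decimationFactor,
          src.getTranslateX(),
          src.getTranslateY());

  return new GridGeometry2D(
      new GridEnvelope2D(0, 0, newWidth, newHeight),
      anchor, // same anchor that produced `src`, so no half-pixel shift
      overviewTransform,
      raster.getCoordinateReferenceSystem2D(),
      null);
}
```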



##########
common/src/test/java/org/apache/sedona/common/raster/cog/CogWriterTest.java:
##########
@@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.raster.cog;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import org.apache.sedona.common.raster.MapAlgebra;
+import org.apache.sedona.common.raster.RasterConstructors;
+import org.apache.sedona.common.raster.RasterOutputs;
+import org.geotools.coverage.grid.GridCoverage2D;
+import org.junit.Test;
+
+public class CogWriterTest {
+
+  private static final String resourceFolder =
+      System.getProperty("user.dir") + "/../spark/common/src/test/resources/";
+
+  private GridCoverage2D rasterFromGeoTiff(String filePath) throws IOException 
{
+    byte[] bytes = Files.readAllBytes(Paths.get(filePath));
+    return RasterConstructors.fromGeoTiff(bytes);
+  }
+
+  @Test
+  public void testComputeOverviewDecimations() {
+    // 1000x1000 with blockSize=256: ceil(log2(1000/256)) = ceil(1.97) = 2 
levels -> [2, 4]
+    List<Integer> decimations = CogWriter.computeOverviewDecimations(1000, 
1000, 256);
+    assertEquals(2, decimations.size());
+    assertEquals(Integer.valueOf(2), decimations.get(0));
+    assertEquals(Integer.valueOf(4), decimations.get(1));
+
+    // 10000x10000 with blockSize=256: ceil(log2(10000/256)) = ceil(5.29) = 6 
levels
+    decimations = CogWriter.computeOverviewDecimations(10000, 10000, 256);
+    assertEquals(6, decimations.size());
+    assertEquals(Integer.valueOf(2), decimations.get(0));
+    assertEquals(Integer.valueOf(4), decimations.get(1));
+    assertEquals(Integer.valueOf(8), decimations.get(2));
+    assertEquals(Integer.valueOf(16), decimations.get(3));
+    assertEquals(Integer.valueOf(32), decimations.get(4));
+    assertEquals(Integer.valueOf(64), decimations.get(5));
+
+    // Very small image: 50x50 with blockSize=256 -> no overviews
+    decimations = CogWriter.computeOverviewDecimations(50, 50, 256);
+    assertEquals(0, decimations.size());
+
+    // Exactly one tile: 256x256 with blockSize=256 -> no overviews
+    decimations = CogWriter.computeOverviewDecimations(256, 256, 256);
+    assertEquals(0, decimations.size());
+  }
+
+  @Test
+  public void testGenerateOverview() {
+    // Create a 100x100 single-band raster
+    double[] bandValues = new double[100 * 100];
+    for (int i = 0; i < bandValues.length; i++) {
+      bandValues[i] = i % 256;
+    }
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            1, "d", 100, 100, 0, 0, 1, -1, 0, 0, 4326, new double[][] 
{bandValues});
+
+    // Downsample by factor of 2
+    GridCoverage2D overview = CogWriter.generateOverview(raster, 2);
+    assertNotNull(overview);
+    assertEquals(50, overview.getRenderedImage().getWidth());
+    assertEquals(50, overview.getRenderedImage().getHeight());
+  }
+
+  @Test
+  public void testWriteSmallRasterAsCog() throws IOException {
+    // Create a small raster (no overviews expected due to small size)
+    double[] bandValues = new double[50 * 50];
+    for (int i = 0; i < bandValues.length; i++) {
+      bandValues[i] = i % 256;
+    }
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            1, "d", 50, 50, 0, 0, 1, -1, 0, 0, 4326, new double[][] 
{bandValues});
+
+    byte[] cogBytes = RasterOutputs.asCloudOptimizedGeoTiff(raster);
+    assertNotNull(cogBytes);
+    assertTrue(cogBytes.length > 0);
+
+    // Verify it's a valid TIFF
+    assertTrue(
+        (cogBytes[0] == 'I' && cogBytes[1] == 'I') || (cogBytes[0] == 'M' && 
cogBytes[1] == 'M'));
+
+    // Verify it can be read back
+    GridCoverage2D readBack = RasterConstructors.fromGeoTiff(cogBytes);
+    assertNotNull(readBack);
+    assertEquals(50, readBack.getRenderedImage().getWidth());
+    assertEquals(50, readBack.getRenderedImage().getHeight());
+  }
+
+  @Test
+  public void testWriteMediumRasterAsCog() throws IOException {
+    // Create a 512x512 raster (should produce overviews with 256 tile size)
+    double[] bandValues = new double[512 * 512];
+    for (int i = 0; i < bandValues.length; i++) {
+      bandValues[i] = (i * 7) % 256;
+    }
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            1, "d", 512, 512, 0, 0, 1, -1, 0, 0, 4326, new double[][] 
{bandValues});
+
+    byte[] cogBytes = RasterOutputs.asCloudOptimizedGeoTiff(raster, "Deflate", 
0.5);
+    assertNotNull(cogBytes);
+    assertTrue(cogBytes.length > 0);
+
+    // Verify COG structure: IFDs should be at the beginning of the file
+    ByteOrder byteOrder = (cogBytes[0] == 'I') ? ByteOrder.LITTLE_ENDIAN : 
ByteOrder.BIG_ENDIAN;
+    ByteBuffer buf = ByteBuffer.wrap(cogBytes).order(byteOrder);
+
+    // First IFD should be at offset 8 (right after header)
+    int firstIfdOffset = buf.getInt(4);
+    assertEquals(8, firstIfdOffset);
+
+    // Read first IFD tag count
+    int tagCount = buf.getShort(firstIfdOffset) & 0xFFFF;
+    assertTrue("First IFD should have tags", tagCount > 0);
+
+    // Check that nextIFDOffset points to another IFD (should have at least 1 
overview)
+    int nextIfdPointerPos = firstIfdOffset + 2 + tagCount * 12;
+    int nextIfdOffset = buf.getInt(nextIfdPointerPos);
+    // For a 512x512 image with 256 tile size, we expect at least one overview
+    assertTrue("Should have at least one overview IFD", nextIfdOffset > 0);
+    // The next IFD should be before any image data (COG requirement)
+    assertTrue(
+        "Overview IFD should immediately follow first IFD region",
+        nextIfdOffset < cogBytes.length / 2);
+
+    // Verify it can be read back by GeoTools
+    GridCoverage2D readBack = RasterConstructors.fromGeoTiff(cogBytes);
+    assertNotNull(readBack);
+    assertEquals(512, readBack.getRenderedImage().getWidth());
+    assertEquals(512, readBack.getRenderedImage().getHeight());
+
+    // Verify pixel values are preserved
+    double[] originalValues = MapAlgebra.bandAsArray(raster, 1);
+    double[] readBackValues = MapAlgebra.bandAsArray(readBack, 1);
+    assertArrayEquals(originalValues, readBackValues, 0.01);
+  }
+
+  @Test
+  public void testWriteMultibandRasterAsCog() throws IOException {
+    // Create a 3-band 256x256 raster
+    int width = 256;
+    int height = 256;
+    int numBands = 3;
+    double[][] bandData = new double[numBands][width * height];
+    for (int b = 0; b < numBands; b++) {
+      for (int i = 0; i < width * height; i++) {
+        bandData[b][i] = (i * (b + 1)) % 256;
+      }
+    }
+
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            numBands, "b", width, height, 0, 0, 1, -1, 0, 0, 4326, bandData);
+
+    byte[] cogBytes = RasterOutputs.asCloudOptimizedGeoTiff(raster);
+    assertNotNull(cogBytes);
+
+    // Verify it can be read back
+    GridCoverage2D readBack = RasterConstructors.fromGeoTiff(cogBytes);
+    assertNotNull(readBack);
+    assertEquals(width, readBack.getRenderedImage().getWidth());
+    assertEquals(height, readBack.getRenderedImage().getHeight());
+  }
+
+  @Test
+  public void testWriteWithLZWCompression() throws IOException {
+    double[] bandValues = new double[100 * 100];
+    for (int i = 0; i < bandValues.length; i++) {
+      bandValues[i] = i % 10; // Highly compressible
+    }
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            1, "d", 100, 100, 0, 0, 1, -1, 0, 0, 4326, new double[][] 
{bandValues});
+
+    byte[] cogBytes = RasterOutputs.asCloudOptimizedGeoTiff(raster, "LZW", 
0.5);
+    assertNotNull(cogBytes);
+    assertTrue(cogBytes.length > 0);
+
+    GridCoverage2D readBack = RasterConstructors.fromGeoTiff(cogBytes);
+    assertNotNull(readBack);
+  }
+
+  @Test
+  public void testCogFromExistingGeoTiff() throws IOException {
+    // Test with a real GeoTIFF file from test resources
+    GridCoverage2D raster = rasterFromGeoTiff(resourceFolder + 
"raster/test1.tiff");
+
+    byte[] cogBytes = RasterOutputs.asCloudOptimizedGeoTiff(raster);
+    assertNotNull(cogBytes);
+    assertTrue(cogBytes.length > 0);
+
+    // Verify it can be read back
+    GridCoverage2D readBack = RasterConstructors.fromGeoTiff(cogBytes);
+    assertNotNull(readBack);
+    assertEquals(raster.getRenderedImage().getWidth(), 
readBack.getRenderedImage().getWidth());
+    assertEquals(raster.getRenderedImage().getHeight(), 
readBack.getRenderedImage().getHeight());
+  }
+
+  @Test
+  public void testTiffIfdParser() throws IOException {
+    // Write a tiled GeoTIFF and parse it
+    double[] bandValues = new double[256 * 256];
+    for (int i = 0; i < bandValues.length; i++) {
+      bandValues[i] = i % 256;
+    }
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            1, "d", 256, 256, 0, 0, 1, -1, 0, 0, 4326, new double[][] 
{bandValues});
+
+    byte[] tiffBytes = RasterOutputs.asGeoTiff(raster, "Deflate", 0.5);
+
+    TiffIfdParser.ParsedTiff parsed = TiffIfdParser.parse(tiffBytes);
+    assertNotNull(parsed);
+    assertTrue(parsed.tagCount > 0);
+    assertTrue(parsed.imageData.length > 0);
+    assertTrue(parsed.ifdEntries.length == parsed.tagCount * 12);
+  }
+
+  @Test
+  public void testOverviewIfdHasNewSubfileType() throws IOException {
+    // Create a 512x512 raster that will have at least one overview
+    double[] bandValues = new double[512 * 512];
+    for (int i = 0; i < bandValues.length; i++) {
+      bandValues[i] = (i * 3) % 256;
+    }
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            1, "d", 512, 512, 0, 0, 1, -1, 0, 0, 4326, new double[][] 
{bandValues});
+
+    byte[] cogBytes = RasterOutputs.asCloudOptimizedGeoTiff(raster);
+    ByteOrder byteOrder = (cogBytes[0] == 'I') ? ByteOrder.LITTLE_ENDIAN : 
ByteOrder.BIG_ENDIAN;
+    ByteBuffer buf = ByteBuffer.wrap(cogBytes).order(byteOrder);
+
+    // Navigate to second IFD (first overview)
+    int firstIfdOffset = buf.getInt(4);
+    int tagCount0 = buf.getShort(firstIfdOffset) & 0xFFFF;
+    int nextIfdPointerPos = firstIfdOffset + 2 + tagCount0 * 12;
+    int secondIfdOffset = buf.getInt(nextIfdPointerPos);
+    assertTrue("Should have at least one overview IFD", secondIfdOffset > 0);
+
+    // Scan second IFD for NewSubfileType tag (254)
+    int tagCount1 = buf.getShort(secondIfdOffset) & 0xFFFF;
+    boolean foundNewSubfileType = false;
+    int newSubfileTypeValue = -1;
+    for (int i = 0; i < tagCount1; i++) {
+      int entryOffset = secondIfdOffset + 2 + i * 12;
+      int tag = buf.getShort(entryOffset) & 0xFFFF;
+      if (tag == 254) {
+        foundNewSubfileType = true;
+        newSubfileTypeValue = buf.getInt(entryOffset + 8);
+        break;
+      }
+    }
+    assertTrue("Overview IFD must contain NewSubfileType tag (254)", 
foundNewSubfileType);
+    assertEquals("NewSubfileType must be 1 (ReducedImage)", 1, 
newSubfileTypeValue);
+  }
+
+  @Test
+  public void testInvalidInputParameters() {
+    double[] bandValues = new double[50 * 50];
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            1, "d", 50, 50, 0, 0, 1, -1, 0, 0, 4326, new double[][] 
{bandValues});
+
+    // compressionQuality > 1
+    try {
+      CogWriter.write(raster, "Deflate", 1.5, 256);
+      fail("Expected IllegalArgumentException for compressionQuality > 1");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("compressionQuality"));
+    } catch (IOException e) {
+      fail("Expected IllegalArgumentException, not IOException");
+    }
+
+    // tileSize <= 0
+    try {
+      CogWriter.write(raster, "Deflate", 0.5, 0);
+      fail("Expected IllegalArgumentException for tileSize <= 0");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("tileSize"));
+    } catch (IOException e) {
+      fail("Expected IllegalArgumentException, not IOException");
+    }
+
+    // tileSize not power of 2
+    try {
+      CogWriter.write(raster, "Deflate", 0.5, 100);
+      fail("Expected IllegalArgumentException for non-power-of-2 tileSize");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("power of 2"));
+    } catch (IOException e) {
+      fail("Expected IllegalArgumentException, not IOException");
+    }
+  }
+
+  @Test
+  public void testParserRejectsMalformedTiff() {
+    // Too short
+    try {
+      TiffIfdParser.parse(new byte[] {0, 0, 0});
+      fail("Expected IllegalArgumentException for short input");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("too short"));
+    }
+
+    // Invalid byte order marker
+    try {
+      TiffIfdParser.parse(new byte[] {'X', 'X', 0, 42, 0, 0, 0, 8});
+      fail("Expected IllegalArgumentException for invalid byte order");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("byte order"));
+    }
+
+    // Valid header but IFD offset points beyond file
+    byte[] badOffset = new byte[16];
+    badOffset[0] = 'I';
+    badOffset[1] = 'I';
+    badOffset[2] = 42;
+    badOffset[3] = 0;
+    // IFD offset = 9999 (way beyond file)
+    ByteBuffer b = ByteBuffer.wrap(badOffset).order(ByteOrder.LITTLE_ENDIAN);
+    b.putInt(4, 9999);
+    try {
+      TiffIfdParser.parse(badOffset);
+      fail("Expected IllegalArgumentException for out-of-range IFD offset");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("out of range"));
+    }
+  }
+
+  @Test
+  public void testCogTileOffsetsAreForwardPointing() throws IOException {
+    // Create a raster with overviews
+    double[] bandValues = new double[512 * 512];
+    for (int i = 0; i < bandValues.length; i++) {
+      bandValues[i] = (i * 11) % 256;
+    }
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            1, "d", 512, 512, 0, 0, 1, -1, 0, 0, 4326, new double[][] 
{bandValues});
+
+    byte[] cogBytes = RasterOutputs.asCloudOptimizedGeoTiff(raster);
+    ByteOrder byteOrder = (cogBytes[0] == 'I') ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN;
+    ByteBuffer buf = ByteBuffer.wrap(cogBytes).order(byteOrder);
+
+    // Walk all IFDs and verify TileOffsets/StripOffsets point within file bounds
+    int ifdOffset = buf.getInt(4);
+    int ifdIndex = 0;
+    int lastIfdEnd = 0;
+
+    while (ifdOffset != 0) {
+      int tagCount = buf.getShort(ifdOffset) & 0xFFFF;
+      int ifdEnd = ifdOffset + 2 + tagCount * 12 + 4;
+      lastIfdEnd = Math.max(lastIfdEnd, ifdEnd);
+
+      for (int i = 0; i < tagCount; i++) {
+        int entryOffset = ifdOffset + 2 + i * 12;
+        int tag = buf.getShort(entryOffset) & 0xFFFF;
+        int fieldType = buf.getShort(entryOffset + 2) & 0xFFFF;
+        int count = buf.getInt(entryOffset + 4);
+
+        // Check TileOffsets (324) or StripOffsets (273)
+        if (tag == 324 || tag == 273) {
+          if (count == 1) {
+            int offset = buf.getInt(entryOffset + 8);
+            assertTrue(
+                "IFD " + ifdIndex + ": TileOffset " + offset + " must be 
within file",
+                offset >= 0 && offset < cogBytes.length);
+          } else {
+            // Offsets stored in overflow area
+            int arrayOffset = buf.getInt(entryOffset + 8);
+            for (int j = 0; j < count; j++) {
+              int tileOffset = buf.getInt(arrayOffset + j * 4);
+              assertTrue(
+                  "IFD " + ifdIndex + " tile " + j + ": offset " + tileOffset 
+ " out of range",
+                  tileOffset >= 0 && tileOffset < cogBytes.length);
+            }
+          }
+        }
+      }
+
+      // Read next IFD offset
+      int nextIfdPointerPos = ifdOffset + 2 + tagCount * 12;
+      ifdOffset = buf.getInt(nextIfdPointerPos);
+      ifdIndex++;
+    }
+
+    // Verify we found at least 2 IFDs (full-res + overview)
+    assertTrue("Expected at least 2 IFDs, found " + ifdIndex, ifdIndex >= 2);
+
+    // Verify all IFDs are before image data (forward-pointing)
+    // The last IFD end should be well before the end of the file
+    assertTrue("IFD region should be at start of file", lastIfdEnd < 
cogBytes.length / 2);
+  }

Review Comment:
   This test’s “IFD region should be at start of file” assertion uses 
`lastIfdEnd < cogBytes.length / 2`, which can be flaky for highly compressible 
inputs (image data can be smaller than metadata/IFDs). Consider computing the 
minimum TileOffsets/StripOffsets across all IFDs and asserting `lastIfdEnd <= 
minOffset` instead of comparing to half the file length.
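   A hedged sketch of that alternative check (a hypothetical test helper, written in the same ByteBuffer-walking style the test already uses; tag codes 324/273 and 4-byte offset arrays mirror the existing assertions):

```java
// Hypothetical helper: smallest TileOffsets/StripOffsets value across all IFDs.
static long minImageDataOffset(byte[] cogBytes) {
  ByteOrder order = (cogBytes[0] == 'I') ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN;
  ByteBuffer buf = ByteBuffer.wrap(cogBytes).order(order);
  long min = Long.MAX_VALUE;
  int ifdOffset = buf.getInt(4);
  while (ifdOffset != 0) {
    int tagCount = buf.getShort(ifdOffset) & 0xFFFF;
    for (int i = 0; i < tagCount; i++) {
      int entryOffset = ifdOffset + 2 + i * 12;
      int tag = buf.getShort(entryOffset) & 0xFFFF;
      if (tag == 324 || tag == 273) { // TileOffsets / StripOffsets
        int count = buf.getInt(entryOffset + 4);
        if (count == 1) {
          min = Math.min(min, buf.getInt(entryOffset + 8) & 0xFFFFFFFFL);
        } else {
          int arrayOffset = buf.getInt(entryOffset + 8);
          for (int j = 0; j < count; j++) {
            min = Math.min(min, buf.getInt(arrayOffset + j * 4) & 0xFFFFFFFFL);
          }
        }
      }
    }
    ifdOffset = buf.getInt(ifdOffset + 2 + tagCount * 12);
  }
  return min;
}
```

   The assertion then becomes `assertTrue(lastIfdEnd <= minImageDataOffset(cogBytes))`, which holds regardless of how well the image data compresses.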



##########
common/src/main/java/org/apache/sedona/common/raster/cog/TiffIfdParser.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.raster.cog;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * Parses the IFD (Image File Directory) structure from a TIFF byte array. This is used to extract
+ * the structural components needed for COG assembly: the IFD entries, overflow tag data, and image
+ * data regions.
+ *
+ * <p>Reference: TIFF 6.0 Specification, Section 2 (TIFF Structure).
+ */
+public class TiffIfdParser {
+
+  /** Tag code for TileOffsets (0x0144 = 324) */
+  public static final int TAG_TILE_OFFSETS = 324;
+
+  /** Tag code for StripOffsets (0x0111 = 273) */
+  public static final int TAG_STRIP_OFFSETS = 273;
+
+  /** Tag code for TileByteCounts (0x0145 = 325) */
+  public static final int TAG_TILE_BYTE_COUNTS = 325;
+
+  /** Tag code for StripByteCounts (0x0117 = 279) */
+  public static final int TAG_STRIP_BYTE_COUNTS = 279;
+
+  /** Tag code for NewSubfileType (0x00FE = 254) */
+  public static final int TAG_NEW_SUBFILE_TYPE = 254;
+
+  /** TIFF field type sizes in bytes */
+  private static final int[] FIELD_TYPE_SIZES = {
+    0, // 0: unused
+    1, // 1: BYTE
+    1, // 2: ASCII
+    2, // 3: SHORT
+    4, // 4: LONG
+    8, // 5: RATIONAL
+    1, // 6: SBYTE
+    1, // 7: UNDEFINED
+    2, // 8: SSHORT
+    4, // 9: SLONG
+    8, // 10: SRATIONAL
+    4, // 11: FLOAT
+    8 // 12: DOUBLE
+  };
+
+  /**
+   * Result of parsing a TIFF file. Contains the byte order and the parsed IFD 
data for the first
+   * IFD only (we write each overview as a separate TIFF, so there's always 
exactly one IFD).
+   */
+  public static class ParsedTiff {
+    /** Byte order of the TIFF file */
+    public final ByteOrder byteOrder;
+
+    /** Offset where the first IFD starts (always 8 for standard TIFF) */
+    public final int ifdOffset;
+
+    /** Number of tag entries in the IFD */
+    public final int tagCount;
+
+    /**
+     * Raw bytes of all IFD tag entries (tagCount * 12 bytes). This includes 
the 12-byte entries but
+     * NOT the 2-byte tag count or the 4-byte next-IFD pointer.
+     */
+    public final byte[] ifdEntries;
+
+    /**
+     * Overflow tag data — values that exceed 4 bytes and are stored outside 
the IFD entries. This
+     * is collected in the order the tags reference them.
+     */
+    public final byte[] overflowData;
+
+    /**
+     * The start offset of the overflow data region in the original TIFF file. 
Used to rebase
+     * overflow pointers when reassembling.
+     */
+    public final int overflowDataStart;
+
+    /** The raw image data (all tiles/strips concatenated) */
+    public final byte[] imageData;
+
+    /** Byte offsets of each tile/strip within imageData (relative to 
imageData start) */
+    public final int[] segmentOffsets;
+
+    /** Byte counts of each tile/strip */
+    public final int[] segmentByteCounts;
+
+    /** Whether the IFD contains a NewSubfileType tag */
+    public final boolean hasNewSubfileType;
+
+    /** The total size of the IFD region: 2 (count) + tagCount*12 + 4 (next 
pointer) */
+    public int getIfdSize() {
+      return 2 + tagCount * 12 + 4;
+    }
+
+    /** The total size of IFD + overflow data (everything before image data) */
+    public int getIfdAndOverflowSize() {
+      return getIfdSize() + overflowData.length;
+    }
+
+    ParsedTiff(
+        ByteOrder byteOrder,
+        int ifdOffset,
+        int tagCount,
+        byte[] ifdEntries,
+        byte[] overflowData,
+        int overflowDataStart,
+        byte[] imageData,
+        int[] segmentOffsets,
+        int[] segmentByteCounts,
+        boolean hasNewSubfileType) {
+      this.byteOrder = byteOrder;
+      this.ifdOffset = ifdOffset;
+      this.tagCount = tagCount;
+      this.ifdEntries = ifdEntries;
+      this.overflowData = overflowData;
+      this.overflowDataStart = overflowDataStart;
+      this.imageData = imageData;
+      this.segmentOffsets = segmentOffsets;
+      this.segmentByteCounts = segmentByteCounts;
+      this.hasNewSubfileType = hasNewSubfileType;
+    }
+  }
+
+  /**
+   * Parse a standard TIFF byte array and extract its first IFD structure.
+   *
+   * @param tiffBytes The complete TIFF file as a byte array
+   * @return ParsedTiff with all structural components extracted
+   * @throws IllegalArgumentException if the TIFF header is invalid
+   */
+  public static ParsedTiff parse(byte[] tiffBytes) {
+    if (tiffBytes.length < 8) {
+      throw new IllegalArgumentException("TIFF data too short: " + 
tiffBytes.length + " bytes");
+    }
+
+    // Read byte order from first 2 bytes
+    ByteOrder byteOrder;
+    if (tiffBytes[0] == 'I' && tiffBytes[1] == 'I') {
+      byteOrder = ByteOrder.LITTLE_ENDIAN;
+    } else if (tiffBytes[0] == 'M' && tiffBytes[1] == 'M') {
+      byteOrder = ByteOrder.BIG_ENDIAN;
+    } else {
+      throw new IllegalArgumentException(
+          "Invalid TIFF byte order marker: " + tiffBytes[0] + ", " + 
tiffBytes[1]);
+    }
+
+    ByteBuffer buf = ByteBuffer.wrap(tiffBytes).order(byteOrder);
+
+    // Verify TIFF magic number (42)
+    int magic = buf.getShort(2) & 0xFFFF;
+    if (magic != 42) {
+      throw new IllegalArgumentException("Not a standard TIFF file (magic=" + 
magic + ")");
+    }
+
+    // Read first IFD offset
+    int ifdOffset = buf.getInt(4);
+    if (ifdOffset < 8 || ifdOffset >= tiffBytes.length - 2) {
+      throw new IllegalArgumentException(
+          "IFD offset out of range: " + ifdOffset + " (file size: " + 
tiffBytes.length + ")");
+    }
+
+    // Read number of directory entries
+    int tagCount = buf.getShort(ifdOffset) & 0xFFFF;
+
+    // Read all IFD entries (12 bytes each)
+    int entriesStart = ifdOffset + 2;
+    int entriesLen = tagCount * 12;
+    if (entriesStart + entriesLen > tiffBytes.length) {
+      throw new IllegalArgumentException(
+          "IFD entries extend beyond file: entriesStart="
+              + entriesStart
+              + " entriesLen="
+              + entriesLen
+              + " fileSize="
+              + tiffBytes.length);
+    }
+    byte[] ifdEntries = new byte[entriesLen];
+    System.arraycopy(tiffBytes, entriesStart, ifdEntries, 0, entriesLen);
+
+    // Find the offsets tag and bytecounts tag to locate image data
+    int offsetsTag = -1;
+    int byteCountsTag = -1;
+    int segmentCount = 0;
+    boolean hasNewSubfileType = false;
+
+    // Also track the overflow data region
+    int overflowStart = Integer.MAX_VALUE;
+    int overflowEnd = 0;
+
+    // First pass: find offset/bytecount tags and overflow region
+    for (int i = 0; i < tagCount; i++) {
+      int entryOffset = entriesStart + i * 12;
+      int tag = buf.getShort(entryOffset) & 0xFFFF;
+      int fieldType = buf.getShort(entryOffset + 2) & 0xFFFF;
+      int count = buf.getInt(entryOffset + 4);
+      int valueSize = count * getFieldTypeSize(fieldType);
+
+      if (tag == TAG_TILE_OFFSETS || tag == TAG_STRIP_OFFSETS) {
+        offsetsTag = tag;
+        segmentCount = count;
+      } else if (tag == TAG_TILE_BYTE_COUNTS || tag == TAG_STRIP_BYTE_COUNTS) {
+        byteCountsTag = tag;
+      } else if (tag == TAG_NEW_SUBFILE_TYPE) {
+        hasNewSubfileType = true;
+      }
+
+      // Track overflow data region (values > 4 bytes stored outside IFD entries)
+      if (valueSize > 4) {
+        int valOffset = buf.getInt(entryOffset + 8);
+        if (valOffset < 0 || valOffset + valueSize > tiffBytes.length) {
+          throw new IllegalArgumentException(
+              "Overflow data for tag "
+                  + tag
+                  + " out of range: offset="
+                  + valOffset
+                  + " size="
+                  + valueSize
+                  + " fileSize="
+                  + tiffBytes.length);
+        }
+        overflowStart = Math.min(overflowStart, valOffset);
+        overflowEnd = Math.max(overflowEnd, valOffset + valueSize);
+      }

Review Comment:
   `TiffIfdParser.parse` treats the TIFF `count` field as a signed int and 
computes `valueSize` as `count * typeSize` in an int. For large/unsigned counts 
this can overflow to a negative `valueSize`, which can bypass the range checks 
and lead to unexpected `IndexOutOfBoundsException`/DoS instead of a controlled 
`IllegalArgumentException`. Use unsigned/long arithmetic for 
`count`/`valueSize` and explicitly reject counts that are negative or would 
exceed the input length.
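   A minimal sketch of the suggested hardening (a hypothetical helper; `getFieldTypeSize` is the parser's existing lookup):

```java
// Hedged sketch: read the count as unsigned and size the value in long arithmetic,
// rejecting anything that cannot fit in the input before any array access happens.
static long checkedValueSize(int rawCount, int fieldType, int fileLength, int tag) {
  long count = rawCount & 0xFFFFFFFFL; // TIFF counts are unsigned 32-bit
  long valueSize = count * getFieldTypeSize(fieldType);
  // count < 2^32 and type sizes are at most 8, so this product cannot overflow a long.
  if (valueSize > fileLength) {
    throw new IllegalArgumentException(
        "Tag " + tag + " declares " + valueSize + " bytes of values, but the file is only "
            + fileLength + " bytes");
  }
  return valueSize;
}
```

   `parse` would then carry `valueSize` as a `long` when computing the overflow-region bounds, so malformed counts fail with a controlled `IllegalArgumentException`.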



##########
common/src/main/java/org/apache/sedona/common/raster/cog/CogOptions.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.raster.cog;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Options for Cloud Optimized GeoTIFF (COG) generation.
+ *
+ * <p>Use the {@link Builder} to construct instances:
+ *
+ * <pre>{@code
+ * CogOptions opts = CogOptions.builder()
+ *     .compression("LZW")
+ *     .compressionQuality(0.5)
+ *     .tileSize(512)
+ *     .resampling("Bilinear")
+ *     .overviewCount(3)
+ *     .build();
+ * }</pre>
+ *
+ * <p>All fields are immutable once constructed. Validation is performed in 
{@link Builder#build()}.
+ */
+public final class CogOptions {
+
+  /** Supported resampling algorithms for overview generation. */
+  private static final List<String> VALID_RESAMPLING =
+      Arrays.asList("Nearest", "Bilinear", "Bicubic");
+
+  private final String compression;
+  private final double compressionQuality;
+  private final int tileSize;
+  private final String resampling;
+  private final int overviewCount;
+
+  private CogOptions(Builder builder) {
+    this.compression = builder.compression;
+    this.compressionQuality = builder.compressionQuality;
+    this.tileSize = builder.tileSize;
+    this.resampling = builder.resampling;
+    this.overviewCount = builder.overviewCount;
+  }
+
+  /**
+   * @return Compression type: "Deflate", "LZW", "JPEG", "PackBits"
+   */
+  public String getCompression() {
+    return compression;
+  }
+
+  /**
+   * @return Compression quality from 0.0 (max compression) to 1.0 (no 
compression)
+   */
+  public double getCompressionQuality() {
+    return compressionQuality;
+  }
+
+  /**
+   * @return Tile width and height in pixels (always a power of 2)
+   */
+  public int getTileSize() {
+    return tileSize;
+  }
+
+  /**
+   * @return Resampling algorithm for overview generation: "Nearest", 
"Bilinear", or "Bicubic"
+   */
+  public String getResampling() {
+    return resampling;
+  }
+
+  /**
+   * @return Number of overview levels. -1 means auto-compute based on image 
dimensions, 0 means no
+   *     overviews.
+   */
+  public int getOverviewCount() {
+    return overviewCount;
+  }
+
+  /**
+   * @return A new builder initialized with default values
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * @return The default options (Deflate, quality 0.2, 256px tiles, Nearest, 
auto overviews)
+   */
+  public static CogOptions defaults() {
+    return new Builder().build();
+  }
+
+  @Override
+  public String toString() {
+    return "CogOptions{"
+        + "compression='"
+        + compression
+        + '\''
+        + ", compressionQuality="
+        + compressionQuality
+        + ", tileSize="
+        + tileSize
+        + ", resampling='"
+        + resampling
+        + '\''
+        + ", overviewCount="
+        + overviewCount
+        + '}';
+  }
+
+  /** Builder for {@link CogOptions}. */
+  public static final class Builder {
+    private String compression = "Deflate";
+    private double compressionQuality = 0.2;
+    private int tileSize = 256;
+    private String resampling = "Nearest";
+    private int overviewCount = -1;
+
+    private Builder() {}
+
+    /**
+     * Set the compression type. Default: "Deflate".
+     *
+     * @param compression One of "Deflate", "LZW", "JPEG", "PackBits"
+     * @return this builder
+     */
+    public Builder compression(String compression) {
+      this.compression = compression;
+      return this;
+    }
+
+    /**
+     * Set the compression quality. Default: 0.2.
+     *
+     * @param compressionQuality Value from 0.0 (max compression) to 1.0 (no 
compression)
+     * @return this builder
+     */
+    public Builder compressionQuality(double compressionQuality) {
+      this.compressionQuality = compressionQuality;
+      return this;
+    }
+
+    /**
+     * Set the tile size for both width and height. Default: 256.
+     *
+     * @param tileSize Must be a positive power of 2 (e.g. 128, 256, 512, 1024)
+     * @return this builder
+     */
+    public Builder tileSize(int tileSize) {
+      this.tileSize = tileSize;
+      return this;
+    }
+
+    /**
+     * Set the resampling algorithm for overview generation. Default: 
"Nearest".
+     *
+     * @param resampling One of "Nearest", "Bilinear", "Bicubic"
+     * @return this builder
+     */
+    public Builder resampling(String resampling) {
+      this.resampling = resampling;
+      return this;
+    }
+
+    /**
+     * Set the number of overview levels. Default: -1 (auto-compute).
+     *
+     * @param overviewCount -1 for auto, 0 for no overviews, or a positive 
count
+     * @return this builder
+     */
+    public Builder overviewCount(int overviewCount) {
+      this.overviewCount = overviewCount;
+      return this;
+    }
+
+    /**
+     * Build and validate the options.
+     *
+     * @return A validated, immutable {@link CogOptions} instance
+     * @throws IllegalArgumentException if any option is invalid
+     */
+    public CogOptions build() {
+      if (compression == null || compression.isEmpty()) {
+        throw new IllegalArgumentException("compression must not be null or 
empty");
+      }
+      if (compressionQuality < 0 || compressionQuality > 1.0) {
+        throw new IllegalArgumentException(
+            "compressionQuality must be between 0.0 and 1.0, got: " + 
compressionQuality);
+      }
+      if (tileSize <= 0) {
+        throw new IllegalArgumentException("tileSize must be positive, got: " 
+ tileSize);
+      }
+      if ((tileSize & (tileSize - 1)) != 0) {
+        throw new IllegalArgumentException("tileSize must be a power of 2, 
got: " + tileSize);
+      }
+      if (overviewCount < -1) {
+        throw new IllegalArgumentException(
+            "overviewCount must be -1 (auto), 0 (none), or positive, got: " + 
overviewCount);
+      }
+
+      // Normalize resampling to title-case for matching
+      String normalized = normalizeResampling(resampling);
+      if (!VALID_RESAMPLING.contains(normalized)) {
+        throw new IllegalArgumentException(
+            "resampling must be one of " + VALID_RESAMPLING + ", got: '" + 
resampling + "'");
+      }

Review Comment:
   `CogOptions.Builder` validates resampling/tileSize/quality but does not 
validate `compression` against the compression types actually supported by the 
GeoTools GeoTIFF writer. As-is, invalid strings will pass `build()` and fail 
later at write time. Consider normalizing/validating `compression` (and ideally 
allowing the same set mentioned in `RasterOutputs.asGeoTiff`, e.g., "None", 
"Huffman", etc.) so errors surface early with a clear message.



##########
common/src/main/java/org/apache/sedona/common/raster/cog/CogAssembler.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.raster.cog;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+
+/**
+ * Assembles multiple parsed TIFF IFDs into Cloud Optimized GeoTIFF (COG) byte 
order.
+ *
+ * <p>COG layout (per the spec):
+ *
+ * <pre>
+ *   [TIFF header - 8 bytes]
+ *   [IFD 0: full-res tags + overflow data]
+ *   [IFD 1: overview 2x tags + overflow data]
+ *   ...
+ *   [IFD N: smallest overview tags + overflow data]
+ *   [smallest overview image data]
+ *   ...
+ *   [overview 2x image data]
+ *   [full-res image data]
+ * </pre>
+ *
+ * <p>Key COG requirements:
+ *
+ * <ul>
+ *   <li>All IFDs are contiguous at the start of the file
+ *   <li>Image data follows all IFDs, ordered smallest overview first, 
full-res last
+ *   <li>TileOffsets/StripOffsets point forward to where image data will be 
located
+ *   <li>Overviews have NewSubfileType = 1 (ReducedImage)
+ * </ul>
+ *
+ * <p>Ported from GeoTrellis's {@code GeoTiffWriter.appendCloudOptimized()}.
+ */
+public class CogAssembler {
+
+  /** NewSubfileType value for reduced-resolution (overview) images */
+  private static final int REDUCED_IMAGE = 1;
+
+  /**
+   * Assemble parsed TIFF IFDs into COG byte order.
+   *
+   * @param parsedTiffs List of parsed TIFFs, ordered: [full-res, overview-2x, 
overview-4x, ...
+   *     smallest]. The first element is the full resolution image, subsequent 
elements are
+   *     progressively smaller overviews.
+   * @return A byte array containing the complete COG file
+   * @throws IOException if writing fails
+   */
+  public static byte[] assemble(List<TiffIfdParser.ParsedTiff> parsedTiffs) 
throws IOException {
+    if (parsedTiffs.isEmpty()) {
+      throw new IllegalArgumentException("No TIFFs to assemble");
+    }
+
+    ByteOrder byteOrder = parsedTiffs.get(0).byteOrder;
+    int ifdCount = parsedTiffs.size();
+
+    // Determine which overview IFDs need NewSubfileType injection
+    boolean[] needsNewSubfileType = new boolean[ifdCount];
+    for (int i = 1; i < ifdCount; i++) {
+      needsNewSubfileType[i] = !parsedTiffs.get(i).hasNewSubfileType;
+    }
+
+    // Phase 1: Compute sizes of all IFD regions (IFD entries + overflow data)
+    // If we need to inject NewSubfileType, the IFD grows by 12 bytes (one tag 
entry)
+    int[] ifdRegionSizes = new int[ifdCount];
+    int[] effectiveTagCounts = new int[ifdCount];
+    for (int i = 0; i < ifdCount; i++) {
+      TiffIfdParser.ParsedTiff pt = parsedTiffs.get(i);
+      int extraBytes = needsNewSubfileType[i] ? 12 : 0;
+      effectiveTagCounts[i] = pt.tagCount + (needsNewSubfileType[i] ? 1 : 0);
+      ifdRegionSizes[i] = pt.getIfdAndOverflowSize() + extraBytes;
+    }
+
+    // Phase 2: Compute absolute offsets for each IFD and its image data.
+    // Layout: [header=8] [IFD0+overflow] [IFD1+overflow] ... [IFDN+overflow]
+    //         [imageN] ... [image1] [image0]
+    int[] ifdAbsoluteOffsets = new int[ifdCount];
+    int cursor = 8; // After TIFF header
+    for (int i = 0; i < ifdCount; i++) {
+      ifdAbsoluteOffsets[i] = cursor;
+      cursor += ifdRegionSizes[i];
+    }
+    int imageDataRegionStart = cursor;
+
+    // Image data is written in reverse order (smallest overview first, 
full-res last)
+    // Compute absolute offset of each IFD's image data
+    int[] imageDataAbsoluteOffsets = new int[ifdCount];
+    int imageDataCursor = imageDataRegionStart;
+    for (int i = ifdCount - 1; i >= 0; i--) {
+      imageDataAbsoluteOffsets[i] = imageDataCursor;
+      imageDataCursor += parsedTiffs.get(i).imageData.length;
+    }
+    int totalSize = imageDataCursor;
+
+    // Phase 3: Write the COG
+    ByteArrayOutputStream bos = new ByteArrayOutputStream(totalSize);
+    DataOutputStream dos = new DataOutputStream(bos);
+
+    // Write TIFF header
+    if (byteOrder == ByteOrder.LITTLE_ENDIAN) {
+      dos.writeByte('I');
+      dos.writeByte('I');
+    } else {
+      dos.writeByte('M');
+      dos.writeByte('M');
+    }
+    writeShort(dos, byteOrder, 42); // TIFF magic
+    writeInt(dos, byteOrder, ifdAbsoluteOffsets[0]); // Offset to first IFD
+
+    // Write each IFD + its overflow data
+    for (int i = 0; i < ifdCount; i++) {
+      TiffIfdParser.ParsedTiff pt = parsedTiffs.get(i);
+      boolean isOverview = i > 0;
+      int ifdStart = ifdAbsoluteOffsets[i];
+      int nextIfdOffset = (i + 1 < ifdCount) ? ifdAbsoluteOffsets[i + 1] : 0;
+      int tagCountForIfd = effectiveTagCounts[i];
+
+      // Compute where this IFD's overflow data will be in the output
+      // Account for possible extra 12 bytes from injected tag
+      int overflowStartInOutput = ifdStart + 2 + tagCountForIfd * 12 + 4;
+
+      // Patch the IFD entries:
+      // - Rebase overflow pointers from original file offsets to new output 
offsets
+      // - Rewrite TileOffsets/StripOffsets to point to the new image data 
location
+      // - Inject NewSubfileType=1 for overview IFDs if missing
+      byte[] patchedEntries =
+          patchIfdEntries(
+              pt,
+              overflowStartInOutput,
+              imageDataAbsoluteOffsets[i],
+              isOverview,
+              needsNewSubfileType[i],
+              byteOrder);
+
+      // Write: tag count (2 bytes) + entries (tagCountForIfd*12) + next IFD 
offset (4 bytes)
+      writeShort(dos, byteOrder, tagCountForIfd);
+      dos.write(patchedEntries);
+      writeInt(dos, byteOrder, nextIfdOffset);
+
+      // Write overflow data
+      dos.write(pt.overflowData);
+    }
+
+    // Write image data in reverse order (smallest overview first)
+    for (int i = ifdCount - 1; i >= 0; i--) {
+      dos.write(parsedTiffs.get(i).imageData);
+    }
+
+    dos.flush();
+    return bos.toByteArray();
+  }
+
+  /**
+   * Patch IFD entries to update:
+   *
+   * <ol>
+   *   <li>Overflow data pointers (rebase from original file offset to new 
output offset)
+   *   <li>TileOffsets/StripOffsets values (point to new image data location)
+   *   <li>Set or inject NewSubfileType=1 for overview IFDs
+   * </ol>
+   */
+  private static byte[] patchIfdEntries(
+      TiffIfdParser.ParsedTiff pt,
+      int newOverflowStart,
+      int newImageDataStart,
+      boolean isOverview,
+      boolean injectNewSubfileType,
+      ByteOrder byteOrder) {
+
+    byte[] entries = pt.ifdEntries.clone();
+    ByteBuffer buf = ByteBuffer.wrap(entries).order(byteOrder);
+
+    int overflowDelta = newOverflowStart - pt.overflowDataStart;
+
+    for (int i = 0; i < pt.tagCount; i++) {
+      int offset = i * 12;
+      int tag = buf.getShort(offset) & 0xFFFF;
+      int fieldType = buf.getShort(offset + 2) & 0xFFFF;
+      int count = buf.getInt(offset + 4);
+      int valueSize = count * getFieldTypeSize(fieldType);
+
+      // Handle NewSubfileType tag for overviews (when already present)
+      if (tag == TiffIfdParser.TAG_NEW_SUBFILE_TYPE && isOverview) {
+        buf.putInt(offset + 8, REDUCED_IMAGE);
+        continue;
+      }
+
+      // Handle TileOffsets/StripOffsets — rewrite to point to new image data location
+      if (tag == TiffIfdParser.TAG_TILE_OFFSETS || tag == TiffIfdParser.TAG_STRIP_OFFSETS) {
+        if (count == 1 && valueSize <= 4) {
+          // Single segment: offset stored inline
+          buf.putInt(offset + 8, newImageDataStart + pt.segmentOffsets[0]);
+        } else {
+          // Multiple segments: the entry points to an overflow array.
+          // We need to rewrite the overflow array with new absolute offsets.
+          // First, rebase the pointer to the overflow data.
+          int origPointer = buf.getInt(offset + 8);
+          int newPointer = origPointer + overflowDelta;
+          buf.putInt(offset + 8, newPointer);
+
+          // Now patch the overflow data array with new image data offsets
+          int overflowArrayOffset = origPointer - pt.overflowDataStart;
+          ByteBuffer overflowBuf = ByteBuffer.wrap(pt.overflowData).order(byteOrder);
+          for (int j = 0; j < count; j++) {
+            int newSegmentOffset = newImageDataStart + pt.segmentOffsets[j];
+            overflowBuf.putInt(overflowArrayOffset + j * 4, newSegmentOffset);
+          }
+        }

Review Comment:
   `patchIfdEntries` mutates `pt.overflowData` in place when rewriting 
TileOffsets/StripOffsets arrays. Since `assemble(...)` takes `ParsedTiff` 
objects from the caller, this creates surprising side effects (the parsed 
structures are no longer a faithful representation of the original TIFF after 
assembly). Prefer patching a copy of the overflow buffer (and writing the 
patched copy) so `assemble` doesn’t mutate its inputs.



##########
common/src/main/java/org/apache/sedona/common/raster/cog/TiffIfdParser.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.raster.cog;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * Parses the IFD (Image File Directory) structure from a TIFF byte array. 
This is used to extract
+ * the structural components needed for COG assembly: the IFD entries, 
overflow tag data, and image
+ * data regions.
+ *
+ * <p>Reference: TIFF 6.0 Specification, Section 2 (TIFF Structure).
+ */
+public class TiffIfdParser {
+
+  /** Tag code for TileOffsets (0x0144 = 324) */
+  public static final int TAG_TILE_OFFSETS = 324;
+
+  /** Tag code for StripOffsets (0x0111 = 273) */
+  public static final int TAG_STRIP_OFFSETS = 273;
+
+  /** Tag code for TileByteCounts (0x0145 = 325) */
+  public static final int TAG_TILE_BYTE_COUNTS = 325;
+
+  /** Tag code for StripByteCounts (0x0117 = 279) */
+  public static final int TAG_STRIP_BYTE_COUNTS = 279;
+
+  /** Tag code for NewSubfileType (0x00FE = 254) */
+  public static final int TAG_NEW_SUBFILE_TYPE = 254;
+
+  /** TIFF field type sizes in bytes */
+  private static final int[] FIELD_TYPE_SIZES = {
+    0, // 0: unused
+    1, // 1: BYTE
+    1, // 2: ASCII
+    2, // 3: SHORT
+    4, // 4: LONG
+    8, // 5: RATIONAL
+    1, // 6: SBYTE
+    1, // 7: UNDEFINED
+    2, // 8: SSHORT
+    4, // 9: SLONG
+    8, // 10: SRATIONAL
+    4, // 11: FLOAT
+    8 // 12: DOUBLE
+  };
+
+  /**
+   * Result of parsing a TIFF file. Contains the byte order and the parsed IFD 
data for the first
+   * IFD only (we write each overview as a separate TIFF, so there's always 
exactly one IFD).
+   */
+  public static class ParsedTiff {
+    /** Byte order of the TIFF file */
+    public final ByteOrder byteOrder;
+
+    /** Offset where the first IFD starts (always 8 for standard TIFF) */
+    public final int ifdOffset;
+
+    /** Number of tag entries in the IFD */
+    public final int tagCount;
+
+    /**
+     * Raw bytes of all IFD tag entries (tagCount * 12 bytes). This includes 
the 12-byte entries but
+     * NOT the 2-byte tag count or the 4-byte next-IFD pointer.
+     */
+    public final byte[] ifdEntries;
+
+    /**
+     * Overflow tag data — values that exceed 4 bytes and are stored outside 
the IFD entries. This
+     * is collected in the order the tags reference them.
+     */
+    public final byte[] overflowData;
+
+    /**
+     * The start offset of the overflow data region in the original TIFF file. 
Used to rebase
+     * overflow pointers when reassembling.
+     */
+    public final int overflowDataStart;
+
+    /** The raw image data (all tiles/strips concatenated) */
+    public final byte[] imageData;
+
+    /** Byte offsets of each tile/strip within imageData (relative to 
imageData start) */
+    public final int[] segmentOffsets;
+
+    /** Byte counts of each tile/strip */
+    public final int[] segmentByteCounts;
+
+    /** Whether the IFD contains a NewSubfileType tag */
+    public final boolean hasNewSubfileType;
+
+    /** The total size of the IFD region: 2 (count) + tagCount*12 + 4 (next 
pointer) */
+    public int getIfdSize() {
+      return 2 + tagCount * 12 + 4;
+    }
+
+    /** The total size of IFD + overflow data (everything before image data) */
+    public int getIfdAndOverflowSize() {
+      return getIfdSize() + overflowData.length;
+    }
+
+    ParsedTiff(
+        ByteOrder byteOrder,
+        int ifdOffset,
+        int tagCount,
+        byte[] ifdEntries,
+        byte[] overflowData,
+        int overflowDataStart,
+        byte[] imageData,
+        int[] segmentOffsets,
+        int[] segmentByteCounts,
+        boolean hasNewSubfileType) {
+      this.byteOrder = byteOrder;
+      this.ifdOffset = ifdOffset;
+      this.tagCount = tagCount;
+      this.ifdEntries = ifdEntries;
+      this.overflowData = overflowData;
+      this.overflowDataStart = overflowDataStart;
+      this.imageData = imageData;
+      this.segmentOffsets = segmentOffsets;
+      this.segmentByteCounts = segmentByteCounts;
+      this.hasNewSubfileType = hasNewSubfileType;
+    }
+  }
+
+  /**
+   * Parse a standard TIFF byte array and extract its first IFD structure.
+   *
+   * @param tiffBytes The complete TIFF file as a byte array
+   * @return ParsedTiff with all structural components extracted
+   * @throws IllegalArgumentException if the TIFF header is invalid
+   */
+  public static ParsedTiff parse(byte[] tiffBytes) {
+    if (tiffBytes.length < 8) {
+      throw new IllegalArgumentException("TIFF data too short: " + 
tiffBytes.length + " bytes");
+    }
+
+    // Read byte order from first 2 bytes
+    ByteOrder byteOrder;
+    if (tiffBytes[0] == 'I' && tiffBytes[1] == 'I') {
+      byteOrder = ByteOrder.LITTLE_ENDIAN;
+    } else if (tiffBytes[0] == 'M' && tiffBytes[1] == 'M') {
+      byteOrder = ByteOrder.BIG_ENDIAN;
+    } else {
+      throw new IllegalArgumentException(
+          "Invalid TIFF byte order marker: " + tiffBytes[0] + ", " + 
tiffBytes[1]);
+    }
+
+    ByteBuffer buf = ByteBuffer.wrap(tiffBytes).order(byteOrder);
+
+    // Verify TIFF magic number (42)
+    int magic = buf.getShort(2) & 0xFFFF;
+    if (magic != 42) {
+      throw new IllegalArgumentException("Not a standard TIFF file (magic=" + 
magic + ")");
+    }
+
+    // Read first IFD offset
+    int ifdOffset = buf.getInt(4);
+    if (ifdOffset < 8 || ifdOffset >= tiffBytes.length - 2) {
+      throw new IllegalArgumentException(
+          "IFD offset out of range: " + ifdOffset + " (file size: " + 
tiffBytes.length + ")");
+    }
+
+    // Read number of directory entries
+    int tagCount = buf.getShort(ifdOffset) & 0xFFFF;
+
+    // Read all IFD entries (12 bytes each)
+    int entriesStart = ifdOffset + 2;
+    int entriesLen = tagCount * 12;
+    if (entriesStart + entriesLen > tiffBytes.length) {
+      throw new IllegalArgumentException(
+          "IFD entries extend beyond file: entriesStart="
+              + entriesStart
+              + " entriesLen="
+              + entriesLen
+              + " fileSize="
+              + tiffBytes.length);
+    }
+    byte[] ifdEntries = new byte[entriesLen];
+    System.arraycopy(tiffBytes, entriesStart, ifdEntries, 0, entriesLen);
+
+    // Find the offsets tag and bytecounts tag to locate image data
+    int offsetsTag = -1;
+    int byteCountsTag = -1;
+    int segmentCount = 0;
+    boolean hasNewSubfileType = false;
+
+    // Also track the overflow data region
+    int overflowStart = Integer.MAX_VALUE;
+    int overflowEnd = 0;
+
+    // First pass: find offset/bytecount tags and overflow region
+    for (int i = 0; i < tagCount; i++) {
+      int entryOffset = entriesStart + i * 12;
+      int tag = buf.getShort(entryOffset) & 0xFFFF;
+      int fieldType = buf.getShort(entryOffset + 2) & 0xFFFF;
+      int count = buf.getInt(entryOffset + 4);
+      int valueSize = count * getFieldTypeSize(fieldType);
+
+      if (tag == TAG_TILE_OFFSETS || tag == TAG_STRIP_OFFSETS) {
+        offsetsTag = tag;
+        segmentCount = count;
+      } else if (tag == TAG_TILE_BYTE_COUNTS || tag == TAG_STRIP_BYTE_COUNTS) {
+        byteCountsTag = tag;
+      } else if (tag == TAG_NEW_SUBFILE_TYPE) {
+        hasNewSubfileType = true;
+      }
+
+      // Track overflow data region (values > 4 bytes stored outside IFD 
entries)
+      if (valueSize > 4) {
+        int valOffset = buf.getInt(entryOffset + 8);
+        if (valOffset < 0 || valOffset + valueSize > tiffBytes.length) {
+          throw new IllegalArgumentException(
+              "Overflow data for tag "
+                  + tag
+                  + " out of range: offset="
+                  + valOffset
+                  + " size="
+                  + valueSize
+                  + " fileSize="
+                  + tiffBytes.length);
+        }
+        overflowStart = Math.min(overflowStart, valOffset);
+        overflowEnd = Math.max(overflowEnd, valOffset + valueSize);
+      }
+    }
+
+    if (offsetsTag < 0 || byteCountsTag < 0) {
+      throw new IllegalArgumentException(
+          "TIFF missing TileOffsets/StripOffsets or 
TileByteCounts/StripByteCounts tags");
+    }
+
+    // Read segment offsets and byte counts
+    int[] segmentOffsets = readIntArray(buf, tiffBytes, entriesStart, 
tagCount, offsetsTag);
+    int[] segmentByteCounts = readIntArray(buf, tiffBytes, entriesStart, 
tagCount, byteCountsTag);
+
+    // Extract overflow data
+    byte[] overflowData;
+    int overflowDataStart;
+    if (overflowStart < Integer.MAX_VALUE) {
+      overflowDataStart = overflowStart;
+      overflowData = new byte[overflowEnd - overflowStart];
+      System.arraycopy(tiffBytes, overflowStart, overflowData, 0, 
overflowData.length);
+    } else {
+      overflowDataStart = 0;
+      overflowData = new byte[0];
+    }
+
+    // Find image data bounds
+    int imageDataStart = Integer.MAX_VALUE;
+    int imageDataEnd = 0;
+    for (int i = 0; i < segmentCount; i++) {
+      imageDataStart = Math.min(imageDataStart, segmentOffsets[i]);
+      imageDataEnd = Math.max(imageDataEnd, segmentOffsets[i] + 
segmentByteCounts[i]);
+    }

Review Comment:
   After reading `segmentOffsets` and `segmentByteCounts`, the code assumes 
both arrays have the same length (`segmentCount`) and later indexes them 
together. If a malformed TIFF provides mismatched counts for offsets vs 
byte-counts, this can throw `ArrayIndexOutOfBoundsException` instead of a clear 
validation error. Add an explicit length/count equality check and throw 
`IllegalArgumentException` when they differ.
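   Something along these lines right after the two `readIntArray` calls would do (sketch only; variable names mirror the ones already in `parse`):
   ```java
   // Sketch: fail fast on malformed TIFFs instead of hitting
   // ArrayIndexOutOfBoundsException later when the arrays are indexed together.
   if (segmentOffsets.length != segmentCount
       || segmentByteCounts.length != segmentCount) {
     throw new IllegalArgumentException(
         "Mismatched TIFF segment arrays: offsets="
             + segmentOffsets.length
             + ", byteCounts="
             + segmentByteCounts.length
             + ", expected count="
             + segmentCount);
   }
   ```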



##########
common/src/main/java/org/apache/sedona/common/raster/RasterOutputs.java:
##########
@@ -87,6 +89,56 @@ public static byte[] asGeoTiff(GridCoverage2D raster) {
     return asGeoTiff(raster, null, -1);
   }
 
+  /**
+   * Creates a Cloud Optimized GeoTIFF (COG) byte array from the given raster. 
The COG format
+   * arranges tiles and overviews in an order optimized for HTTP range-request 
based access,
+   * enabling efficient partial reads from cloud storage.
+   *
+   * @param raster The input raster
+   * @param compressionType Compression type: "Deflate", "LZW", "JPEG", 
"PackBits", or null for
+   *     default (Deflate)
+   * @param compressionQuality Quality 0.0 (max compression) to 1.0 (no 
compression)
+   * @return COG file as byte array
+   */
+  public static byte[] asCloudOptimizedGeoTiff(
+      GridCoverage2D raster, String compressionType, double 
compressionQuality) {
+    try {
+      return CogWriter.write(raster, compressionType, compressionQuality);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to write Cloud Optimized GeoTIFF", e);
+    }
+  }
+
+  /**
+   * Creates a Cloud Optimized GeoTIFF (COG) byte array with default settings 
(Deflate compression,
+   * 256x256 tiles).
+   *
+   * @param raster The input raster
+   * @return COG file as byte array
+   */
+  public static byte[] asCloudOptimizedGeoTiff(GridCoverage2D raster) {
+    try {
+      return CogWriter.write(raster);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to write Cloud Optimized GeoTIFF", e);
+    }
+  }
+
+  /**
+   * Creates a Cloud Optimized GeoTIFF (COG) byte array with the given options.
+   *
+   * @param raster The input raster
+   * @param options COG generation options (compression, tileSize, resampling, 
overviewCount)
+   * @return COG file as byte array
+   */
+  public static byte[] asCloudOptimizedGeoTiff(GridCoverage2D raster, 
CogOptions options) {
+    try {
+      return CogWriter.write(raster, options);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to write Cloud Optimized GeoTIFF", e);
+    }
+  }

Review Comment:
   The PR description says it “does not affect any public API”, but this change 
adds new public methods to `RasterOutputs` (`asCloudOptimizedGeoTiff` 
overloads). Either update the PR description to reflect the public API 
addition, or (if the intent is to keep it internal) reduce visibility / place 
it behind an internal API.



##########
common/src/main/java/org/apache/sedona/common/raster/cog/CogWriter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.raster.cog;
+
+import java.awt.image.RenderedImage;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.imageio.ImageWriteParam;
+import javax.media.jai.Interpolation;
+import javax.media.jai.InterpolationBicubic;
+import javax.media.jai.InterpolationBilinear;
+import javax.media.jai.InterpolationNearest;
+import org.geotools.api.coverage.grid.GridCoverageWriter;
+import org.geotools.api.parameter.GeneralParameterValue;
+import org.geotools.api.parameter.ParameterValueGroup;
+import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
+import org.geotools.api.referencing.datum.PixelInCell;
+import org.geotools.coverage.grid.GridCoverage2D;
+import org.geotools.coverage.grid.GridEnvelope2D;
+import org.geotools.coverage.grid.GridGeometry2D;
+import org.geotools.coverage.grid.io.AbstractGridFormat;
+import org.geotools.coverage.processing.Operations;
+import org.geotools.gce.geotiff.GeoTiffWriteParams;
+import org.geotools.gce.geotiff.GeoTiffWriter;
+import org.geotools.referencing.operation.transform.AffineTransform2D;
+
+/**
+ * Creates Cloud Optimized GeoTIFF (COG) files from GeoTools GridCoverage2D 
rasters.
+ *
+ * <p>The COG generation process:
+ *
+ * <ol>
+ *   <li>Compute overview decimation factors (power of 2: 2, 4, 8, ...)
+ *   <li>Generate overview images by downsampling
+ *   <li>Write each (full-res + overviews) as a separate tiled GeoTIFF via 
GeoTools
+ *   <li>Parse each TIFF's IFD structure
+ *   <li>Reassemble into COG byte order using {@link CogAssembler}
+ * </ol>
+ *
+ * <p>Overview decimation algorithm ported from GeoTrellis's {@code
+ * GeoTiff.defaultOverviewDecimations}.
+ */
+public class CogWriter {
+
+  /** Default tile size for COG output, matching GDAL's default */
+  public static final int DEFAULT_TILE_SIZE = 256;
+
+  /** Minimum image dimension to create an overview for */
+  private static final int MIN_OVERVIEW_SIZE = 2;
+
+  /**
+   * Write a GridCoverage2D as a Cloud Optimized GeoTIFF byte array using the 
given options.
+   *
+   * @param raster The input raster
+   * @param options COG generation options (compression, tileSize, resampling, 
overviewCount)
+   * @return COG file as byte array
+   * @throws IOException if writing fails
+   */
+  public static byte[] write(GridCoverage2D raster, CogOptions options) throws 
IOException {
+    String compressionType = options.getCompression();
+    double compressionQuality = options.getCompressionQuality();
+    int tileSize = options.getTileSize();
+    String resampling = options.getResampling();
+    int requestedOverviewCount = options.getOverviewCount();
+
+    RenderedImage image = raster.getRenderedImage();
+    int cols = image.getWidth();
+    int rows = image.getHeight();
+
+    // Step 1: Compute overview decimation factors
+    List<Integer> decimations;
+    if (requestedOverviewCount == 0) {
+      decimations = new ArrayList<>();
+    } else {
+      decimations = computeOverviewDecimations(cols, rows, tileSize);
+      if (requestedOverviewCount > 0 && requestedOverviewCount < 
decimations.size()) {
+        decimations = decimations.subList(0, requestedOverviewCount);
+      }
+    }
+
+    // Step 2: Generate overview coverages
+    Interpolation interpolation = getInterpolation(resampling);
+    List<GridCoverage2D> overviews = new ArrayList<>();
+    for (int decimation : decimations) {
+      GridCoverage2D overview = generateOverview(raster, decimation, 
interpolation);
+      overviews.add(overview);
+    }
+
+    // Step 3: Write each as a tiled GeoTIFF byte array
+    List<byte[]> tiffBytes = new ArrayList<>();
+    tiffBytes.add(writeAsTiledGeoTiff(raster, compressionType, 
compressionQuality, tileSize));
+    for (GridCoverage2D overview : overviews) {
+      tiffBytes.add(writeAsTiledGeoTiff(overview, compressionType, 
compressionQuality, tileSize));
+    }
+
+    // Step 4: Parse each TIFF's IFD structure
+    List<TiffIfdParser.ParsedTiff> parsedTiffs = new ArrayList<>();
+    for (byte[] bytes : tiffBytes) {
+      parsedTiffs.add(TiffIfdParser.parse(bytes));
+    }
+
+    // Step 5: Reassemble into COG byte order
+    return CogAssembler.assemble(parsedTiffs);
+  }
+
+  /**
+   * Write a GridCoverage2D as a Cloud Optimized GeoTIFF byte array.
+   *
+   * @param raster The input raster
+   * @param compressionType Compression type: "Deflate", "LZW", "JPEG", 
"PackBits", or null for
+   *     default (Deflate)
+   * @param compressionQuality Quality 0.0 (max compression) to 1.0 (no 
compression), or -1 for
+   *     default
+   * @param tileSize Tile width and height in pixels
+   * @return COG file as byte array
+   * @throws IOException if writing fails
+   */
+  public static byte[] write(
+      GridCoverage2D raster, String compressionType, double 
compressionQuality, int tileSize)
+      throws IOException {
+
+    CogOptions.Builder builder = CogOptions.builder().tileSize(tileSize);
+    if (compressionType != null) {
+      builder.compression(compressionType);
+    }
+    if (compressionQuality >= 0) {
+      builder.compressionQuality(compressionQuality);
+    }
+    return write(raster, builder.build());
+  }
+
+  /**
+   * Write a GridCoverage2D as COG with default settings (Deflate compression, 
256x256 tiles).
+   *
+   * @param raster The input raster
+   * @return COG file as byte array
+   * @throws IOException if writing fails
+   */
+  public static byte[] write(GridCoverage2D raster) throws IOException {
+    return write(raster, "Deflate", 0.2, DEFAULT_TILE_SIZE);
+  }
+
+  /**
+   * Write a GridCoverage2D as COG with specified compression.
+   *
+   * @param raster The input raster
+   * @param compressionType Compression type
+   * @param compressionQuality Quality 0.0 to 1.0
+   * @return COG file as byte array
+   * @throws IOException if writing fails
+   */
+  public static byte[] write(
+      GridCoverage2D raster, String compressionType, double 
compressionQuality) throws IOException {
+    return write(raster, compressionType, compressionQuality, 
DEFAULT_TILE_SIZE);
+  }
+
+  /**
+   * Compute overview decimation factors. Each level is a power of 2.
+   *
+   * <p>Ported from GeoTrellis: {@code GeoTiff.defaultOverviewDecimations()}
+   *
+   * @param cols Image width in pixels
+   * @param rows Image height in pixels
+   * @param blockSize Tile size for the overview
+   * @return List of decimation factors [2, 4, 8, ...] or empty if image is 
too small
+   */
+  static List<Integer> computeOverviewDecimations(int cols, int rows, int 
blockSize) {
+    List<Integer> decimations = new ArrayList<>();
+    double pixels = Math.max(cols, rows);
+    double blocks = pixels / blockSize;
+    int overviewLevels = (int) Math.ceil(Math.log(blocks) / Math.log(2));
+
+    for (int level = 0; level < overviewLevels; level++) {
+      int decimation = (int) Math.pow(2, level + 1);
+      int overviewCols = (int) Math.ceil((double) cols / decimation);
+      int overviewRows = (int) Math.ceil((double) rows / decimation);
+      if (overviewCols < MIN_OVERVIEW_SIZE || overviewRows < 
MIN_OVERVIEW_SIZE) {
+        break;
+      }
+      decimations.add(decimation);
+    }
+    return decimations;
+  }
+
+  /**
+   * Generate an overview (reduced resolution) coverage by downsampling.
+   *
+   * @param raster The full resolution raster
+   * @param decimationFactor Factor to reduce by (2 = half size, 4 = quarter, 
etc.)
+   * @param interpolation The interpolation method to use for resampling
+   * @return A new GridCoverage2D at reduced resolution
+   */
+  static GridCoverage2D generateOverview(
+      GridCoverage2D raster, int decimationFactor, Interpolation 
interpolation) {
+    RenderedImage image = raster.getRenderedImage();
+    int newWidth = (int) Math.ceil((double) image.getWidth() / 
decimationFactor);
+    int newHeight = (int) Math.ceil((double) image.getHeight() / 
decimationFactor);
+
+    // Use GeoTools Operations.DEFAULT.resample to downsample
+    CoordinateReferenceSystem crs = raster.getCoordinateReferenceSystem2D();
+
+    AffineTransform2D originalTransform =
+        (AffineTransform2D) raster.getGridGeometry().getGridToCRS2D();
+    double newScaleX = originalTransform.getScaleX() * decimationFactor;
+    double newScaleY = originalTransform.getScaleY() * decimationFactor;
+
+    AffineTransform2D newTransform =
+        new AffineTransform2D(
+            newScaleX,
+            originalTransform.getShearY(),
+            originalTransform.getShearX(),
+            newScaleY,
+            originalTransform.getTranslateX(),
+            originalTransform.getTranslateY());
+
+    GridGeometry2D gridGeometry =
+        new GridGeometry2D(
+            new GridEnvelope2D(0, 0, newWidth, newHeight),
+            PixelInCell.CELL_CORNER,
+            newTransform,
+            crs,
+            null);
+
+    return (GridCoverage2D) Operations.DEFAULT.resample(raster, null, 
gridGeometry, interpolation);
+  }
+
+  /**
+   * Generate an overview using default nearest-neighbor interpolation. Kept 
for backward
+   * compatibility with tests.
+   */
+  static GridCoverage2D generateOverview(GridCoverage2D raster, int 
decimationFactor) {
+    return generateOverview(raster, decimationFactor, new 
InterpolationNearest());
+  }
+
+  /**
+   * Map a resampling algorithm name to a JAI Interpolation instance.
+   *
+   * @param resampling One of "Nearest", "Bilinear", "Bicubic"
+   * @return The corresponding JAI Interpolation
+   */
+  private static Interpolation getInterpolation(String resampling) {
+    switch (resampling) {
+      case "Bilinear":
+        return new InterpolationBilinear();
+      case "Bicubic":
+        return new InterpolationBicubic(8);
+      case "Nearest":
+      default:
+        return new InterpolationNearest();
+    }
+  }
+
+  /**
+   * Write a GridCoverage2D as a tiled GeoTIFF byte array using GeoTools.
+   *
+   * @param raster The input raster
+   * @param compressionType Compression type
+   * @param compressionQuality Quality 0.0 to 1.0
+   * @param tileSize Tile dimensions in pixels
+   * @return Tiled GeoTIFF as byte array
+   * @throws IOException if writing fails
+   */
+  private static byte[] writeAsTiledGeoTiff(
+      GridCoverage2D raster, String compressionType, double 
compressionQuality, int tileSize)
+      throws IOException {
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    GridCoverageWriter writer = new GeoTiffWriter(out);
+
+    ParameterValueGroup defaultParams = 
writer.getFormat().getWriteParameters();
+    GeoTiffWriteParams params = new GeoTiffWriteParams();
+
+    // Set tiling — must use the 2-arg overload from GeoToolsWriteParams
+    // which delegates to the inner write param. The 4-arg 
ImageWriteParam.setTiling()
+    // writes to the wrong fields (parent vs inner param).
+    params.setTilingMode(ImageWriteParam.MODE_EXPLICIT);
+    params.setTiling(tileSize, tileSize);
+
+    // Set compression
+    params.setCompressionMode(ImageWriteParam.MODE_EXPLICIT);
+    params.setCompressionType(compressionType);
+    params.setCompressionQuality((float) compressionQuality);
+
+    defaultParams
+        
.parameter(AbstractGridFormat.GEOTOOLS_WRITE_PARAMS.getName().toString())
+        .setValue(params);
+
+    GeneralParameterValue[] wps = defaultParams.values().toArray(new 
GeneralParameterValue[0]);
+
+    writer.write(raster, wps);
+    writer.dispose();
+    out.close();
+
+    return out.toByteArray();

Review Comment:
   `writeAsTiledGeoTiff` doesn’t guarantee `writer.dispose()` runs if 
`writer.write(...)` throws, which can leak native resources/file handles in 
GeoTools. Wrap the writer lifecycle in try/finally (or try-with-resources if 
applicable) so `dispose()` is always called on failure paths.
   ```suggestion
       byte[] result;
       try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
         GridCoverageWriter writer = null;
         try {
           writer = new GeoTiffWriter(out);
   
           ParameterValueGroup defaultParams = 
writer.getFormat().getWriteParameters();
           GeoTiffWriteParams params = new GeoTiffWriteParams();
   
           // Set tiling — must use the 2-arg overload from GeoToolsWriteParams
           // which delegates to the inner write param. The 4-arg 
ImageWriteParam.setTiling()
           // writes to the wrong fields (parent vs inner param).
           params.setTilingMode(ImageWriteParam.MODE_EXPLICIT);
           params.setTiling(tileSize, tileSize);
   
           // Set compression
           params.setCompressionMode(ImageWriteParam.MODE_EXPLICIT);
           params.setCompressionType(compressionType);
           params.setCompressionQuality((float) compressionQuality);
   
           defaultParams
               
.parameter(AbstractGridFormat.GEOTOOLS_WRITE_PARAMS.getName().toString())
               .setValue(params);
   
           GeneralParameterValue[] wps =
               defaultParams.values().toArray(new GeneralParameterValue[0]);
   
           writer.write(raster, wps);
         } finally {
           if (writer != null) {
             writer.dispose();
           }
         }
         result = out.toByteArray();
       }
   
       return result;
   ```



##########
common/src/test/java/org/apache/sedona/common/raster/cog/CogWriterTest.java:
##########
@@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.raster.cog;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import org.apache.sedona.common.raster.MapAlgebra;
+import org.apache.sedona.common.raster.RasterConstructors;
+import org.apache.sedona.common.raster.RasterOutputs;
+import org.geotools.coverage.grid.GridCoverage2D;
+import org.junit.Test;
+
+public class CogWriterTest {
+
+  private static final String resourceFolder =
+      System.getProperty("user.dir") + "/../spark/common/src/test/resources/";
+
+  private GridCoverage2D rasterFromGeoTiff(String filePath) throws IOException 
{
+    byte[] bytes = Files.readAllBytes(Paths.get(filePath));
+    return RasterConstructors.fromGeoTiff(bytes);
+  }
+
+  @Test
+  public void testComputeOverviewDecimations() {
+    // 1000x1000 with blockSize=256: ceil(log2(1000/256)) = ceil(1.97) = 2 
levels -> [2, 4]
+    List<Integer> decimations = CogWriter.computeOverviewDecimations(1000, 
1000, 256);
+    assertEquals(2, decimations.size());
+    assertEquals(Integer.valueOf(2), decimations.get(0));
+    assertEquals(Integer.valueOf(4), decimations.get(1));
+
+    // 10000x10000 with blockSize=256: ceil(log2(10000/256)) = ceil(5.29) = 6 
levels
+    decimations = CogWriter.computeOverviewDecimations(10000, 10000, 256);
+    assertEquals(6, decimations.size());
+    assertEquals(Integer.valueOf(2), decimations.get(0));
+    assertEquals(Integer.valueOf(4), decimations.get(1));
+    assertEquals(Integer.valueOf(8), decimations.get(2));
+    assertEquals(Integer.valueOf(16), decimations.get(3));
+    assertEquals(Integer.valueOf(32), decimations.get(4));
+    assertEquals(Integer.valueOf(64), decimations.get(5));
+
+    // Very small image: 50x50 with blockSize=256 -> no overviews
+    decimations = CogWriter.computeOverviewDecimations(50, 50, 256);
+    assertEquals(0, decimations.size());
+
+    // Exactly one tile: 256x256 with blockSize=256 -> no overviews
+    decimations = CogWriter.computeOverviewDecimations(256, 256, 256);
+    assertEquals(0, decimations.size());
+  }
+
+  @Test
+  public void testGenerateOverview() {
+    // Create a 100x100 single-band raster
+    double[] bandValues = new double[100 * 100];
+    for (int i = 0; i < bandValues.length; i++) {
+      bandValues[i] = i % 256;
+    }
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            1, "d", 100, 100, 0, 0, 1, -1, 0, 0, 4326, new double[][] 
{bandValues});
+
+    // Downsample by factor of 2
+    GridCoverage2D overview = CogWriter.generateOverview(raster, 2);
+    assertNotNull(overview);
+    assertEquals(50, overview.getRenderedImage().getWidth());
+    assertEquals(50, overview.getRenderedImage().getHeight());
+  }
+
+  @Test
+  public void testWriteSmallRasterAsCog() throws IOException {
+    // Create a small raster (no overviews expected due to small size)
+    double[] bandValues = new double[50 * 50];
+    for (int i = 0; i < bandValues.length; i++) {
+      bandValues[i] = i % 256;
+    }
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            1, "d", 50, 50, 0, 0, 1, -1, 0, 0, 4326, new double[][] 
{bandValues});
+
+    byte[] cogBytes = RasterOutputs.asCloudOptimizedGeoTiff(raster);
+    assertNotNull(cogBytes);
+    assertTrue(cogBytes.length > 0);
+
+    // Verify it's a valid TIFF
+    assertTrue(
+        (cogBytes[0] == 'I' && cogBytes[1] == 'I') || (cogBytes[0] == 'M' && 
cogBytes[1] == 'M'));
+
+    // Verify it can be read back
+    GridCoverage2D readBack = RasterConstructors.fromGeoTiff(cogBytes);
+    assertNotNull(readBack);
+    assertEquals(50, readBack.getRenderedImage().getWidth());
+    assertEquals(50, readBack.getRenderedImage().getHeight());
+  }
+
+  @Test
+  public void testWriteMediumRasterAsCog() throws IOException {
+    // Create a 512x512 raster (should produce overviews with 256 tile size)
+    double[] bandValues = new double[512 * 512];
+    for (int i = 0; i < bandValues.length; i++) {
+      bandValues[i] = (i * 7) % 256;
+    }
+    GridCoverage2D raster =
+        RasterConstructors.makeNonEmptyRaster(
+            1, "d", 512, 512, 0, 0, 1, -1, 0, 0, 4326, new double[][] 
{bandValues});
+
+    byte[] cogBytes = RasterOutputs.asCloudOptimizedGeoTiff(raster, "Deflate", 
0.5);
+    assertNotNull(cogBytes);
+    assertTrue(cogBytes.length > 0);
+
+    // Verify COG structure: IFDs should be at the beginning of the file
+    ByteOrder byteOrder = (cogBytes[0] == 'I') ? ByteOrder.LITTLE_ENDIAN : 
ByteOrder.BIG_ENDIAN;
+    ByteBuffer buf = ByteBuffer.wrap(cogBytes).order(byteOrder);
+
+    // First IFD should be at offset 8 (right after header)
+    int firstIfdOffset = buf.getInt(4);
+    assertEquals(8, firstIfdOffset);
+
+    // Read first IFD tag count
+    int tagCount = buf.getShort(firstIfdOffset) & 0xFFFF;
+    assertTrue("First IFD should have tags", tagCount > 0);
+
+    // Check that nextIFDOffset points to another IFD (should have at least 1 
overview)
+    int nextIfdPointerPos = firstIfdOffset + 2 + tagCount * 12;
+    int nextIfdOffset = buf.getInt(nextIfdPointerPos);
+    // For a 512x512 image with 256 tile size, we expect at least one overview
+    assertTrue("Should have at least one overview IFD", nextIfdOffset > 0);
+    // The next IFD should be before any image data (COG requirement)
+    assertTrue(
+        "Overview IFD should immediately follow first IFD region",
+        nextIfdOffset < cogBytes.length / 2);
+

Review Comment:
   This assertion uses `nextIfdOffset < cogBytes.length / 2` as a proxy for 
“IFDs are before image data”. That threshold is arbitrary and can be flaky if 
compression makes the image data very small (valid COGs can have IFD/metadata > 
50% of the file). Prefer asserting `nextIfdOffset` (or the end of the IFD 
region) is strictly less than the minimum TileOffsets/StripOffsets value.
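   A sketch of a tighter check, reusing the `buf`, `firstIfdOffset`, `tagCount`, and `nextIfdOffset` variables already defined in the test, and assuming the TileOffsets/StripOffsets values are stored as 4-byte LONGs (a SHORT-typed offsets array would need a different element size):
   ```java
   // Sketch: find the smallest TileOffsets (324) / StripOffsets (273) value in
   // the first IFD and assert the next IFD starts before it. If no offsets tag
   // is found, the assertion degenerates to always-true.
   long minTileOffset = Long.MAX_VALUE;
   for (int i = 0; i < tagCount; i++) {
     int entryOffset = firstIfdOffset + 2 + i * 12;
     int tag = buf.getShort(entryOffset) & 0xFFFF;
     if (tag == 324 || tag == 273) {
       int count = buf.getInt(entryOffset + 4);
       int valueOrPointer = buf.getInt(entryOffset + 8);
       if (count == 1) {
         minTileOffset = valueOrPointer & 0xFFFFFFFFL; // single value stored inline
       } else {
         for (int j = 0; j < count; j++) { // value field points to an offset array
           minTileOffset =
               Math.min(minTileOffset, buf.getInt(valueOrPointer + j * 4) & 0xFFFFFFFFL);
         }
       }
     }
   }
   assertTrue("Overview IFD should precede all image data", nextIfdOffset < minTileOffset);
   ```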


