This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch research/cluster-compress
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/research/cluster-compress by 
this push:
     new c1a1b0b7 Cluster Based Compression methods added (#576)
c1a1b0b7 is described below

commit c1a1b0b7acba8f67e0bbed1ef390085caea4d153
Author: SamM755 <[email protected]>
AuthorDate: Sat Aug 23 19:33:09 2025 +0800

    Cluster Based Compression methods added (#576)
---
 java/pom.xml                                       |   4 +-
 .../tsfile/encoding/decoder/ClusterDecoder.java    | 215 +++++++++++
 .../tsfile/encoding/decoder/ClusterReader.java     |  35 ++
 .../apache/tsfile/encoding/decoder/Decoder.java    |   9 +-
 .../tsfile/encoding/encoder/AClusterAlgorithm.java | 213 +++++++++++
 .../tsfile/encoding/encoder/AClusterEncoder.java   | 422 +++++++++++++++++++++
 .../tsfile/encoding/encoder/ClusterSupport.java    |  77 ++++
 .../tsfile/encoding/encoder/KClusterAlgorithm.java | 278 ++++++++++++++
 .../tsfile/encoding/encoder/KClusterEncoder.java   | 381 +++++++++++++++++++
 .../tsfile/encoding/encoder/TSEncodingBuilder.java |  80 +++-
 .../tsfile/file/metadata/enums/TSEncoding.java     |  12 +-
 .../decoder/AClusterEncoderDecoderTest.java        | 210 ++++++++++
 12 files changed, 1930 insertions(+), 6 deletions(-)

diff --git a/java/pom.xml b/java/pom.xml
index 8c6be19b..b05a9207 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -56,7 +56,7 @@
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-lang3</artifactId>
-                <version>3.15.0</version>
+                <version>3.16.0</version>
             </dependency>
             <dependency>
                 <groupId>org.lz4</groupId>
@@ -99,7 +99,7 @@
             <dependency>
                 <groupId>com.google.code.gson</groupId>
                 <artifactId>gson</artifactId>
-                <version>2.10.1</version>
+                <version>2.11.0</version>
             </dependency>
         </dependencies>
     </dependencyManagement>
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
new file mode 100644
index 00000000..70479840
--- /dev/null
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
@@ -0,0 +1,215 @@
+package org.apache.tsfile.encoding.decoder;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.encoding.TsFileDecodingException;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ClusterDecoder extends Decoder {
+
+    private final TSDataType dataType;
+
+
+    private long[] longValues;
+    private double[] doubleValues;
+    private int readIndex = 0;
+    private int count = 0;
+    private boolean hasDecoded = false;
+
+    public ClusterDecoder(TSDataType dataType) {
+        super(TSEncoding.ACLUSTER);
+        this.dataType = dataType;
+    }
+
+    /**
+     * Returns true while decoded values remain; decodes the whole buffer on the first call.
+     */
+    @Override
+    public boolean hasNext(ByteBuffer buffer) throws IOException {
+        if (!hasDecoded) {
+            decodeInternally(buffer);
+        }
+        return readIndex < count;
+    }
+
+    /**
+     * Resets the decoder state so the next read triggers a fresh decode.
+     */
+    @Override
+    public void reset() {
+        this.hasDecoded = false;
+        this.readIndex = 0;
+        this.count = 0;
+        this.longValues = null;
+        this.doubleValues = null;
+    }
+
+    @Override
+    public int readInt(ByteBuffer buffer){
+        return (int) longValues[readIndex++];
+    }
+
+    @Override
+    public long readLong(ByteBuffer buffer){
+        return longValues[readIndex++];
+    }
+
+    @Override
+    public float readFloat(ByteBuffer buffer){
+        return (float) doubleValues[readIndex++];
+    }
+
+    @Override
+    public double readDouble(ByteBuffer buffer){
+        return doubleValues[readIndex++];
+    }
+
+
+    /**
+     * Decodes the entire cluster-encoded page from the buffer:
+     * header, medoids, cluster frequencies, and residuals, then reconstructs the values.
+     */
+    private void decodeInternally(ByteBuffer buffer) {
+        if (hasDecoded) {
+            return;
+        }
+
+        ClusterReader reader = new ClusterReader(buffer);
+
+        // --- Header ---
+        int scalingExponent = (int) reader.read(8);
+        int k = (int) reader.read(16);
+        this.count = (int) reader.read(16);
+        int packSize = (int) reader.read(16);
+
+        // --- Global Minimum Value (minVal) ---
+        int minValBit = (int) reader.read(8);
+        long minValSign = reader.read(1);
+        long absMinVal = reader.read(minValBit); // Read the absolute value 
part
+        long minVal = (minValSign == 1) ? -absMinVal : absMinVal; // Apply the 
sign
+
+        // Allocate memory based on count and dataType
+        if (this.count > 0) {
+            if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) 
{
+                this.doubleValues = new double[this.count];
+            } else {
+                this.longValues = new long[this.count];
+            }
+        }
+
+        // Handle case where all values are the same (k=0)
+        if (k == 0) {
+            if (this.count > 0) {
+                reconstructFromMinValOnly(minVal, scalingExponent);
+            }
+            this.hasDecoded = true;
+            return;
+        }
+
+        // --- Medoids ---
+        long[] medoids = new long[k];
+        int minMedoidBit = (int) reader.read(8);
+        long minMedoidSign = reader.read(1);
+        long absMinMedoid = reader.read(minMedoidBit); // Read the absolute 
value part
+        long minMedoid = (minMedoidSign == 1) ? -absMinMedoid : absMinMedoid; 
// Apply the sign
+
+        int maxMedoidOffsetBits = (int) reader.read(8);
+        for (int i = 0; i < k; i++) {
+            long offset = reader.read(maxMedoidOffsetBits);
+            medoids[i] = minMedoid + offset;
+        }
+
+        // --- Frequencies (Cluster Sizes) ---
+        // The encoder wrote deltas (cluster sizes), so we read them and 
rebuild the cumulative array
+        long[] cumulativeFrequencies = new long[k];
+        int numFreqBlocks = (int) reader.read(16);
+
+        // Metadata pass for frequencies
+        int[] freqBlockMaxBits = new int[numFreqBlocks];
+        for (int i = 0; i < numFreqBlocks; i++) {
+            freqBlockMaxBits[i] = (int) reader.read(8);
+        }
+
+        // Data pass for frequencies - Reconstruct cumulative frequencies
+        long currentCumulativeFreq = 0;
+        int freqIndex = 0;
+        for (int i = 0; i < numFreqBlocks; i++) {
+            int start = i * packSize;
+            int end = Math.min(start + packSize, k);
+            int bitsForBlock = freqBlockMaxBits[i];
+            for (int j = start; j < end; j++) {
+                long delta = reader.read(bitsForBlock); // This delta is the 
actual cluster size
+                currentCumulativeFreq += delta;
+                cumulativeFrequencies[freqIndex++] = currentCumulativeFreq;
+            }
+        }
+
+        // --- Residuals ---
+        long[] residuals = new long[this.count];
+        int numPacks = (int) reader.read(32);
+
+        // Metadata pass for residuals
+        int[] resPackMaxBits = new int[numPacks];
+        for (int i = 0; i < numPacks; i++) {
+            resPackMaxBits[i] = (int) reader.read(8);
+        }
+
+        // Data pass for residuals
+        int residualIdx = 0;
+        for (int i = 0; i < numPacks; i++) {
+            int start = i * packSize;
+            int end = Math.min(start + packSize, this.count);
+            int bitsForPack = resPackMaxBits[i];
+            if (bitsForPack > 0) {
+                for (int j = start; j < end; j++) {
+                    residuals[residualIdx++] = reader.read(bitsForPack);
+                }
+            } else {
+                // If bitsForPack is 0, all residuals in this pack are 0.
+                // We just need to advance the index, as the array is already 
initialized to 0.
+                residualIdx += (end - start);
+            }
+        }
+
+        // --- Final Data Reconstruction ---
+        // Use the correctly reconstructed cumulativeFrequencies array
+        reconstructData(medoids, cumulativeFrequencies, residuals, minVal, 
scalingExponent);
+
+        this.hasDecoded = true;
+    }
+
+    private void reconstructData(long[] medoids, long[] frequencies, long[] 
residuals, long minVal, int scalingExponent) {
+        int residualReadPos = 0;
+        int dataWritePos = 0;
+        double scalingFactor = Math.pow(10, scalingExponent);
+
+        for (int clusterId = 0; clusterId < medoids.length; clusterId++) {
+            long pointsInThisCluster = frequencies[clusterId];
+            long medoid = medoids[clusterId];
+
+            for (int i = 0; i < pointsInThisCluster; i++) {
+                long zigzagResidual = residuals[residualReadPos++];
+                long residual = (zigzagResidual >>> 1) ^ -(zigzagResidual & 1);
+                long scaledDataPoint = medoid + residual + minVal;
+
+                if (dataType == TSDataType.FLOAT || dataType == 
TSDataType.DOUBLE) {
+                    doubleValues[dataWritePos++] = scaledDataPoint / 
scalingFactor;
+                } else {
+                    longValues[dataWritePos++] = scaledDataPoint;
+                }
+            }
+        }
+    }
+
+    private void reconstructFromMinValOnly(long minVal, int scalingExponent) {
+        if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
+            double scalingFactor = Math.pow(10, scalingExponent);
+            double finalValue = minVal / scalingFactor;
+            for(int i=0; i<this.count; i++) doubleValues[i] = finalValue;
+        } else {
+            for(int i=0; i<this.count; i++) longValues[i] = minVal;
+        }
+    }
+}
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
new file mode 100644
index 00000000..ad578ada
--- /dev/null
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
@@ -0,0 +1,35 @@
+package org.apache.tsfile.encoding.decoder;
+
+import java.nio.ByteBuffer;
+
+public class ClusterReader {
+    private final ByteBuffer buffer;
+    private byte currentByte;
+    private int bitPosition; // from 7 down to 0
+
+    public ClusterReader(ByteBuffer buffer) {
+        this.buffer = buffer;
+        this.currentByte = 0;
+        this.bitPosition = -1; // Start at -1 to force reading a new byte first
+    }
+
+    public long read(int numBits) {
+        if (numBits > 64 || numBits <= 0) {
+            throw new IllegalArgumentException("Cannot read more than 64 bits 
or non-positive bits at once.");
+        }
+
+        long result = 0;
+        for (int i = 0; i < numBits; i++) {
+            if (bitPosition < 0) {
+                currentByte = buffer.get();
+                bitPosition = 7;
+            }
+            // Read the bit at the current position
+            long bit = (currentByte >> bitPosition) & 1;
+            // Shift the result and add the new bit
+            result = (result << 1) | bit;
+            bitPosition--;
+        }
+        return result;
+    }
+}
\ No newline at end of file
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
index 36c3d826..bc61bda0 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
@@ -177,7 +177,14 @@ public abstract class Decoder {
           default:
             throw new TsFileDecodingException(String.format(ERROR_MSG, 
encoding, dataType));
         }
-      default:
+      case ACLUSTER: case KCLUSTER:
+        switch (dataType){
+          case INT32: case INT64: case FLOAT: case DOUBLE:
+            return new ClusterDecoder(dataType);
+          default:
+            throw new TsFileDecodingException(String.format(ERROR_MSG, 
encoding, dataType));
+        }
+        default:
         throw new TsFileDecodingException(String.format(ERROR_MSG, encoding, 
dataType));
     }
   }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
new file mode 100644
index 00000000..44d4d91b
--- /dev/null
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.encoder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public final class AClusterAlgorithm {
+
+    /**
+     * Private constructor to prevent instantiation of this utility class.
+     */
+    private AClusterAlgorithm() {}
+
+    /**
+     * The main entry point for the ACluster algorithm.
+     * It processes a page of data and returns the results as an Object array.
+     *
+     * @param data The input time series data for a single page, represented 
as a long array.
+     * @return An Object array where: <br>
+     *         - index 0: long[] medoids (sorted by cluster frequency) <br>
+     *         - index 1: int[] clusterAssignments (mapped to the sorted 
medoids) <br>
+     *         - index 2: long[] clusterFrequencies (sorted)
+     */
+    public static Object[] run(long[] data) {
+        int n = data.length;
+        if (n == 0) {
+            return new Object[]{new long[0], new int[0], new long[0]};
+        }
+
+        // --- Initialization ---
+        List<Long> medoids = new ArrayList<>();
+        Set<Long> existingMedoids = new HashSet<>();
+        List<Set<Integer>> pointsInClusters = new ArrayList<>();
+        int[] pointToMedoidMap = new int[n];
+
+        // --- Step 1: Initialize with the first data point ---
+        long firstPoint = data[0];
+        medoids.add(firstPoint);
+        existingMedoids.add(firstPoint);
+        Set<Integer> firstCluster = new HashSet<>();
+        firstCluster.add(0);
+        pointsInClusters.add(firstCluster);
+        pointToMedoidMap[0] = 0;
+
+        // --- Step 2: Iteratively process the rest of the points ---
+        for (int i = 1; i < n; i++) {
+            long currentPoint = data[i];
+
+            if (existingMedoids.contains(currentPoint)) {
+                int medoidIndex = medoids.indexOf(currentPoint);
+                pointsInClusters.get(medoidIndex).add(i);
+                pointToMedoidMap[i] = medoidIndex;
+                continue;
+            }
+
+            // --- Step 3: Find the best existing medoid ---
+            int bestMedoidIndex = -1;
+            long minCostToExistingMedoid = Long.MAX_VALUE;
+            for (int j = 0; j < medoids.size(); j++) {
+                long cost = calculateResidualCost(currentPoint, 
medoids.get(j));
+                if (cost < minCostToExistingMedoid) {
+                    minCostToExistingMedoid = cost;
+                    bestMedoidIndex = j;
+                }
+            }
+
+            // --- Step 4: Calculate potential savings ---
+            long savingsFromCurrentPoint = minCostToExistingMedoid;
+            long savingsFromReassignment = 0;
+            Set<Integer> pointsInBestCluster = 
pointsInClusters.get(bestMedoidIndex);
+            for (int pointIndexInCluster : pointsInBestCluster) {
+                long p = data[pointIndexInCluster];
+                long costToOldMedoid = calculateResidualCost(p, 
medoids.get(bestMedoidIndex));
+                long costToNewPotentialMedoid = calculateResidualCost(p, 
currentPoint);
+                if (costToNewPotentialMedoid < costToOldMedoid) {
+                    savingsFromReassignment += (costToOldMedoid - 
costToNewPotentialMedoid);
+                }
+            }
+            long totalSavings = savingsFromCurrentPoint + 
savingsFromReassignment;
+
+            // --- Step 5: Make the decision ---
+            long storageCostForNewPoint = 
calculateBasePointStorageCost(currentPoint);
+            if (totalSavings > storageCostForNewPoint) {
+                // Decision: Create a new medoid.
+                int newMedoidId = medoids.size();
+                medoids.add(currentPoint);
+                existingMedoids.add(currentPoint);
+                Set<Integer> newCluster = new HashSet<>();
+                newCluster.add(i);
+                pointsInClusters.add(newCluster);
+                pointToMedoidMap[i] = newMedoidId;
+
+                Set<Integer> pointsToReEvaluate = new 
HashSet<>(pointsInClusters.get(bestMedoidIndex));
+                for (int pointIndexToReEvaluate : pointsToReEvaluate) {
+                    long p = data[pointIndexToReEvaluate];
+                    if (calculateResidualCost(p, currentPoint) < 
calculateResidualCost(p, medoids.get(bestMedoidIndex))) {
+                        
pointsInClusters.get(bestMedoidIndex).remove(pointIndexToReEvaluate);
+                        
pointsInClusters.get(newMedoidId).add(pointIndexToReEvaluate);
+                        pointToMedoidMap[pointIndexToReEvaluate] = newMedoidId;
+                    }
+                }
+            } else {
+                // Decision: Assign to the existing best medoid.
+                pointsInClusters.get(bestMedoidIndex).add(i);
+                pointToMedoidMap[i] = bestMedoidIndex;
+            }
+        }
+
+        // --- Step 6: Finalize and sort the results ---
+        int k = medoids.size();
+        long[] finalMedoids = medoids.stream().mapToLong(l -> l).toArray();
+        long[] rawClusterSizes = new long[k];
+        for (int i = 0; i < k; i++) {
+            rawClusterSizes[i] = pointsInClusters.get(i).size();
+        }
+
+        return sortResults(finalMedoids, pointToMedoidMap, rawClusterSizes);
+    }
+
+    /**
+     * Helper class for sorting medoids based on their cluster size 
(frequency).
+     */
+    private static class MedoidSortHelper implements 
Comparable<MedoidSortHelper> {
+        long medoid;
+        long size;
+        int originalIndex;
+
+        MedoidSortHelper(long medoid, long size, int originalIndex) {
+            this.medoid = medoid;
+            this.size = size;
+            this.originalIndex = originalIndex;
+        }
+
+        @Override
+        public int compareTo(MedoidSortHelper other) {
+            return Long.compare(this.size, other.size);
+        }
+    }
+
+    /**
+     * Sorts the final medoids and their cluster information based on cluster 
frequency.
+     *
+     * @param medoids The discovered medoids.
+     * @param clusterAssignment The assignment map for each data point.
+     * @param clusterSize The frequency of each cluster.
+     * @return A sorted and correctly mapped Object array.
+     */
+    private static Object[] sortResults(long[] medoids, int[] 
clusterAssignment, long[] clusterSize) {
+        int k = medoids.length;
+        List<MedoidSortHelper> sorters = new ArrayList<>();
+        for (int i = 0; i < k; i++) {
+            sorters.add(new MedoidSortHelper(medoids[i], clusterSize[i], i));
+        }
+        Collections.sort(sorters);
+
+        long[] sortedMedoids = new long[k];
+        long[] sortedClusterSize = new long[k];
+        int[] oldToNewIndexMap = new int[k];
+
+        for (int i = 0; i < k; i++) {
+            MedoidSortHelper sortedItem = sorters.get(i);
+            sortedMedoids[i] = sortedItem.medoid;
+            sortedClusterSize[i] = sortedItem.size;
+            oldToNewIndexMap[sortedItem.originalIndex] = i;
+        }
+
+        int[] sortedClusterAssignment = new int[clusterAssignment.length];
+        for (int i = 0; i < clusterAssignment.length; i++) {
+            int oldIndex = clusterAssignment[i];
+            sortedClusterAssignment[i] = oldToNewIndexMap[oldIndex];
+        }
+
+        return new Object[]{sortedMedoids, sortedClusterAssignment, 
sortedClusterSize};
+    }
+
+    // --- Cost Calculation Functions ---
+
+    private static long bitLengthCost(long value) {
+        if (value == 0) return 1;
+        return 64 - Long.numberOfLeadingZeros(value);
+    }
+
+    private static long calculateResidualCost(long p1, long p2) {
+        return 1 + bitLengthCost(Math.abs(p1 - p2));
+    }
+
+    private static long calculateBasePointStorageCost(long basePoint) {
+        return 1 + bitLengthCost(Math.abs(basePoint));
+    }
+}
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
new file mode 100644
index 00000000..8e2ca0b7
--- /dev/null
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.encoder;
+
+import org.apache.tsfile.exception.encoding.TsFileEncodingException;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Binary;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * AClusterEncoder is a batch-based compressor for numerical data (INT32, 
INT64, FLOAT, DOUBLE). It
+ * buffers all data points within a page, then on flush(), applies a 
clustering algorithm to find
+ * optimal global reference points and encodes residuals.
+ */
+public class AClusterEncoder extends Encoder {
+
+    private static final int DEFAULT_PACK_SIZE = 10;
+
+
+    /** A buffer to store all values of a page before flushing. We use long to 
unify all types. */
+    private interface ValueBuffer {
+        /** Adds a value to the buffer. */
+        void add(int value);
+        void add(long value);
+        void add(float value);
+        void add(double value);
+
+        /** Clears the internal buffer. */
+        void clear();
+
+        /** Checks if the buffer is empty. */
+        boolean isEmpty();
+
+        /** Gets the current size of the buffer. */
+        int size();
+
+        ProcessingResult processAndGet();
+    }
+
+    private static class Int64Buffer implements ValueBuffer {
+        private final List<Long> values = new ArrayList<>();
+
+        @Override
+        public void add(int value) { values.add((long) value); }
+        @Override
+        public void add(long value) { values.add(value); }
+        @Override
+        public void add(float value) { /* Do nothing, type mismatch */ }
+        @Override
+        public void add(double value) { /* Do nothing, type mismatch */ }
+
+        @Override
+        public ProcessingResult processAndGet() { // implements the new processing method
+            long[] data = values.stream().mapToLong(l -> l).toArray();
+            return new ProcessingResult(data, 0); // Exponent is 0 for integers
+        }
+
+        @Override
+        public void clear() { values.clear(); }
+        @Override
+        public boolean isEmpty() { return values.isEmpty(); }
+        @Override
+        public int size() { return values.size(); }
+    }
+
+    /** A buffer for FLOAT and DOUBLE types. It performs scaling in 
processAndGet(). */
+    private static class DoubleBuffer implements ValueBuffer {
+        private final List<Double> values = new ArrayList<>();
+
+        @Override
+        public void add(int value) { /* Do nothing, type mismatch */ }
+        @Override
+        public void add(long value) { /* Do nothing, type mismatch */ }
+        @Override
+        public void add(float value) { values.add((double) value); } // Store 
as double to unify
+        @Override
+        public void add(double value) { values.add(value); }
+
+        @Override
+        public ProcessingResult processAndGet() {
+            // --- Edge Case: Handle empty buffer ---
+            if (values.isEmpty()) {
+                return new ProcessingResult(new long[0], 0);
+            }
+            int maxDecimalPlaces = 0;
+            for (double v : values) {
+                String s = BigDecimal.valueOf(v).toPlainString();
+                int dotIndex = s.indexOf('.');
+                if (dotIndex != -1) {
+                    int decimalPlaces = s.length() - dotIndex - 1;
+                    if (decimalPlaces > maxDecimalPlaces) {
+                        maxDecimalPlaces = decimalPlaces;
+                    }
+                }
+            }
+
+            double scalingFactor = Math.pow(10, maxDecimalPlaces);
+
+            long[] scaledLongs = new long[values.size()];
+            for (int i = 0; i < values.size(); i++) {
+                scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
+            }
+
+            return new ProcessingResult(scaledLongs, maxDecimalPlaces);
+        }
+
+        @Override
+        public void clear() { values.clear(); }
+        @Override
+        public boolean isEmpty() { return values.isEmpty(); }
+        @Override
+        public int size() { return values.size(); }
+    }
+
+    private static class ProcessingResult {
+        final long[] scaledLongs;
+        final int scalingExponent; // e.g., 3 for a scaling factor of 1000
+
+        ProcessingResult(long[] scaledLongs, int scalingExponent) {
+            this.scaledLongs = scaledLongs;
+            this.scalingExponent = scalingExponent;
+        }
+
+        long[] getScaledLongs(){
+            return this.scaledLongs;
+        }
+
+        int getScalingExponent(){
+            return this.scalingExponent;
+        }
+    }
+
+
+    private final ValueBuffer buffer;
+
+    /**
+     * Constructor for AClusterEncoder. It's called by AClusterEncodingBuilder.
+     *
+     * @param dataType The data type of the time series, used for potential 
type-specific logic.
+     */
+    public AClusterEncoder(TSDataType dataType) {
+        super(TSEncoding.ACLUSTER);
+        switch (dataType) {
+            case INT32: case INT64:
+                this.buffer = new Int64Buffer();
+                break;
+            case FLOAT: case DOUBLE:
+                this.buffer = new DoubleBuffer();
+                break;
+            default:
+                throw new TsFileEncodingException("AClusterEncoder does not 
support data type: " + dataType);
+        }
+    }
+
+    @Override
+    public void encode(int value, ByteArrayOutputStream out) {
+        buffer.add(value);
+    }
+
+    @Override
+    public void encode(long value, ByteArrayOutputStream out) {
+        buffer.add(value);
+    }
+
+    @Override
+    public void encode(float value, ByteArrayOutputStream out) {
+        buffer.add(value);
+    }
+
+    @Override
+    public void encode(double value, ByteArrayOutputStream out) {
+        buffer.add(value);
+    }
+
+    @Override
+    public void flush(ByteArrayOutputStream out) throws IOException {
+        if (buffer.isEmpty()) {
+            return;
+        }
+
+        ProcessingResult procResult = buffer.processAndGet();
+        long[] originalData = procResult.getScaledLongs();
+        int scalingExponent = procResult.getScalingExponent();
+        if (originalData.length == 0) return;
+        long minVal = findMin(originalData);
+        long[] data = new long[originalData.length];
+        for (int i = 0; i < data.length; i++) {
+            data[i] = originalData[i] - minVal;
+        }
+
+        Object[] clusterResult = AClusterAlgorithm.run(data);
+        long[] sortedMedoids = (long[]) clusterResult[0];
+        int[] clusterAssignments = (int[]) clusterResult[1];
+        long[] clusterFrequencies = (long[]) clusterResult[2];
+
+        long[] sortedZigzagResiduals = calculateSortedZigzagResiduals(data, 
sortedMedoids, clusterAssignments, clusterFrequencies);
+
+        encodeResults(out, scalingExponent, minVal, sortedMedoids, 
clusterFrequencies, sortedZigzagResiduals);
+
+        buffer.clear();
+    }
+
+    private static class MedoidFreqPair {
+        long medoid;
+        long frequency;
+        int originalIndex;
+
+        MedoidFreqPair(long medoid, long frequency, int originalIndex) {
+            this.medoid = medoid;
+            this.frequency = frequency;
+            this.originalIndex = originalIndex;
+        }
+    }
+
+    /**
+     * A direct translation of the `residualCalculationZigzagNoHuff_sorted` 
logic.
+     * It computes residuals, applies zigzag encoding, and groups them by 
cluster.
+     */
+    private long[] calculateSortedZigzagResiduals(long[] data, long[] medoids, 
int[] assignments, long[] frequencies) {
+        int n = data.length;
+        int k = medoids.length;
+        if (n == 0) return new long[0];
+
+        long[] sortedResiduals = new long[n];
+        int[] writePointers = new int[k];
+        int cumulativeCount = 0;
+        for (int i = 0; i < k; i++) {
+            writePointers[i] = cumulativeCount;
+            cumulativeCount += (int) frequencies[i];
+        }
+
+        for (int i = 0; i < n; i++) {
+            int clusterId = assignments[i];
+            long medoid = medoids[clusterId];
+            long residual = data[i] - medoid;
+            long zigzagResidual = (residual << 1) ^ (residual >> 63); // 
Zigzag Encoding
+
+            int targetIndex = writePointers[clusterId];
+            sortedResiduals[targetIndex] = zigzagResidual;
+            writePointers[clusterId]++;
+        }
+        return sortedResiduals;
+    }
+
+    private void encodeResults(
+            ByteArrayOutputStream out,
+            int scalingExponent,
+            long minVal,
+            long[] medoids,
+            long[] frequencies,
+            long[] residuals)
+            throws IOException {
+
+        ClusterSupport writer = new ClusterSupport(out);
+        int numPoints = residuals.length;
+        int k = medoids.length;
+
+        writer.write(scalingExponent, 8);
+        writer.write(k, 16);
+        writer.write(numPoints, 16);
+        writer.write(DEFAULT_PACK_SIZE, 16);
+
+        int minValBit = ClusterSupport.bitsRequired(Math.abs(minVal));
+        writer.write(minValBit, 8);
+        writer.write(minVal>=0?0:1,1);
+        writer.write(minVal, minValBit);
+
+        if (k == 0) {
+            writer.flush();
+            return;
+        }
+
+        long minMedoid = findMin(medoids);
+        long[] medoidOffsets = new long[k];
+        int maxMedoidOffsetBits = 0;
+        for (int i = 0; i < k; i++) {
+            medoidOffsets[i] = medoids[i] - minMedoid;
+            maxMedoidOffsetBits = Math.max(maxMedoidOffsetBits, 
ClusterSupport.bitsRequired(medoidOffsets[i]));
+        }
+        int minMedoidBit = ClusterSupport.bitsRequired(Math.abs(minMedoid));
+        writer.write(minMedoidBit,8);
+        writer.write(minMedoid>0?0:1,1);
+        writer.write(minMedoid, minMedoidBit);
+
+        writer.write(maxMedoidOffsetBits, 8);
+        for (long offset : medoidOffsets) {
+            writer.write(offset, maxMedoidOffsetBits);
+        }
+
+        long[] freqDeltas = new long[k];
+        if (k > 0) {
+            freqDeltas[0] = frequencies[0];
+            for (int i = 1; i < k; i++) {
+                freqDeltas[i] = frequencies[i] - frequencies[i - 1];
+            }
+        }
+        int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+        writer.write(numFreqBlocks, 16);
+
+        int[] freqBlockMaxBits = new int[numFreqBlocks];
+        // Metadata pass for frequencies
+        for (int i = 0; i < numFreqBlocks; i++) {
+            int start = i * DEFAULT_PACK_SIZE;
+            int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+            long maxDelta = 0;
+            for (int j = start; j < end; j++) {
+                maxDelta = Math.max(maxDelta, freqDeltas[j]);
+            }
+            freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
+            writer.write(freqBlockMaxBits[i], 8);
+        }
+        // Data pass for frequencies
+        for (int i = 0; i < numFreqBlocks; i++) {
+            int start = i * DEFAULT_PACK_SIZE;
+            int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+            int bitsForBlock = freqBlockMaxBits[i];
+            for (int j = start; j < end; j++) {
+                writer.write(freqDeltas[j], bitsForBlock);
+            }
+        }
+
+        int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+        writer.write(numPacks, 32);
+
+        int[] resPackMaxBits = new int[numPacks];
+        // Metadata pass for residuals
+        for (int i = 0; i < numPacks; i++) {
+            int start = i * DEFAULT_PACK_SIZE;
+            int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+            long maxOffset = 0;
+            for (int j = start; j < end; j++) {
+                maxOffset = Math.max(maxOffset, residuals[j]);
+            }
+            resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
+            writer.write(resPackMaxBits[i], 8);
+        }
+        // Data pass for residuals
+        for (int i = 0; i < numPacks; i++) {
+            int start = i * DEFAULT_PACK_SIZE;
+            int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+            int bitsForPack = resPackMaxBits[i];
+            if (bitsForPack > 0) {
+                for (int j = start; j < end; j++) {
+                    writer.write(residuals[j], bitsForPack);
+                }
+            }
+        }
+
+        writer.flush();
+    }
+
+    private long findMin(long[] data) {
+        if (data == null || data.length == 0) {
+            throw new IllegalArgumentException("Data array cannot be null or 
empty.");
+        }
+        long min = data[0];
+        for (int i = 1; i < data.length; i++) {
+            if (data[i] < min) {
+                min = data[i];
+            }
+        }
+        return min;
+    }
+
    /** Maximum serialized size of one buffered item: 8 bytes (one long/double). */
    @Override
    public int getOneItemMaxSize() {
        return 8;
    }
+
    /**
     * Upper bound on the flushed size of the buffered page: 8 bytes per value
     * plus a 50% margin for per-block metadata (bit widths, headers).
     */
    @Override
    public long getMaxByteSize() {
        if (this.buffer.isEmpty()) {
            return 0;
        }
        return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
    }
+
    /** Not supported: this encoder only handles numeric (int/long/float/double) values. */
    @Override
    public void encode(boolean value, ByteArrayOutputStream out) {
        throw new TsFileEncodingException("AClusterEncoder does not support boolean values.");
    }
+
    /** Not supported: this encoder only handles numeric (int/long/float/double) values. */
    @Override
    public void encode(short value, ByteArrayOutputStream out) {
        throw new TsFileEncodingException("AClusterEncoder does not support short values.");
    }
+
    /** Not supported: this encoder only handles numeric (int/long/float/double) values. */
    @Override
    public void encode(Binary value, ByteArrayOutputStream out) {
        throw new TsFileEncodingException("AClusterEncoder does not support Binary values.");
    }
+
    /** Not supported: this encoder only handles numeric (int/long/float/double) values. */
    @Override
    public void encode(BigDecimal value, ByteArrayOutputStream out) {
        throw new TsFileEncodingException("AClusterEncoder does not support BigDecimal values.");
    }
+}
+
+
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
new file mode 100644
index 00000000..6a189a90
--- /dev/null
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
@@ -0,0 +1,77 @@
+package org.apache.tsfile.encoding.encoder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
/**
 * MSB-first bit writer over a {@link ByteArrayOutputStream}, shared by the
 * cluster-based encoders. Bits accumulate into a byte from the most significant
 * position downward; {@link #flush()} zero-pads and emits any partial byte.
 */
public class ClusterSupport {

    private final ByteArrayOutputStream out;
    private int pending;      // bits accumulated so far, left-aligned as we shift in
    private int pendingCount; // number of valid bits in `pending` (0-7)

    public ClusterSupport(ByteArrayOutputStream out) {
        this.out = out;
        this.pending = 0;
        this.pendingCount = 0;
    }

    /**
     * Writes the lower {@code numBits} bits of {@code value}, most significant first.
     *
     * @param value the value whose low {@code numBits} bits are emitted
     * @param numBits number of bits to emit; must be in [1, 64]
     * @throws IOException if the underlying stream fails
     */
    public void write(long value, int numBits) throws IOException {
        if (numBits <= 0 || numBits > 64) {
            throw new IllegalArgumentException("Number of bits must be between 1 and 64.");
        }
        for (int shift = numBits - 1; shift >= 0; shift--) {
            // Shift the next bit of `value` into the pending byte.
            pending = (pending << 1) | (int) ((value >>> shift) & 1L);
            if (++pendingCount == 8) {
                out.write(pending);
                pending = 0;
                pendingCount = 0;
            }
        }
    }

    /**
     * Emits any partially filled byte (zero-padded on the right) and resets state.
     * Must be called once all values have been written.
     *
     * @throws IOException if the underlying stream fails
     */
    public void flush() throws IOException {
        if (pendingCount > 0) {
            // Left-align the remaining bits inside the final byte.
            out.write(pending << (8 - pendingCount));
        }
        pending = 0;
        pendingCount = 0;
    }

    /**
     * Number of bits needed to represent a non-negative value (minimum 1, so a
     * zero still occupies one bit in the stream).
     *
     * @param value the non-negative value
     * @return bit count in [1, 64]
     * @throws IllegalArgumentException if {@code value} is negative
     */
    public static int bitsRequired(long value) {
        if (value < 0) {
            throw new IllegalArgumentException("Value must be non-negative.");
        }
        return value == 0 ? 1 : 64 - Long.numberOfLeadingZeros(value);
    }
}
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
new file mode 100644
index 00000000..53016aa8
--- /dev/null
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
@@ -0,0 +1,278 @@
+package org.apache.tsfile.encoding.encoder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+
+public class KClusterAlgorithm {
+
+    private static final Random rand = new Random();
+
+    /**
+     * Private constructor to prevent instantiation.
+     */
+    private KClusterAlgorithm() {}
+
+    public static Object[] run(long[] data, int k) {
+        if (data == null || data.length == 0) {
+            return new Object[] {new long[0], new int[0], new long[0]};
+        }
+
+        long[][] data2D = new long[data.length][1];
+        for (int i = 0; i < data.length; i++) {
+            data2D[i][0] = data[i];
+        }
+
+        return kMedoidLogCost(data, k, 2, 0.01);
+    }
+
+    /**
+     * Helper class for sorting medoids based on their cluster size 
(frequency).
+     */
+    private static class MedoidSortHelper implements 
Comparable<KClusterAlgorithm.MedoidSortHelper> {
+        long medoid;
+        long size;
+        int originalIndex;
+
+        MedoidSortHelper(long medoid, long size, int originalIndex) {
+            this.medoid = medoid;
+            this.size = size;
+            this.originalIndex = originalIndex;
+        }
+
+        @Override
+        public int compareTo(KClusterAlgorithm.MedoidSortHelper other) {
+            return Long.compare(this.size, other.size);
+        }
+    }
+
+    /**
+     * The core K-Medoids algorithm, specifically implemented for 1D data.
+     */
+    private static Object[] kMedoidLogCost(long[] data, int k, int maxIter, 
double tol) {
+        int n = data.length;
+
+        // Use a HashSet for efficient uniqueness checking on primitive longs.
+        Set<Long> uniquePoints = new HashSet<>();
+        for (long point : data) {
+            uniquePoints.add(point);
+            if (uniquePoints.size() > k) {
+                break;
+            }
+        }
+        int distinctCount = uniquePoints.size();
+        if (distinctCount < k) {
+            System.err.println("Warning: Distinct data points (" + 
distinctCount +
+                    ") is less than input k (" + k + "), setting k to " + 
distinctCount);
+            k = distinctCount;
+        }
+
+        if (k <= 0) {
+            return new Object[]{new long[0], new int[n], new long[0]};
+        }
+
+        // 1. Initialize medoids using a K-Medoids++ style approach.
+        long[] medoids = acceleratedInitialization(data, k);
+        int[] clusterAssignment = new int[n];
+        long previousTotalCost = Long.MAX_VALUE;
+
+        // 2. Main iterative loop (Build and Swap phases).
+        for (int iteration = 0; iteration < maxIter; iteration++) {
+            // --- Assignment Step ---
+            long totalCostThisRound = 0L;
+            for (int i = 0; i < n; i++) {
+                long minCost = Long.MAX_VALUE;
+                int assignedMedoidIndex = -1;
+                for (int m = 0; m < k; m++) {
+                    long cost = calculateResidualCost(data[i], medoids[m]);
+                    if (cost < minCost) {
+                        minCost = cost;
+                        assignedMedoidIndex = m;
+                        if (minCost == 0) break; // Optimization: A perfect 
match is unbeatable.
+                    }
+                }
+                clusterAssignment[i] = assignedMedoidIndex;
+                totalCostThisRound += minCost;
+            }
+
+            // --- Convergence Check (Cost) ---
+            if (iteration > 0 && (previousTotalCost - totalCostThisRound) < 
tol) {
+                break;
+            }
+            previousTotalCost = totalCostThisRound;
+
+            // --- Update Step ---
+            long[] newMedoids = updateMedoids(data, clusterAssignment, k);
+
+            // --- Convergence Check (Medoids) ---
+            if (Arrays.equals(medoids, newMedoids)) {
+                break;
+            }
+            medoids = newMedoids;
+        }
+
+        // 3. Calculate final cluster sizes and sort results.
+        long[] finalClusterSizes = new long[k];
+        for (int assignment : clusterAssignment) {
+            if (assignment != -1) finalClusterSizes[assignment]++;
+        }
+        return sortResults(medoids, clusterAssignment, finalClusterSizes);
+    }
+
+    /**
+     * K-Medoids++ style initialization for 1D data.
+     */
+    private static long[] acceleratedInitialization(long[] data, int k) {
+        long[] medoids = new long[k];
+        Set<Long> selectedMedoids = new HashSet<>();
+
+        int firstIndex = rand.nextInt(data.length);
+        medoids[0] = data[firstIndex];
+        selectedMedoids.add(medoids[0]);
+
+        long[] distances = new long[data.length];
+        for (int i = 0; i < data.length; i++) {
+            distances[i] = Math.abs(data[i] - medoids[0]);
+        }
+
+        for (int i = 1; i < k; i++) {
+            long[] prefixSums = new long[data.length];
+            prefixSums[0] = distances[0];
+            for (int p = 1; p < data.length; p++) {
+                prefixSums[p] = prefixSums[p - 1] + distances[p];
+            }
+            long totalDistance = prefixSums[data.length - 1];
+            if (totalDistance == 0) {
+                int idx = rand.nextInt(data.length);
+                while (selectedMedoids.contains(data[idx])) {
+                    idx = (idx + 1) % data.length;
+                }
+                medoids[i] = data[idx];
+                selectedMedoids.add(medoids[i]);
+                continue;
+            }
+
+            long randValue = (long) (rand.nextDouble() * totalDistance);
+            int chosenIdx = binarySearch(prefixSums, randValue);
+
+            while (selectedMedoids.contains(data[chosenIdx])) {
+                chosenIdx = (chosenIdx + 1) % data.length;
+            }
+            medoids[i] = data[chosenIdx];
+            selectedMedoids.add(medoids[i]);
+
+            for (int idx = 0; idx < data.length; idx++) {
+                long distNewMedoid = Math.abs(data[idx] - medoids[i]);
+                if (distNewMedoid < distances[idx]) {
+                    distances[idx] = distNewMedoid;
+                }
+            }
+        }
+        return medoids;
+    }
+
+    /**
+     * Updates medoids by finding the point within each cluster that minimizes 
the total intra-cluster cost.
+     */
+    private static long[] updateMedoids(long[] data, int[] clusterAssignment, 
int k) {
+        long[] newMedoids = new long[k];
+        List<Long>[] clusterPoints = new ArrayList[k];
+        for (int i = 0; i < k; i++) {
+            clusterPoints[i] = new ArrayList<>();
+        }
+        for (int i = 0; i < data.length; i++) {
+            if (clusterAssignment[i] != -1) {
+                clusterPoints[clusterAssignment[i]].add(data[i]);
+            }
+        }
+
+        for (int m = 0; m < k; m++) {
+            List<Long> members = clusterPoints[m];
+            if (members.isEmpty()) continue;
+
+            long minTotalClusterCost = Long.MAX_VALUE;
+            long newMedoid = members.get(0);
+
+            for (Long candidate : members) {
+                long currentCandidateTotalCost = 0L;
+                for (Long otherMember : members) {
+                    currentCandidateTotalCost += 
calculateResidualCost(candidate, otherMember);
+                }
+                if (currentCandidateTotalCost < minTotalClusterCost) {
+                    minTotalClusterCost = currentCandidateTotalCost;
+                    newMedoid = candidate;
+                }
+            }
+            newMedoids[m] = newMedoid;
+        }
+        return newMedoids;
+    }
+
+    /**
+     * Sorts the final medoids and their cluster information based on cluster 
frequency.
+     *
+     * @param medoids The discovered medoids.
+     * @param clusterAssignment The assignment map for each data point.
+     * @param clusterSize The frequency of each cluster.
+     * @return A sorted and correctly mapped Object array.
+     */
+    private static Object[] sortResults(long[] medoids, int[] 
clusterAssignment, long[] clusterSize) {
+        int k = medoids.length;
+        List<KClusterAlgorithm.MedoidSortHelper> sorters = new ArrayList<>();
+        for (int i = 0; i < k; i++) {
+            sorters.add(new KClusterAlgorithm.MedoidSortHelper(medoids[i], 
clusterSize[i], i));
+        }
+        Collections.sort(sorters);
+
+        long[] sortedMedoids = new long[k];
+        long[] sortedClusterSize = new long[k];
+        int[] oldToNewIndexMap = new int[k];
+
+        for (int i = 0; i < k; i++) {
+            KClusterAlgorithm.MedoidSortHelper sortedItem = sorters.get(i);
+            sortedMedoids[i] = sortedItem.medoid;
+            sortedClusterSize[i] = sortedItem.size;
+            oldToNewIndexMap[sortedItem.originalIndex] = i;
+        }
+
+        int[] sortedClusterAssignment = new int[clusterAssignment.length];
+        for (int i = 0; i < clusterAssignment.length; i++) {
+            int oldIndex = clusterAssignment[i];
+            sortedClusterAssignment[i] = oldToNewIndexMap[oldIndex];
+        }
+
+        return new Object[]{sortedMedoids, sortedClusterAssignment, 
sortedClusterSize};
+    }
+
+    // --- Cost Calculation Functions ---
+
+    private static long bitLengthCost(long value) {
+        if (value == 0) return 1;
+        return 64 - Long.numberOfLeadingZeros(value);
+    }
+
+    private static long calculateResidualCost(long p1, long p2) {
+        return 1 + bitLengthCost(Math.abs(p1 - p2));
+    }
+
+    private static int binarySearch(long[] prefixSums, long value) {
+        int low = 0;
+        int high = prefixSums.length - 1;
+        int ans = -1;
+        while (low <= high) {
+            int mid = low + (high - low) / 2;
+            if (prefixSums[mid] >= value) {
+                ans = mid;
+                high = mid - 1;
+            } else {
+                low = mid + 1;
+            }
+        }
+        return ans == -1 ? prefixSums.length - 1 : ans;
+    }
+
+}
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
new file mode 100644
index 00000000..c24e90a5
--- /dev/null
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
@@ -0,0 +1,381 @@
+package org.apache.tsfile.encoding.encoder;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.encoding.TsFileEncodingException;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Binary;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class KClusterEncoder extends Encoder{
+
+    private static final int DEFAULT_PACK_SIZE = 10;
+    private final int k;
+    private long[] scaledData;
+    private int scalingExponent;
+    private final ValueBuffer buffer;
+
+    // MODIFIED: Constructor takes 'k'
+    public KClusterEncoder(TSDataType dataType, int k) {
+        super(TSEncoding.KCLUSTER);
+        this.k = k;
+        switch (dataType) {
+            case INT32: case INT64:
+                this.buffer = new Int64Buffer();
+                break;
+            case FLOAT: case DOUBLE:
+                this.buffer = new DoubleBuffer();
+                break;
+            default:
+                throw new TsFileEncodingException("AClusterEncoder does not 
support data type: " + dataType);
+        }
+    }
+
+
+    /** A buffer to store all values of a page before flushing. We use long to 
unify all types. */
+    private interface ValueBuffer {
+        /** Adds a value to the buffer. */
+        void add(int value);
+        void add(long value);
+        void add(float value);
+        void add(double value);
+
+        /** Clears the internal buffer. */
+        void clear();
+
+        /** Checks if the buffer is empty. */
+        boolean isEmpty();
+
+        /** Gets the current size of the buffer. */
+        int size();
+
+        KClusterEncoder.ProcessingResult processAndGet();
+    }
+
+    private static class Int64Buffer implements KClusterEncoder.ValueBuffer {
+        private final List<Long> values = new ArrayList<>();
+
+        @Override
+        public void add(int value) { values.add((long) value); }
+        @Override
+        public void add(long value) { values.add(value); }
+        @Override
+        public void add(float value) { /* Do nothing, type mismatch */ }
+        @Override
+        public void add(double value) { /* Do nothing, type mismatch */ }
+
+        @Override
+        public KClusterEncoder.ProcessingResult processAndGet() { // <--- 实现新方法
+            long[] data = values.stream().mapToLong(l -> l).toArray();
+            return new KClusterEncoder.ProcessingResult(data, 0); // Exponent 
is 0 for integers
+        }
+
+        @Override
+        public void clear() { values.clear(); }
+        @Override
+        public boolean isEmpty() { return values.isEmpty(); }
+        @Override
+        public int size() { return values.size(); }
+    }
+
+    /** A buffer for FLOAT and DOUBLE types. It performs scaling in 
processAndGetLongs(). */
+    private static class DoubleBuffer implements KClusterEncoder.ValueBuffer {
+        private final List<Double> values = new ArrayList<>();
+
+        @Override
+        public void add(int value) { /* Do nothing, type mismatch */ }
+        @Override
+        public void add(long value) { /* Do nothing, type mismatch */ }
+        @Override
+        public void add(float value) { values.add((double) value); } // Store 
as double to unify
+        @Override
+        public void add(double value) { values.add(value); }
+
+        @Override
+        public KClusterEncoder.ProcessingResult processAndGet() {
+            // --- Edge Case: Handle empty buffer ---
+            if (values.isEmpty()) {
+                return new KClusterEncoder.ProcessingResult(new long[0], 0);
+            }
+            int maxDecimalPlaces = 0;
+            for (double v : values) {
+                String s = BigDecimal.valueOf(v).toPlainString();
+                int dotIndex = s.indexOf('.');
+                if (dotIndex != -1) {
+                    int decimalPlaces = s.length() - dotIndex - 1;
+                    if (decimalPlaces > maxDecimalPlaces) {
+                        maxDecimalPlaces = decimalPlaces;
+                    }
+                }
+            }
+
+            double scalingFactor = Math.pow(10, maxDecimalPlaces);
+
+            long[] scaledLongs = new long[values.size()];
+            for (int i = 0; i < values.size(); i++) {
+                scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
+            }
+
+            return new KClusterEncoder.ProcessingResult(scaledLongs, 
maxDecimalPlaces);
+        }
+
+        @Override
+        public void clear() { values.clear(); }
+        @Override
+        public boolean isEmpty() { return values.isEmpty(); }
+        @Override
+        public int size() { return values.size(); }
+    }
+
+    private static class ProcessingResult {
+        final long[] scaledLongs;
+        final int scalingExponent; // e.g., 3 for a scaling factor of 1000
+
+        ProcessingResult(long[] scaledLongs, int scalingExponent) {
+            this.scaledLongs = scaledLongs;
+            this.scalingExponent = scalingExponent;
+        }
+
+        long[] getScaledLongs(){
+            return this.scaledLongs;
+        }
+
+        int getScalingExponent(){
+            return this.scalingExponent;
+        }
+    }
+
+
+    @Override
+    public void flush(ByteArrayOutputStream out) throws IOException {
+        if (buffer.isEmpty()) {
+            return;
+        }
+
+        ProcessingResult procResult = buffer.processAndGet();
+        long[] originalData = procResult.getScaledLongs();
+        int scalingExponent = procResult.getScalingExponent();
+        if (originalData.length == 0) return;
+        long minVal = findMin(originalData);
+        long[] data = new long[originalData.length];
+        for (int i = 0; i < data.length; i++) {
+            data[i] = originalData[i] - minVal;
+        }
+
+        Object[] clusterResult = KClusterAlgorithm.run(data,k);
+        long[] sortedMedoids = (long[]) clusterResult[0];
+        int[] clusterAssignments = (int[]) clusterResult[1];
+        long[] clusterFrequencies = (long[]) clusterResult[2];
+
+        long[] sortedZigzagResiduals = calculateSortedZigzagResiduals(data, 
sortedMedoids, clusterAssignments, clusterFrequencies);
+
+        encodeResults(out, scalingExponent, minVal, sortedMedoids, 
clusterAssignments, sortedZigzagResiduals);
+
+        buffer.clear();
+    }
+
+
+    @Override
+    public void encode(int value, ByteArrayOutputStream out) {
+        buffer.add(value);
+    }
+
+    @Override
+    public void encode(long value, ByteArrayOutputStream out) {
+        buffer.add(value);
+    }
+
+    @Override
+    public void encode(float value, ByteArrayOutputStream out) {
+        buffer.add(value);
+    }
+
+    @Override
+    public void encode(double value, ByteArrayOutputStream out) {
+        buffer.add(value);
+    }
+
+    private long[] calculateSortedZigzagResiduals(long[] data, long[] medoids, 
int[] assignments, long[] frequencies) {
+        int n = data.length;
+        int k = medoids.length;
+        if (n == 0) return new long[0];
+
+        long[] sortedResiduals = new long[n];
+        int[] writePointers = new int[k];
+        int cumulativeCount = 0;
+        for (int i = 0; i < k; i++) {
+            writePointers[i] = cumulativeCount;
+            cumulativeCount += (int) frequencies[i];
+        }
+
+        for (int i = 0; i < n; i++) {
+            int clusterId = assignments[i];
+            long medoid = medoids[clusterId];
+            long residual = data[i] - medoid;
+            long zigzagResidual = (residual << 1) ^ (residual >> 63); // 
Zigzag Encoding
+
+            int targetIndex = writePointers[clusterId];
+            sortedResiduals[targetIndex] = zigzagResidual;
+            writePointers[clusterId]++;
+        }
+        return sortedResiduals;
+    }
+
+    private void encodeResults(
+            ByteArrayOutputStream out,
+            int scalingExponent,
+            long minVal,
+            long[] medoids,
+            int[] frequencies,
+            long[] residuals)
+            throws IOException {
+
+        ClusterSupport writer = new ClusterSupport(out);
+        int numPoints = frequencies.length;
+        int k = medoids.length;
+
+        writer.write(scalingExponent, 8);
+        writer.write(k, 16);
+        writer.write(numPoints, 16);
+        writer.write(DEFAULT_PACK_SIZE, 16);
+
+        int minValBit = ClusterSupport.bitsRequired(minVal);
+        writer.write(minValBit, 8);
+        writer.write(minVal>=0?0:1,1);
+        writer.write(minVal, minValBit);
+
+        if (k == 0) {
+            writer.flush();
+            return;
+        }
+
+        long minMedoid = findMin(medoids);
+        long[] medoidOffsets = new long[k];
+        int maxMedoidOffsetBits = 0;
+        for (int i = 0; i < k; i++) {
+            medoidOffsets[i] = medoids[i] - minMedoid;
+            maxMedoidOffsetBits = Math.max(maxMedoidOffsetBits, 
ClusterSupport.bitsRequired(medoidOffsets[i]));
+        }
+        int minMedoidBit = ClusterSupport.bitsRequired(minMedoid);
+        writer.write(minMedoidBit,8);
+        writer.write(minMedoid>0?0:1,1);
+        writer.write(minMedoid, minMedoidBit);
+
+        writer.write(maxMedoidOffsetBits, 8);
+        for (long offset : medoidOffsets) {
+            writer.write(offset, maxMedoidOffsetBits);
+        }
+
+        long[] freqDeltas = new long[k];
+        if (k > 0) {
+            freqDeltas[0] = frequencies[0];
+            for (int i = 1; i < k; i++) {
+                freqDeltas[i] = frequencies[i] - frequencies[i - 1];
+            }
+        }
+        int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+        writer.write(numFreqBlocks, 16);
+
+        int[] freqBlockMaxBits = new int[numFreqBlocks];
+        // Metadata pass for frequencies
+        for (int i = 0; i < numFreqBlocks; i++) {
+            int start = i * DEFAULT_PACK_SIZE;
+            int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+            long maxDelta = 0;
+            for (int j = start; j < end; j++) {
+                maxDelta = Math.max(maxDelta, freqDeltas[j]);
+            }
+            freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
+            writer.write(freqBlockMaxBits[i], 8);
+        }
+        // Data pass for frequencies
+        for (int i = 0; i < numFreqBlocks; i++) {
+            int start = i * DEFAULT_PACK_SIZE;
+            int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+            int bitsForBlock = freqBlockMaxBits[i];
+            for (int j = start; j < end; j++) {
+                writer.write(freqDeltas[j], bitsForBlock);
+            }
+        }
+
+        int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+        writer.write(numPacks, 32);
+
+        int[] resPackMaxBits = new int[numPacks];
+        // Metadata pass for residuals
+        for (int i = 0; i < numPacks; i++) {
+            int start = i * DEFAULT_PACK_SIZE;
+            int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+            long maxOffset = 0;
+            for (int j = start; j < end; j++) {
+                maxOffset = Math.max(maxOffset, residuals[j]);
+            }
+            resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
+            writer.write(resPackMaxBits[i], 8);
+        }
+        // Data pass for residuals
+        for (int i = 0; i < numPacks; i++) {
+            int start = i * DEFAULT_PACK_SIZE;
+            int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+            int bitsForPack = resPackMaxBits[i];
+            if (bitsForPack > 0) {
+                for (int j = start; j < end; j++) {
+                    writer.write(residuals[j], bitsForPack);
+                }
+            }
+        }
+
+        writer.flush();
+    }
+
+    private long findMin(long[] data) {
+        if (data == null || data.length == 0) {
+            throw new IllegalArgumentException("Data array cannot be null or 
empty.");
+        }
+        long min = data[0];
+        for (int i = 1; i < data.length; i++) {
+            if (data[i] < min) {
+                min = data[i];
+            }
+        }
+        return min;
+    }
+
+    @Override
+    public int getOneItemMaxSize() {
+        return 8;
+    }
+
+    @Override
+    public long getMaxByteSize() {
+        if (this.buffer.isEmpty()) {
+            return 0;
+        }
+        return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
+    }
+
+    @Override
+    public void encode(boolean value, ByteArrayOutputStream out) {
+        throw new TsFileEncodingException("AClusterEncoder does not support 
boolean values.");
+    }
+
+    @Override
+    public void encode(short value, ByteArrayOutputStream out) {
+        throw new TsFileEncodingException("AClusterEncoder does not support 
short values.");
+    }
+
+    @Override
+    public void encode(Binary value, ByteArrayOutputStream out) {
+        throw new TsFileEncodingException("AClusterEncoder does not support 
Binary values.");
+    }
+
+    @Override
+    public void encode(BigDecimal value, ByteArrayOutputStream out) {
+        throw new TsFileEncodingException("AClusterEncoder does not support 
BigDecimal values.");
+    }
+}
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
index 68c7e56b..4a25c88d 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
@@ -78,6 +78,10 @@ public abstract class TSEncodingBuilder {
         return new Sprintz();
       case RLBE:
         return new RLBE();
+      case KCLUSTER:
+        return new KCluster();
+      case ACLUSTER:
+        return new ACluster();
       default:
         throw new UnsupportedOperationException(type.toString());
     }
@@ -189,6 +193,79 @@ public abstract class TSEncodingBuilder {
     }
   }
 
+  public static class ACluster extends TSEncodingBuilder {
+
+    @Override
+    public Encoder getEncoder(TSDataType type) {
+      switch (type) {
+        case INT32:
+          return new AClusterEncoder(type);
+        case DATE:
+        case INT64:
+          return new AClusterEncoder(type);
+        case TIMESTAMP:
+        case FLOAT:
+          return new AClusterEncoder(type);
+        case DOUBLE:
+          return new AClusterEncoder(type);
+        default:
+          throw new UnSupportedDataTypeException("ACLUSTER doesn't support 
data type: " + type);
+      }
+    }
+
+    @Override
+    public void initFromProps(Map<String, String> props) {
+
+    }
+  }
+
+  public static class KCluster extends TSEncodingBuilder {
+
+    private int k;
+    private static final int KCLUSTER_DEFAULT_K = 1000;
+    private static final String K_KEY = "k";
+
+    public KCluster() {
+      this.k = KCLUSTER_DEFAULT_K;
+    }
+
+    @Override
+    public Encoder getEncoder(TSDataType type) {
+      switch (type) {
+        case INT32:
+          return new KClusterEncoder(type, this.k);
+        case DATE:
+        case INT64:
+          return new KClusterEncoder(type, this.k);
+        case TIMESTAMP:
+        case FLOAT:
+          return new KClusterEncoder(type, this.k);
+        case DOUBLE:
+          return new KClusterEncoder(type, this.k);
+        default:
+          throw new UnSupportedDataTypeException("KCLUSTER doesn't support 
data type: " + type);
+      }
+    }
+
+    @Override
+    public void initFromProps(Map<String, String> props) {
+      if (props != null && props.containsKey(K_KEY)) {
+        String kStr = props.get(K_KEY);
+        try {
+          int parsedK = Integer.parseInt(kStr);
+          if (parsedK <= 0) {
+            throw new IllegalArgumentException(
+                    "KCLUSTER parameter k must be a positive integer, but was 
" + parsedK);
+          }
+          this.k = parsedK;
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException(
+                  "KCLUSTER parameter k must be an integer, but was " + kStr);
+        }
+      }
+    }
+  }
+
   /** for INT32, INT64, FLOAT, DOUBLE. */
   public static class Ts2Diff extends TSEncodingBuilder {
 
@@ -199,10 +276,9 @@ public abstract class TSEncodingBuilder {
       switch (type) {
         case INT32:
         case DATE:
-          return new DeltaBinaryEncoder.IntDeltaEncoder();
         case INT64:
         case TIMESTAMP:
-          return new DeltaBinaryEncoder.LongDeltaEncoder();
+          //          return new DeltaBinaryEncoder.LongDeltaEncoder();
         case FLOAT:
         case DOUBLE:
           return new FloatEncoder(TSEncoding.TS_2DIFF, type, maxPointNumber);
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
index 20374376..e436a562 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
@@ -41,7 +41,9 @@ public enum TSEncoding {
   FREQ((byte) 10),
   CHIMP((byte) 11),
   SPRINTZ((byte) 12),
-  RLBE((byte) 13);
+  RLBE((byte) 13),
+  KCLUSTER((byte) 14),
+  ACLUSTER((byte) 15);
   private final byte type;
 
   @SuppressWarnings("java:S2386") // used by other projects
@@ -63,6 +65,8 @@ public enum TSEncoding {
     intSet.add(TSEncoding.CHIMP);
     intSet.add(TSEncoding.SPRINTZ);
     intSet.add(TSEncoding.RLBE);
+    intSet.add(TSEncoding.KCLUSTER);
+    intSet.add(TSEncoding.ACLUSTER);
 
     TYPE_SUPPORTED_ENCODINGS.put(TSDataType.INT32, intSet);
     TYPE_SUPPORTED_ENCODINGS.put(TSDataType.INT64, intSet);
@@ -78,6 +82,8 @@ public enum TSEncoding {
     floatSet.add(TSEncoding.CHIMP);
     floatSet.add(TSEncoding.SPRINTZ);
     floatSet.add(TSEncoding.RLBE);
+    floatSet.add(TSEncoding.KCLUSTER);
+    floatSet.add(TSEncoding.ACLUSTER);
 
     TYPE_SUPPORTED_ENCODINGS.put(TSDataType.FLOAT, floatSet);
     TYPE_SUPPORTED_ENCODINGS.put(TSDataType.DOUBLE, floatSet);
@@ -135,6 +141,10 @@ public enum TSEncoding {
         return TSEncoding.SPRINTZ;
       case 13:
         return TSEncoding.RLBE;
+      case 14:
+        return TSEncoding.KCLUSTER;
+      case 15:
+        return TSEncoding.ACLUSTER;
       default:
         throw new IllegalArgumentException("Invalid input: " + encoding);
     }
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
 
b/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
new file mode 100644
index 00000000..fcc4dca1
--- /dev/null
+++ 
b/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
@@ -0,0 +1,210 @@
+package org.apache.tsfile.encoding.decoder;
+
+import org.apache.tsfile.encoding.encoder.AClusterEncoder;
+import org.apache.tsfile.encoding.encoder.Encoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test suite for AClusterEncoder and AClusterDecoder.
+ * <p>
+ * This test validates the end-to-end encoding and decoding process.
+ * Since ACluster algorithm reorders data based on clusters, the validation
+ * cannot compare original and decoded lists element by element. Instead, it
+ * verifies that the set of unique values and their frequencies are identical
+ * before and after the process.
+ * </p>
+ * <p>
+ * The test structure is adapted to the iterator-style Decoder interface
+ * (hasNext(buffer), readXXX(buffer)).
+ * </p>
+ */
+public class AClusterEncoderDecoderTest {
+
+    private static final int ROW_NUM = 1000;
+    private final Random ran = new Random();
+
+    // 
=================================================================================
+    // Integer Tests
+    // 
=================================================================================
+
+    @Test
+    public void testIntBasicClusters() throws IOException {
+        List<Integer> data = new ArrayList<>();
+        // Three distinct clusters
+        for (int i = 0; i < 300; i++) data.add(100 + ran.nextInt(10)); // 
Cluster around 100
+        for (int i = 0; i < 400; i++) data.add(5000 + ran.nextInt(20)); // 
Cluster around 5000
+        for (int i = 0; i < 300; i++) data.add(100000 + ran.nextInt(5)); // 
Cluster around 100000
+        shouldReadAndWrite(data, TSDataType.INT32);
+    }
+
+
+    // 
=================================================================================
+    // Long Tests
+    // 
=================================================================================
+
+    @Test
+    public void testLongBasic() throws IOException {
+        List<Long> data = new ArrayList<>();
+        for (int i = 0; i < ROW_NUM; i++) {
+            data.add((long) i * i * i);
+        }
+        shouldReadAndWrite(data, TSDataType.INT64);
+    }
+
+    // 
=================================================================================
+    // Double Tests
+    // 
=================================================================================
+
+    @Test
+    public void testDoubleWithPrecision() throws IOException {
+        List<Double> data = new ArrayList<>();
+        final int precision = 6;
+
+        System.out.println("Testing double with controlled precision (max " + 
precision + " decimal places)...");
+
+        for (int i = 0; i < ROW_NUM / 2; i++) {
+            double randomPart = nextRandomDoubleWithPrecision(ran, precision);
+            double rawValue = 123.456 + randomPart;
+
+            double cleanValue = cleanDouble(rawValue, precision + 3);
+            data.add(cleanValue);
+        }
+
+        for (int i = 0; i < ROW_NUM / 2; i++) {
+            double randomPart = nextRandomDoubleWithPrecision(ran, precision);
+            double rawValue = 9999.0 + randomPart;
+
+            double cleanValue = cleanDouble(rawValue, precision);
+            data.add(cleanValue);
+        }
+
+        if (!data.isEmpty()) {
+            System.out.println("Sample generated data point (after cleaning): 
" + data.get(0));
+        }
+
+        shouldReadAndWrite(data, TSDataType.DOUBLE);
+    }
+
+    private double cleanDouble(double value, int maxPrecision) {
+        BigDecimal bd = new BigDecimal(value);
+        BigDecimal roundedBd = bd.setScale(maxPrecision, RoundingMode.HALF_UP);
+        return roundedBd.doubleValue();
+    }
+    // 
=================================================================================
+    // Edge Case Tests
+    // 
=================================================================================
+
+    @Test
+    public void testSingleValue() throws IOException {
+        shouldReadAndWrite(Arrays.asList(123.00000001,123.00000002, 
29.0001,29.0002,29.000001), TSDataType.DOUBLE);
+    }
+
+    @Test
+    public void testAllSameValues() throws IOException {
+        List<Integer> data = new ArrayList<>();
+        for(int i = 0; i < 100; i++) data.add(777);
+        shouldReadAndWrite(data, TSDataType.INT32);
+    }
+
+    // 
=================================================================================
+    // Core Test Logic and Helpers
+    // 
=================================================================================
+
+    private double nextRandomDoubleWithPrecision(Random random, int precision) 
{
+        if (precision < 0) {
+            throw new IllegalArgumentException("Precision must be 
non-negative.");
+        }
+        double factor = Math.pow(10, precision);
+
+        double scaled = random.nextDouble() * factor;
+        long rounded = Math.round(scaled);
+        return rounded / factor;
+    }
+
+    /**
+     * Generic helper to write a list of data using the appropriate encoder 
method.
+     */
+    private <T extends Number> void writeData(List<T> data, Encoder encoder, 
ByteArrayOutputStream out) throws IOException {
+        if (data.isEmpty()) {
+            return;
+        }
+        // Use instanceof to call the correct overloaded encode method
+        if (data.get(0) instanceof Integer) {
+            data.forEach(val -> encoder.encode((Integer) val, out));
+        } else if (data.get(0) instanceof Long) {
+            data.forEach(val -> encoder.encode((Long) val, out));
+        } else if (data.get(0) instanceof Float) {
+            data.forEach(val -> encoder.encode((Float) val, out));
+        } else if (data.get(0) instanceof Double) {
+            data.forEach(val -> encoder.encode((Double) val, out));
+        }
+        encoder.flush(out);
+    }
+
+    /**
+     * The main validation method. It encodes the given data, then decodes it,
+     * and finally compares the frequency maps of the original and decoded 
data.
+     */
+    private <T extends Number> void shouldReadAndWrite(List<T> originalData, 
TSDataType dataType) throws IOException {
+        // 1. Prepare for encoding
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        Encoder encoder = new AClusterEncoder(dataType);
+
+        // 2. Encode the data
+        writeData(originalData, encoder, out);
+        ByteBuffer buffer = ByteBuffer.wrap(out.toByteArray());
+
+        // 3. Decode the data using the iterator-style interface
+        Decoder decoder = new ClusterDecoder(dataType);
+        List<T> decodedData = new ArrayList<>();
+
+        while (decoder.hasNext(buffer)) {
+            switch (dataType) {
+                case INT32:
+                    decodedData.add((T) 
Integer.valueOf(decoder.readInt(buffer)));
+                    break;
+                case INT64:
+                    decodedData.add((T) 
Long.valueOf(decoder.readLong(buffer)));
+                    break;
+                case FLOAT:
+                    decodedData.add((T) 
Float.valueOf(decoder.readFloat(buffer)));
+                    break;
+                case DOUBLE:
+                    decodedData.add((T) 
Double.valueOf(decoder.readDouble(buffer)));
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Unsupported data 
type for test");
+            }
+        }
+
+        // 4. Validate the results
+        // First, a quick check on the total count
+        assertEquals("Decoded data size should match original data size", 
originalData.size(), decodedData.size());
+
+        // Second, the robust check using frequency maps
+        Map<T, Long> originalFrequencies = getFrequencyMap(originalData);
+        Map<T, Long> decodedFrequencies = getFrequencyMap(decodedData);
+
+        assertEquals("Frequency maps of original and decoded data should be 
identical", originalFrequencies, decodedFrequencies);
+    }
+
+    /**
+     * Helper method to count frequencies of elements in a list.
+     */
+    private <T extends Number> Map<T, Long> getFrequencyMap(List<T> list) {
+        return list.stream()
+                .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
+    }
+}
\ No newline at end of file

Reply via email to