This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch research/cluster-compress
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/research/cluster-compress by
this push:
new c1a1b0b7 Cluster Based Compression methods added (#576)
c1a1b0b7 is described below
commit c1a1b0b7acba8f67e0bbed1ef390085caea4d153
Author: SamM755 <[email protected]>
AuthorDate: Sat Aug 23 19:33:09 2025 +0800
Cluster Based Compression methods added (#576)
---
java/pom.xml | 4 +-
.../tsfile/encoding/decoder/ClusterDecoder.java | 215 +++++++++++
.../tsfile/encoding/decoder/ClusterReader.java | 35 ++
.../apache/tsfile/encoding/decoder/Decoder.java | 9 +-
.../tsfile/encoding/encoder/AClusterAlgorithm.java | 213 +++++++++++
.../tsfile/encoding/encoder/AClusterEncoder.java | 422 +++++++++++++++++++++
.../tsfile/encoding/encoder/ClusterSupport.java | 77 ++++
.../tsfile/encoding/encoder/KClusterAlgorithm.java | 278 ++++++++++++++
.../tsfile/encoding/encoder/KClusterEncoder.java | 381 +++++++++++++++++++
.../tsfile/encoding/encoder/TSEncodingBuilder.java | 80 +++-
.../tsfile/file/metadata/enums/TSEncoding.java | 12 +-
.../decoder/AClusterEncoderDecoderTest.java | 210 ++++++++++
12 files changed, 1930 insertions(+), 6 deletions(-)
diff --git a/java/pom.xml b/java/pom.xml
index 8c6be19b..b05a9207 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -56,7 +56,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
- <version>3.15.0</version>
+ <version>3.16.0</version>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
@@ -99,7 +99,7 @@
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>2.10.1</version>
+ <version>2.11.0</version>
</dependency>
</dependencies>
</dependencyManagement>
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
new file mode 100644
index 00000000..70479840
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterDecoder.java
@@ -0,0 +1,215 @@
+package org.apache.tsfile.encoding.decoder;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.encoding.TsFileDecodingException;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ClusterDecoder extends Decoder {
+
+ private final TSDataType dataType;
+
+
+ private long[] longValues;
+ private double[] doubleValues;
+ private int readIndex = 0;
+ private int count = 0;
+ private boolean hasDecoded = false;
+
+ public ClusterDecoder(TSDataType dataType) {
+ super(TSEncoding.ACLUSTER);
+ this.dataType = dataType;
+ }
+
+ /**
+ * check has next
+ */
+ @Override
+ public boolean hasNext(ByteBuffer buffer) throws IOException {
+ if (!hasDecoded) {
+ decodeInternally(buffer);
+ }
+ return readIndex < count;
+ }
+
+ /**
+ * reset
+ */
+ @Override
+ public void reset() {
+ this.hasDecoded = false;
+ this.readIndex = 0;
+ this.count = 0;
+ this.longValues = null;
+ this.doubleValues = null;
+ }
+
+ @Override
+ public int readInt(ByteBuffer buffer){
+ return (int) longValues[readIndex++];
+ }
+
+ @Override
+ public long readLong(ByteBuffer buffer){
+ return longValues[readIndex++];
+ }
+
+ @Override
+ public float readFloat(ByteBuffer buffer){
+ return (float) doubleValues[readIndex++];
+ }
+
+ @Override
+ public double readDouble(ByteBuffer buffer){
+ return doubleValues[readIndex++];
+ }
+
+
+ /**
+ * Internal decode method
+ * Decode ACluster
+ */
+ private void decodeInternally(ByteBuffer buffer) {
+ if (hasDecoded) {
+ return;
+ }
+
+ ClusterReader reader = new ClusterReader(buffer);
+
+ // --- Header ---
+ int scalingExponent = (int) reader.read(8);
+ int k = (int) reader.read(16);
+ this.count = (int) reader.read(16);
+ int packSize = (int) reader.read(16);
+
+ // --- Global Minimum Value (minVal) ---
+ int minValBit = (int) reader.read(8);
+ long minValSign = reader.read(1);
+ long absMinVal = reader.read(minValBit); // Read the absolute value
part
+ long minVal = (minValSign == 1) ? -absMinVal : absMinVal; // Apply the
sign
+
+ // Allocate memory based on count and dataType
+ if (this.count > 0) {
+ if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE)
{
+ this.doubleValues = new double[this.count];
+ } else {
+ this.longValues = new long[this.count];
+ }
+ }
+
+ // Handle case where all values are the same (k=0)
+ if (k == 0) {
+ if (this.count > 0) {
+ reconstructFromMinValOnly(minVal, scalingExponent);
+ }
+ this.hasDecoded = true;
+ return;
+ }
+
+ // --- Medoids ---
+ long[] medoids = new long[k];
+ int minMedoidBit = (int) reader.read(8);
+ long minMedoidSign = reader.read(1);
+ long absMinMedoid = reader.read(minMedoidBit); // Read the absolute
value part
+ long minMedoid = (minMedoidSign == 1) ? -absMinMedoid : absMinMedoid;
// Apply the sign
+
+ int maxMedoidOffsetBits = (int) reader.read(8);
+ for (int i = 0; i < k; i++) {
+ long offset = reader.read(maxMedoidOffsetBits);
+ medoids[i] = minMedoid + offset;
+ }
+
+ // --- Frequencies (Cluster Sizes) ---
+ // The encoder wrote deltas (cluster sizes), so we read them and
rebuild the cumulative array
+ long[] cumulativeFrequencies = new long[k];
+ int numFreqBlocks = (int) reader.read(16);
+
+ // Metadata pass for frequencies
+ int[] freqBlockMaxBits = new int[numFreqBlocks];
+ for (int i = 0; i < numFreqBlocks; i++) {
+ freqBlockMaxBits[i] = (int) reader.read(8);
+ }
+
+ // Data pass for frequencies - Reconstruct cumulative frequencies
+ long currentCumulativeFreq = 0;
+ int freqIndex = 0;
+ for (int i = 0; i < numFreqBlocks; i++) {
+ int start = i * packSize;
+ int end = Math.min(start + packSize, k);
+ int bitsForBlock = freqBlockMaxBits[i];
+ for (int j = start; j < end; j++) {
+ long delta = reader.read(bitsForBlock); // This delta is the
actual cluster size
+ currentCumulativeFreq += delta;
+ cumulativeFrequencies[freqIndex++] = currentCumulativeFreq;
+ }
+ }
+
+ // --- Residuals ---
+ long[] residuals = new long[this.count];
+ int numPacks = (int) reader.read(32);
+
+ // Metadata pass for residuals
+ int[] resPackMaxBits = new int[numPacks];
+ for (int i = 0; i < numPacks; i++) {
+ resPackMaxBits[i] = (int) reader.read(8);
+ }
+
+ // Data pass for residuals
+ int residualIdx = 0;
+ for (int i = 0; i < numPacks; i++) {
+ int start = i * packSize;
+ int end = Math.min(start + packSize, this.count);
+ int bitsForPack = resPackMaxBits[i];
+ if (bitsForPack > 0) {
+ for (int j = start; j < end; j++) {
+ residuals[residualIdx++] = reader.read(bitsForPack);
+ }
+ } else {
+ // If bitsForPack is 0, all residuals in this pack are 0.
+ // We just need to advance the index, as the array is already
initialized to 0.
+ residualIdx += (end - start);
+ }
+ }
+
+ // --- Final Data Reconstruction ---
+ // Use the correctly reconstructed cumulativeFrequencies array
+ reconstructData(medoids, cumulativeFrequencies, residuals, minVal,
scalingExponent);
+
+ this.hasDecoded = true;
+ }
+
+ private void reconstructData(long[] medoids, long[] frequencies, long[]
residuals, long minVal, int scalingExponent) {
+ int residualReadPos = 0;
+ int dataWritePos = 0;
+ double scalingFactor = Math.pow(10, scalingExponent);
+
+ for (int clusterId = 0; clusterId < medoids.length; clusterId++) {
+ long pointsInThisCluster = frequencies[clusterId];
+ long medoid = medoids[clusterId];
+
+ for (int i = 0; i < pointsInThisCluster; i++) {
+ long zigzagResidual = residuals[residualReadPos++];
+ long residual = (zigzagResidual >>> 1) ^ -(zigzagResidual & 1);
+ long scaledDataPoint = medoid + residual + minVal;
+
+ if (dataType == TSDataType.FLOAT || dataType ==
TSDataType.DOUBLE) {
+ doubleValues[dataWritePos++] = scaledDataPoint /
scalingFactor;
+ } else {
+ longValues[dataWritePos++] = scaledDataPoint;
+ }
+ }
+ }
+ }
+
+ private void reconstructFromMinValOnly(long minVal, int scalingExponent) {
+ if (dataType == TSDataType.FLOAT || dataType == TSDataType.DOUBLE) {
+ double scalingFactor = Math.pow(10, scalingExponent);
+ double finalValue = minVal / scalingFactor;
+ for(int i=0; i<this.count; i++) doubleValues[i] = finalValue;
+ } else {
+ for(int i=0; i<this.count; i++) longValues[i] = minVal;
+ }
+ }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
new file mode 100644
index 00000000..ad578ada
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/ClusterReader.java
@@ -0,0 +1,35 @@
+package org.apache.tsfile.encoding.decoder;
+
+import java.nio.ByteBuffer;
+
/** Bit-level reader over a ByteBuffer; consumes bits MSB-first within each byte. */
public class ClusterReader {
    private final ByteBuffer buffer;
    private byte currentByte;
    private int bitPosition; // index of the next bit to consume, 7 (MSB) down to 0

    public ClusterReader(ByteBuffer buffer) {
        this.buffer = buffer;
        this.currentByte = 0;
        this.bitPosition = -1; // forces fetching a fresh byte on the first read
    }

    /**
     * Reads {@code numBits} bits and returns them as an unsigned value packed into the
     * low bits of a long, preserving stream order (first bit read becomes the MSB).
     *
     * @param numBits how many bits to consume, 1..64
     * @throws IllegalArgumentException if numBits is outside that range
     */
    public long read(int numBits) {
        if (numBits > 64 || numBits <= 0) {
            throw new IllegalArgumentException("Cannot read more than 64 bits or non-positive bits at once.");
        }

        long value = 0;
        int remaining = numBits;
        while (remaining-- > 0) {
            if (bitPosition < 0) {
                // Current byte exhausted: pull the next one from the buffer.
                currentByte = buffer.get();
                bitPosition = 7;
            }
            // Append the next stream bit to the low end of the accumulator.
            value = (value << 1) | ((currentByte >> bitPosition--) & 1L);
        }
        return value;
    }
}
\ No newline at end of file
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
index 36c3d826..bc61bda0 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
@@ -177,7 +177,14 @@ public abstract class Decoder {
default:
throw new TsFileDecodingException(String.format(ERROR_MSG,
encoding, dataType));
}
- default:
+ case ACLUSTER: case KCLUSTER:
+ switch (dataType){
+ case INT32: case INT64: case FLOAT: case DOUBLE:
+ return new ClusterDecoder(dataType);
+ default:
+ throw new TsFileDecodingException(String.format(ERROR_MSG,
encoding, dataType));
+ }
+ default:
throw new TsFileDecodingException(String.format(ERROR_MSG, encoding,
dataType));
}
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
new file mode 100644
index 00000000..44d4d91b
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterAlgorithm.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.encoder;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
/**
 * Single-pass adaptive clustering ("ACluster") used by AClusterEncoder. Each point is
 * either assigned to the cheapest existing medoid or promoted to a new medoid when the
 * estimated bit savings exceed the cost of storing the new base point.
 */
public final class AClusterAlgorithm {

    /** Private constructor to prevent instantiation of this utility class. */
    private AClusterAlgorithm() {}

    /**
     * The main entry point for the ACluster algorithm. It processes a page of data and
     * returns the results as an Object array.
     *
     * @param data the input time series data for a single page, represented as a long array
     * @return an Object array where: <br>
     *     - index 0: long[] medoids (sorted by cluster frequency, ascending) <br>
     *     - index 1: int[] clusterAssignments (mapped to the sorted medoids) <br>
     *     - index 2: long[] clusterFrequencies (sorted ascending)
     */
    public static Object[] run(long[] data) {
        int n = data.length;
        if (n == 0) {
            return new Object[]{new long[0], new int[0], new long[0]};
        }

        // --- Initialization ---
        List<Long> medoids = new ArrayList<>();
        // PERF FIX: one map replaces the former Set.contains + List.indexOf pair; the
        // indexOf was an O(k) scan for every point whose value equals an existing medoid.
        Map<Long, Integer> medoidIndex = new HashMap<>();
        List<Set<Integer>> pointsInClusters = new ArrayList<>();
        int[] pointToMedoidMap = new int[n];

        // --- Step 1: Seed with the first data point as the first medoid ---
        long firstPoint = data[0];
        medoids.add(firstPoint);
        medoidIndex.put(firstPoint, 0);
        Set<Integer> firstCluster = new HashSet<>();
        firstCluster.add(0);
        pointsInClusters.add(firstCluster);
        pointToMedoidMap[0] = 0;

        // --- Step 2: Iteratively process the rest of the points ---
        for (int i = 1; i < n; i++) {
            long currentPoint = data[i];

            // A point whose value already is a medoid joins that cluster for free.
            Integer exactMatch = medoidIndex.get(currentPoint);
            if (exactMatch != null) {
                pointsInClusters.get(exactMatch).add(i);
                pointToMedoidMap[i] = exactMatch;
                continue;
            }

            // --- Step 3: Find the cheapest existing medoid for this point ---
            int bestMedoidIndex = -1;
            long minCostToExistingMedoid = Long.MAX_VALUE;
            for (int j = 0; j < medoids.size(); j++) {
                long cost = calculateResidualCost(currentPoint, medoids.get(j));
                if (cost < minCostToExistingMedoid) {
                    minCostToExistingMedoid = cost;
                    bestMedoidIndex = j;
                }
            }

            // --- Step 4: Savings if currentPoint became a medoid itself: its own
            // residual cost, plus the reduction for best-cluster points that would migrate.
            long savingsFromCurrentPoint = minCostToExistingMedoid;
            long savingsFromReassignment = 0;
            Set<Integer> pointsInBestCluster = pointsInClusters.get(bestMedoidIndex);
            for (int pointIndexInCluster : pointsInBestCluster) {
                long p = data[pointIndexInCluster];
                long costToOldMedoid = calculateResidualCost(p, medoids.get(bestMedoidIndex));
                long costToNewPotentialMedoid = calculateResidualCost(p, currentPoint);
                if (costToNewPotentialMedoid < costToOldMedoid) {
                    savingsFromReassignment += (costToOldMedoid - costToNewPotentialMedoid);
                }
            }
            long totalSavings = savingsFromCurrentPoint + savingsFromReassignment;

            // --- Step 5: Promote to a new medoid only if the savings beat its storage cost ---
            long storageCostForNewPoint = calculateBasePointStorageCost(currentPoint);
            if (totalSavings > storageCostForNewPoint) {
                int newMedoidId = medoids.size();
                medoids.add(currentPoint);
                medoidIndex.put(currentPoint, newMedoidId);
                Set<Integer> newCluster = new HashSet<>();
                newCluster.add(i);
                pointsInClusters.add(newCluster);
                pointToMedoidMap[i] = newMedoidId;

                // Migrate the best cluster's points that are now closer to the new medoid.
                // Iterate over a copy because the underlying set is mutated in the loop.
                Set<Integer> pointsToReEvaluate = new HashSet<>(pointsInClusters.get(bestMedoidIndex));
                for (int pointIndexToReEvaluate : pointsToReEvaluate) {
                    long p = data[pointIndexToReEvaluate];
                    if (calculateResidualCost(p, currentPoint)
                            < calculateResidualCost(p, medoids.get(bestMedoidIndex))) {
                        pointsInClusters.get(bestMedoidIndex).remove(pointIndexToReEvaluate);
                        pointsInClusters.get(newMedoidId).add(pointIndexToReEvaluate);
                        pointToMedoidMap[pointIndexToReEvaluate] = newMedoidId;
                    }
                }
            } else {
                // Assign to the existing best medoid.
                pointsInClusters.get(bestMedoidIndex).add(i);
                pointToMedoidMap[i] = bestMedoidIndex;
            }
        }

        // --- Step 6: Finalize and sort the results by cluster frequency ---
        int k = medoids.size();
        long[] finalMedoids = medoids.stream().mapToLong(l -> l).toArray();
        long[] rawClusterSizes = new long[k];
        for (int i = 0; i < k; i++) {
            rawClusterSizes[i] = pointsInClusters.get(i).size();
        }

        return sortResults(finalMedoids, pointToMedoidMap, rawClusterSizes);
    }

    /** Helper class for sorting medoids based on their cluster size (frequency). */
    private static class MedoidSortHelper implements Comparable<MedoidSortHelper> {
        long medoid;
        long size;
        int originalIndex;

        MedoidSortHelper(long medoid, long size, int originalIndex) {
            this.medoid = medoid;
            this.size = size;
            this.originalIndex = originalIndex;
        }

        @Override
        public int compareTo(MedoidSortHelper other) {
            return Long.compare(this.size, other.size);
        }
    }

    /**
     * Sorts the final medoids and their cluster information by cluster frequency
     * (ascending, stable) and remaps the per-point assignments accordingly.
     *
     * @param medoids the discovered medoids
     * @param clusterAssignment the assignment map for each data point
     * @param clusterSize the frequency of each cluster
     * @return a sorted and correctly mapped Object array
     */
    private static Object[] sortResults(long[] medoids, int[] clusterAssignment, long[] clusterSize) {
        int k = medoids.length;
        List<MedoidSortHelper> sorters = new ArrayList<>();
        for (int i = 0; i < k; i++) {
            sorters.add(new MedoidSortHelper(medoids[i], clusterSize[i], i));
        }
        Collections.sort(sorters); // stable, so equal-size clusters keep discovery order

        long[] sortedMedoids = new long[k];
        long[] sortedClusterSize = new long[k];
        int[] oldToNewIndexMap = new int[k];

        for (int i = 0; i < k; i++) {
            MedoidSortHelper sortedItem = sorters.get(i);
            sortedMedoids[i] = sortedItem.medoid;
            sortedClusterSize[i] = sortedItem.size;
            oldToNewIndexMap[sortedItem.originalIndex] = i;
        }

        int[] sortedClusterAssignment = new int[clusterAssignment.length];
        for (int i = 0; i < clusterAssignment.length; i++) {
            sortedClusterAssignment[i] = oldToNewIndexMap[clusterAssignment[i]];
        }

        return new Object[]{sortedMedoids, sortedClusterAssignment, sortedClusterSize};
    }

    // --- Cost Calculation Functions (all costs are in bits) ---

    /** Bits needed to represent a non-negative value; 0 still costs one bit. */
    private static long bitLengthCost(long value) {
        if (value == 0) return 1;
        return 64 - Long.numberOfLeadingZeros(value);
    }

    /** Cost of storing a residual between two points: sign bit + magnitude bits. */
    private static long calculateResidualCost(long p1, long p2) {
        return 1 + bitLengthCost(Math.abs(p1 - p2));
    }

    /** Cost of storing a new base point (medoid): sign bit + magnitude bits. */
    private static long calculateBasePointStorageCost(long basePoint) {
        return 1 + bitLengthCost(Math.abs(basePoint));
    }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
new file mode 100644
index 00000000..8e2ca0b7
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/AClusterEncoder.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.encoder;
+
+import org.apache.tsfile.exception.encoding.TsFileEncodingException;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Binary;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * AClusterEncoder is a batch-based compressor for numerical data (INT32,
INT64, FLOAT, DOUBLE). It
+ * buffers all data points within a page, then on flush(), applies a
clustering algorithm to find
+ * optimal global reference points and encodes residuals.
+ */
+public class AClusterEncoder extends Encoder {
+
+ private static final int DEFAULT_PACK_SIZE = 10;
+
+
+ /** A buffer to store all values of a page before flushing. We use long to
unify all types. */
+ private interface ValueBuffer {
+ /** Adds a value to the buffer. */
+ void add(int value);
+ void add(long value);
+ void add(float value);
+ void add(double value);
+
+ /** Clears the internal buffer. */
+ void clear();
+
+ /** Checks if the buffer is empty. */
+ boolean isEmpty();
+
+ /** Gets the current size of the buffer. */
+ int size();
+
+ ProcessingResult processAndGet();
+ }
+
+ private static class Int64Buffer implements ValueBuffer {
+ private final List<Long> values = new ArrayList<>();
+
+ @Override
+ public void add(int value) { values.add((long) value); }
+ @Override
+ public void add(long value) { values.add(value); }
+ @Override
+ public void add(float value) { /* Do nothing, type mismatch */ }
+ @Override
+ public void add(double value) { /* Do nothing, type mismatch */ }
+
+ @Override
+ public ProcessingResult processAndGet() { // <--- 实现新方法
+ long[] data = values.stream().mapToLong(l -> l).toArray();
+ return new ProcessingResult(data, 0); // Exponent is 0 for integers
+ }
+
+ @Override
+ public void clear() { values.clear(); }
+ @Override
+ public boolean isEmpty() { return values.isEmpty(); }
+ @Override
+ public int size() { return values.size(); }
+ }
+
+ /** A buffer for FLOAT and DOUBLE types. It performs scaling in
processAndGetLongs(). */
+ private static class DoubleBuffer implements ValueBuffer {
+ private final List<Double> values = new ArrayList<>();
+
+ @Override
+ public void add(int value) { /* Do nothing, type mismatch */ }
+ @Override
+ public void add(long value) { /* Do nothing, type mismatch */ }
+ @Override
+ public void add(float value) { values.add((double) value); } // Store
as double to unify
+ @Override
+ public void add(double value) { values.add(value); }
+
+ @Override
+ public ProcessingResult processAndGet() {
+ // --- Edge Case: Handle empty buffer ---
+ if (values.isEmpty()) {
+ return new ProcessingResult(new long[0], 0);
+ }
+ int maxDecimalPlaces = 0;
+ for (double v : values) {
+ String s = BigDecimal.valueOf(v).toPlainString();
+ int dotIndex = s.indexOf('.');
+ if (dotIndex != -1) {
+ int decimalPlaces = s.length() - dotIndex - 1;
+ if (decimalPlaces > maxDecimalPlaces) {
+ maxDecimalPlaces = decimalPlaces;
+ }
+ }
+ }
+
+ double scalingFactor = Math.pow(10, maxDecimalPlaces);
+
+ long[] scaledLongs = new long[values.size()];
+ for (int i = 0; i < values.size(); i++) {
+ scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
+ }
+
+ return new ProcessingResult(scaledLongs, maxDecimalPlaces);
+ }
+
+ @Override
+ public void clear() { values.clear(); }
+ @Override
+ public boolean isEmpty() { return values.isEmpty(); }
+ @Override
+ public int size() { return values.size(); }
+ }
+
+ private static class ProcessingResult {
+ final long[] scaledLongs;
+ final int scalingExponent; // e.g., 3 for a scaling factor of 1000
+
+ ProcessingResult(long[] scaledLongs, int scalingExponent) {
+ this.scaledLongs = scaledLongs;
+ this.scalingExponent = scalingExponent;
+ }
+
+ long[] getScaledLongs(){
+ return this.scaledLongs;
+ }
+
+ int getScalingExponent(){
+ return this.scalingExponent;
+ }
+ }
+
+
+ private final ValueBuffer buffer;
+
+ /**
+ * Constructor for AClusterEncoder. It's called by AClusterEncodingBuilder.
+ *
+ * @param dataType The data type of the time series, used for potential
type-specific logic.
+ */
+ public AClusterEncoder(TSDataType dataType) {
+ super(TSEncoding.ACLUSTER);
+ switch (dataType) {
+ case INT32: case INT64:
+ this.buffer = new Int64Buffer();
+ break;
+ case FLOAT: case DOUBLE:
+ this.buffer = new DoubleBuffer();
+ break;
+ default:
+ throw new TsFileEncodingException("AClusterEncoder does not
support data type: " + dataType);
+ }
+ }
+
+ @Override
+ public void encode(int value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(long value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(float value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(double value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void flush(ByteArrayOutputStream out) throws IOException {
+ if (buffer.isEmpty()) {
+ return;
+ }
+
+ ProcessingResult procResult = buffer.processAndGet();
+ long[] originalData = procResult.getScaledLongs();
+ int scalingExponent = procResult.getScalingExponent();
+ if (originalData.length == 0) return;
+ long minVal = findMin(originalData);
+ long[] data = new long[originalData.length];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = originalData[i] - minVal;
+ }
+
+ Object[] clusterResult = AClusterAlgorithm.run(data);
+ long[] sortedMedoids = (long[]) clusterResult[0];
+ int[] clusterAssignments = (int[]) clusterResult[1];
+ long[] clusterFrequencies = (long[]) clusterResult[2];
+
+ long[] sortedZigzagResiduals = calculateSortedZigzagResiduals(data,
sortedMedoids, clusterAssignments, clusterFrequencies);
+
+ encodeResults(out, scalingExponent, minVal, sortedMedoids,
clusterFrequencies, sortedZigzagResiduals);
+
+ buffer.clear();
+ }
+
+ private static class MedoidFreqPair {
+ long medoid;
+ long frequency;
+ int originalIndex;
+
+ MedoidFreqPair(long medoid, long frequency, int originalIndex) {
+ this.medoid = medoid;
+ this.frequency = frequency;
+ this.originalIndex = originalIndex;
+ }
+ }
+
+ /**
+ * A direct translation of your `residualCalculationZigzagNoHuff_sorted`
logic.
+ * It computes residuals, applies zigzag encoding, and groups them by
cluster.
+ */
+ private long[] calculateSortedZigzagResiduals(long[] data, long[] medoids,
int[] assignments, long[] frequencies) {
+ int n = data.length;
+ int k = medoids.length;
+ if (n == 0) return new long[0];
+
+ long[] sortedResiduals = new long[n];
+ int[] writePointers = new int[k];
+ int cumulativeCount = 0;
+ for (int i = 0; i < k; i++) {
+ writePointers[i] = cumulativeCount;
+ cumulativeCount += (int) frequencies[i];
+ }
+
+ for (int i = 0; i < n; i++) {
+ int clusterId = assignments[i];
+ long medoid = medoids[clusterId];
+ long residual = data[i] - medoid;
+ long zigzagResidual = (residual << 1) ^ (residual >> 63); //
Zigzag Encoding
+
+ int targetIndex = writePointers[clusterId];
+ sortedResiduals[targetIndex] = zigzagResidual;
+ writePointers[clusterId]++;
+ }
+ return sortedResiduals;
+ }
+
+ private void encodeResults(
+ ByteArrayOutputStream out,
+ int scalingExponent,
+ long minVal,
+ long[] medoids,
+ long[] frequencies,
+ long[] residuals)
+ throws IOException {
+
+ ClusterSupport writer = new ClusterSupport(out);
+ int numPoints = residuals.length;
+ int k = medoids.length;
+
+ writer.write(scalingExponent, 8);
+ writer.write(k, 16);
+ writer.write(numPoints, 16);
+ writer.write(DEFAULT_PACK_SIZE, 16);
+
+ int minValBit = ClusterSupport.bitsRequired(Math.abs(minVal));
+ writer.write(minValBit, 8);
+ writer.write(minVal>=0?0:1,1);
+ writer.write(minVal, minValBit);
+
+ if (k == 0) {
+ writer.flush();
+ return;
+ }
+
+ long minMedoid = findMin(medoids);
+ long[] medoidOffsets = new long[k];
+ int maxMedoidOffsetBits = 0;
+ for (int i = 0; i < k; i++) {
+ medoidOffsets[i] = medoids[i] - minMedoid;
+ maxMedoidOffsetBits = Math.max(maxMedoidOffsetBits,
ClusterSupport.bitsRequired(medoidOffsets[i]));
+ }
+ int minMedoidBit = ClusterSupport.bitsRequired(Math.abs(minMedoid));
+ writer.write(minMedoidBit,8);
+ writer.write(minMedoid>0?0:1,1);
+ writer.write(minMedoid, minMedoidBit);
+
+ writer.write(maxMedoidOffsetBits, 8);
+ for (long offset : medoidOffsets) {
+ writer.write(offset, maxMedoidOffsetBits);
+ }
+
+ long[] freqDeltas = new long[k];
+ if (k > 0) {
+ freqDeltas[0] = frequencies[0];
+ for (int i = 1; i < k; i++) {
+ freqDeltas[i] = frequencies[i] - frequencies[i - 1];
+ }
+ }
+ int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+ writer.write(numFreqBlocks, 16);
+
+ int[] freqBlockMaxBits = new int[numFreqBlocks];
+ // Metadata pass for frequencies
+ for (int i = 0; i < numFreqBlocks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+ long maxDelta = 0;
+ for (int j = start; j < end; j++) {
+ maxDelta = Math.max(maxDelta, freqDeltas[j]);
+ }
+ freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
+ writer.write(freqBlockMaxBits[i], 8);
+ }
+ // Data pass for frequencies
+ for (int i = 0; i < numFreqBlocks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+ int bitsForBlock = freqBlockMaxBits[i];
+ for (int j = start; j < end; j++) {
+ writer.write(freqDeltas[j], bitsForBlock);
+ }
+ }
+
+ int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+ writer.write(numPacks, 32);
+
+ int[] resPackMaxBits = new int[numPacks];
+ // Metadata pass for residuals
+ for (int i = 0; i < numPacks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+ long maxOffset = 0;
+ for (int j = start; j < end; j++) {
+ maxOffset = Math.max(maxOffset, residuals[j]);
+ }
+ resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
+ writer.write(resPackMaxBits[i], 8);
+ }
+ // Data pass for residuals
+ for (int i = 0; i < numPacks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+ int bitsForPack = resPackMaxBits[i];
+ if (bitsForPack > 0) {
+ for (int j = start; j < end; j++) {
+ writer.write(residuals[j], bitsForPack);
+ }
+ }
+ }
+
+ writer.flush();
+ }
+
+ private long findMin(long[] data) {
+ if (data == null || data.length == 0) {
+ throw new IllegalArgumentException("Data array cannot be null or
empty.");
+ }
+ long min = data[0];
+ for (int i = 1; i < data.length; i++) {
+ if (data[i] < min) {
+ min = data[i];
+ }
+ }
+ return min;
+ }
+
+ @Override
+ public int getOneItemMaxSize() {
+ return 8;
+ }
+
+ @Override
+ public long getMaxByteSize() {
+ if (this.buffer.isEmpty()) {
+ return 0;
+ }
+ return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
+ }
+
+ @Override
+ public void encode(boolean value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support
boolean values.");
+ }
+
+ @Override
+ public void encode(short value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support
short values.");
+ }
+
+ @Override
+ public void encode(Binary value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support
Binary values.");
+ }
+
+ @Override
+ public void encode(BigDecimal value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support
BigDecimal values.");
+ }
+}
+
+
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
new file mode 100644
index 00000000..6a189a90
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/ClusterSupport.java
@@ -0,0 +1,77 @@
+package org.apache.tsfile.encoding.encoder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
public class ClusterSupport {

    private final ByteArrayOutputStream out;
    // Bits accumulated toward the next output byte, filled MSB-first.
    private byte pending;
    // Number of bits of `pending` currently in use (0-7).
    private int pendingBits;

    public ClusterSupport(ByteArrayOutputStream out) {
        this.out = out;
        this.pending = 0;
        this.pendingBits = 0;
    }

    /**
     * Writes the lowest {@code numBits} bits of {@code value}, most significant bit first.
     *
     * @param value the value to write; only its low {@code numBits} bits are used
     * @param numBits width in bits, between 1 and 64 inclusive
     * @throws IOException if an I/O error occurs
     */
    public void write(long value, int numBits) throws IOException {
        if (numBits <= 0 || numBits > 64) {
            throw new IllegalArgumentException("Number of bits must be between 1 and 64.");
        }
        // Walk a single-bit mask from the highest requested bit down to bit 0.
        long mask = 1L << (numBits - 1);
        while (mask != 0) {
            writeBit((value & mask) != 0);
            mask >>>= 1;
        }
    }

    // Appends one bit; emits a byte to `out` each time eight bits accumulate.
    private void writeBit(boolean bit) throws IOException {
        if (bit) {
            pending |= (byte) (1 << (7 - pendingBits));
        }
        pendingBits++;
        if (pendingBits == 8) {
            out.write(pending);
            pending = 0;
            pendingBits = 0;
        }
    }

    /**
     * Writes any partially filled byte (zero-padded on the right) to the stream.
     * Must be called once after the final write().
     *
     * @throws IOException if an I/O error occurs
     */
    public void flush() throws IOException {
        if (pendingBits > 0) {
            out.write(pending);
        }
        // Reset so the instance could be reused; harmless if it is discarded.
        pending = 0;
        pendingBits = 0;
    }

    /**
     * Number of bits required to represent a non-negative value (at least 1).
     *
     * @param value the non-negative value
     * @return the bit width of {@code value}
     * @throws IllegalArgumentException if {@code value} is negative
     */
    public static int bitsRequired(long value) {
        if (value < 0) {
            throw new IllegalArgumentException("Value must be non-negative.");
        }
        return value == 0 ? 1 : 64 - Long.numberOfLeadingZeros(value);
    }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
new file mode 100644
index 00000000..53016aa8
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterAlgorithm.java
@@ -0,0 +1,278 @@
+package org.apache.tsfile.encoding.encoder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+
public class KClusterAlgorithm {

    /** Shared PRNG used for medoid seeding; results may vary between JVM runs. */
    private static final Random rand = new Random();

    /**
     * Private constructor to prevent instantiation (static utility class).
     */
    private KClusterAlgorithm() {}

    /**
     * Clusters 1-D data with a k-medoids variant whose distance is the bit-length
     * cost of storing a point as a residual against a medoid.
     *
     * @param data input values; may be null or empty
     * @param k requested cluster count; clamped to the number of distinct values
     * @return a 3-element array: {@code long[]} medoids sorted by ascending cluster
     *     size, {@code int[]} per-point cluster index (into the sorted medoids),
     *     and {@code long[]} cluster sizes in the same sorted order
     */
    public static Object[] run(long[] data, int k) {
        if (data == null || data.length == 0) {
            return new Object[] {new long[0], new int[0], new long[0]};
        }
        // Iteration budget and tolerance are deliberately small: this is an
        // encoding heuristic, not a fully converged clustering.
        return kMedoidLogCost(data, k, 2, 0.01);
    }

    /**
     * Helper for sorting medoids by their cluster size (frequency).
     */
    private static class MedoidSortHelper implements Comparable<MedoidSortHelper> {
        final long medoid;
        final long size;
        final int originalIndex; // index before sorting, used to remap assignments

        MedoidSortHelper(long medoid, long size, int originalIndex) {
            this.medoid = medoid;
            this.size = size;
            this.originalIndex = originalIndex;
        }

        @Override
        public int compareTo(MedoidSortHelper other) {
            return Long.compare(this.size, other.size);
        }
    }

    /**
     * Core 1-D k-medoids loop (PAM-style assignment/update rounds).
     *
     * @param data input values
     * @param k requested cluster count (clamped to distinct-value count)
     * @param maxIter maximum assignment/update rounds
     * @param tol minimum total-cost improvement required to keep iterating
     */
    private static Object[] kMedoidLogCost(long[] data, int k, int maxIter, double tol) {
        int n = data.length;

        // Count distinct values, stopping early once we know there are more than k.
        Set<Long> uniquePoints = new HashSet<>();
        for (long point : data) {
            uniquePoints.add(point);
            if (uniquePoints.size() > k) {
                break;
            }
        }
        int distinctCount = uniquePoints.size();
        if (distinctCount < k) {
            System.err.println("Warning: Distinct data points (" + distinctCount
                + ") is less than input k (" + k + "), setting k to " + distinctCount);
            k = distinctCount;
        }

        if (k <= 0) {
            return new Object[] {new long[0], new int[n], new long[0]};
        }

        // 1. Seed medoids with a K-Medoids++ style procedure.
        long[] medoids = acceleratedInitialization(data, k);
        int[] clusterAssignment = new int[n];
        long previousTotalCost = Long.MAX_VALUE;

        // 2. Alternate assignment and medoid update until cost or medoids stabilize.
        for (int iteration = 0; iteration < maxIter; iteration++) {
            // --- Assignment step ---
            long totalCostThisRound = 0L;
            for (int i = 0; i < n; i++) {
                long minCost = Long.MAX_VALUE;
                int assignedMedoidIndex = -1;
                for (int m = 0; m < k; m++) {
                    long cost = calculateResidualCost(data[i], medoids[m]);
                    if (cost < minCost) {
                        minCost = cost;
                        assignedMedoidIndex = m;
                        if (minCost == 0) break; // a perfect match is unbeatable
                    }
                }
                clusterAssignment[i] = assignedMedoidIndex;
                totalCostThisRound += minCost;
            }

            // --- Convergence check (cost improvement below tolerance) ---
            if (iteration > 0 && (previousTotalCost - totalCostThisRound) < tol) {
                break;
            }
            previousTotalCost = totalCostThisRound;

            // --- Update step ---
            long[] newMedoids = updateMedoids(data, clusterAssignment, k);

            // --- Convergence check (medoids unchanged) ---
            if (Arrays.equals(medoids, newMedoids)) {
                break;
            }
            medoids = newMedoids;
        }

        // 3. Compute final cluster sizes and return everything sorted by size.
        long[] finalClusterSizes = new long[k];
        for (int assignment : clusterAssignment) {
            if (assignment != -1) {
                finalClusterSizes[assignment]++;
            }
        }
        return sortResults(medoids, clusterAssignment, finalClusterSizes);
    }

    /**
     * K-Medoids++ style initialization for 1-D data: the first medoid is uniform
     * random, each subsequent one is drawn proportionally to its distance from
     * the nearest already-chosen medoid, skipping duplicates.
     */
    private static long[] acceleratedInitialization(long[] data, int k) {
        long[] medoids = new long[k];
        Set<Long> selectedMedoids = new HashSet<>();

        int firstIndex = rand.nextInt(data.length);
        medoids[0] = data[firstIndex];
        selectedMedoids.add(medoids[0]);

        // distances[i] = distance from data[i] to its nearest chosen medoid.
        long[] distances = new long[data.length];
        for (int i = 0; i < data.length; i++) {
            distances[i] = Math.abs(data[i] - medoids[0]);
        }

        for (int i = 1; i < k; i++) {
            long[] prefixSums = new long[data.length];
            prefixSums[0] = distances[0];
            for (int p = 1; p < data.length; p++) {
                prefixSums[p] = prefixSums[p - 1] + distances[p];
            }
            long totalDistance = prefixSums[data.length - 1];
            if (totalDistance == 0) {
                // Every remaining point coincides with a chosen medoid; probe
                // linearly for any not-yet-selected value.
                int idx = rand.nextInt(data.length);
                while (selectedMedoids.contains(data[idx])) {
                    idx = (idx + 1) % data.length;
                }
                medoids[i] = data[idx];
                selectedMedoids.add(medoids[i]);
                continue;
            }

            // Sample an index with probability proportional to its distance mass.
            long randValue = (long) (rand.nextDouble() * totalDistance);
            int chosenIdx = binarySearch(prefixSums, randValue);

            while (selectedMedoids.contains(data[chosenIdx])) {
                chosenIdx = (chosenIdx + 1) % data.length;
            }
            medoids[i] = data[chosenIdx];
            selectedMedoids.add(medoids[i]);

            // Refresh nearest-medoid distances with the newly chosen medoid.
            for (int idx = 0; idx < data.length; idx++) {
                long distNewMedoid = Math.abs(data[idx] - medoids[i]);
                if (distNewMedoid < distances[idx]) {
                    distances[idx] = distNewMedoid;
                }
            }
        }
        return medoids;
    }

    /**
     * Updates medoids by picking, within each cluster, the member point that
     * minimizes the total intra-cluster residual cost.
     */
    private static long[] updateMedoids(long[] data, int[] clusterAssignment, int k) {
        long[] newMedoids = new long[k];
        // List-of-lists avoids the unchecked generic-array creation of the original.
        List<List<Long>> clusterPoints = new ArrayList<>(k);
        for (int i = 0; i < k; i++) {
            clusterPoints.add(new ArrayList<>());
        }
        for (int i = 0; i < data.length; i++) {
            if (clusterAssignment[i] != -1) {
                clusterPoints.get(clusterAssignment[i]).add(data[i]);
            }
        }

        for (int m = 0; m < k; m++) {
            List<Long> members = clusterPoints.get(m);
            if (members.isEmpty()) continue; // empty cluster keeps medoid 0 (as before)

            long minTotalClusterCost = Long.MAX_VALUE;
            long newMedoid = members.get(0);

            // O(|cluster|^2) candidate scan, as in classic PAM.
            for (Long candidate : members) {
                long currentCandidateTotalCost = 0L;
                for (Long otherMember : members) {
                    currentCandidateTotalCost += calculateResidualCost(candidate, otherMember);
                }
                if (currentCandidateTotalCost < minTotalClusterCost) {
                    minTotalClusterCost = currentCandidateTotalCost;
                    newMedoid = candidate;
                }
            }
            newMedoids[m] = newMedoid;
        }
        return newMedoids;
    }

    /**
     * Sorts medoids by ascending cluster size and remaps assignments accordingly.
     *
     * @param medoids the discovered medoids
     * @param clusterAssignment per-point cluster index (pre-sort numbering)
     * @param clusterSize the frequency of each cluster
     * @return {sorted medoids, remapped assignments, sorted sizes}
     */
    private static Object[] sortResults(long[] medoids, int[] clusterAssignment, long[] clusterSize) {
        int k = medoids.length;
        List<MedoidSortHelper> sorters = new ArrayList<>();
        for (int i = 0; i < k; i++) {
            sorters.add(new MedoidSortHelper(medoids[i], clusterSize[i], i));
        }
        Collections.sort(sorters);

        long[] sortedMedoids = new long[k];
        long[] sortedClusterSize = new long[k];
        int[] oldToNewIndexMap = new int[k];

        for (int i = 0; i < k; i++) {
            MedoidSortHelper sortedItem = sorters.get(i);
            sortedMedoids[i] = sortedItem.medoid;
            sortedClusterSize[i] = sortedItem.size;
            oldToNewIndexMap[sortedItem.originalIndex] = i;
        }

        int[] sortedClusterAssignment = new int[clusterAssignment.length];
        for (int i = 0; i < clusterAssignment.length; i++) {
            int oldIndex = clusterAssignment[i];
            // Guard: -1 (unassigned) would have indexed out of bounds in the
            // original; it can only occur when k == 0, but keep it defensive.
            sortedClusterAssignment[i] = oldIndex < 0 ? oldIndex : oldToNewIndexMap[oldIndex];
        }

        return new Object[] {sortedMedoids, sortedClusterAssignment, sortedClusterSize};
    }

    // --- Cost calculation functions ---

    /** Bit width of a value, with 0 costing one bit. */
    private static long bitLengthCost(long value) {
        if (value == 0) return 1;
        return 64 - Long.numberOfLeadingZeros(value);
    }

    /** Cost of storing p1 as a residual against p2: sign bit + magnitude bits. */
    private static long calculateResidualCost(long p1, long p2) {
        return 1 + bitLengthCost(Math.abs(p1 - p2));
    }

    /** First index whose prefix sum is >= value (falls back to the last index). */
    private static int binarySearch(long[] prefixSums, long value) {
        int low = 0;
        int high = prefixSums.length - 1;
        int ans = -1;
        while (low <= high) {
            int mid = low + (high - low) / 2;
            if (prefixSums[mid] >= value) {
                ans = mid;
                high = mid - 1;
            } else {
                low = mid + 1;
            }
        }
        return ans == -1 ? prefixSums.length - 1 : ans;
    }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
new file mode 100644
index 00000000..c24e90a5
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/KClusterEncoder.java
@@ -0,0 +1,381 @@
+package org.apache.tsfile.encoding.encoder;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.encoding.TsFileEncodingException;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Binary;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class KClusterEncoder extends Encoder{
+
+ private static final int DEFAULT_PACK_SIZE = 10;
+ private final int k;
+ private long[] scaledData;
+ private int scalingExponent;
+ private final ValueBuffer buffer;
+
+ // MODIFIED: Constructor takes 'k'
+ public KClusterEncoder(TSDataType dataType, int k) {
+ super(TSEncoding.KCLUSTER);
+ this.k = k;
+ switch (dataType) {
+ case INT32: case INT64:
+ this.buffer = new Int64Buffer();
+ break;
+ case FLOAT: case DOUBLE:
+ this.buffer = new DoubleBuffer();
+ break;
+ default:
+ throw new TsFileEncodingException("AClusterEncoder does not
support data type: " + dataType);
+ }
+ }
+
+
+ /** A buffer to store all values of a page before flushing. We use long to
unify all types. */
+ private interface ValueBuffer {
+ /** Adds a value to the buffer. */
+ void add(int value);
+ void add(long value);
+ void add(float value);
+ void add(double value);
+
+ /** Clears the internal buffer. */
+ void clear();
+
+ /** Checks if the buffer is empty. */
+ boolean isEmpty();
+
+ /** Gets the current size of the buffer. */
+ int size();
+
+ KClusterEncoder.ProcessingResult processAndGet();
+ }
+
+ private static class Int64Buffer implements KClusterEncoder.ValueBuffer {
+ private final List<Long> values = new ArrayList<>();
+
+ @Override
+ public void add(int value) { values.add((long) value); }
+ @Override
+ public void add(long value) { values.add(value); }
+ @Override
+ public void add(float value) { /* Do nothing, type mismatch */ }
+ @Override
+ public void add(double value) { /* Do nothing, type mismatch */ }
+
+ @Override
+ public KClusterEncoder.ProcessingResult processAndGet() { // <--- 实现新方法
+ long[] data = values.stream().mapToLong(l -> l).toArray();
+ return new KClusterEncoder.ProcessingResult(data, 0); // Exponent
is 0 for integers
+ }
+
+ @Override
+ public void clear() { values.clear(); }
+ @Override
+ public boolean isEmpty() { return values.isEmpty(); }
+ @Override
+ public int size() { return values.size(); }
+ }
+
+ /** A buffer for FLOAT and DOUBLE types. It performs scaling in
processAndGetLongs(). */
+ private static class DoubleBuffer implements KClusterEncoder.ValueBuffer {
+ private final List<Double> values = new ArrayList<>();
+
+ @Override
+ public void add(int value) { /* Do nothing, type mismatch */ }
+ @Override
+ public void add(long value) { /* Do nothing, type mismatch */ }
+ @Override
+ public void add(float value) { values.add((double) value); } // Store
as double to unify
+ @Override
+ public void add(double value) { values.add(value); }
+
+ @Override
+ public KClusterEncoder.ProcessingResult processAndGet() {
+ // --- Edge Case: Handle empty buffer ---
+ if (values.isEmpty()) {
+ return new KClusterEncoder.ProcessingResult(new long[0], 0);
+ }
+ int maxDecimalPlaces = 0;
+ for (double v : values) {
+ String s = BigDecimal.valueOf(v).toPlainString();
+ int dotIndex = s.indexOf('.');
+ if (dotIndex != -1) {
+ int decimalPlaces = s.length() - dotIndex - 1;
+ if (decimalPlaces > maxDecimalPlaces) {
+ maxDecimalPlaces = decimalPlaces;
+ }
+ }
+ }
+
+ double scalingFactor = Math.pow(10, maxDecimalPlaces);
+
+ long[] scaledLongs = new long[values.size()];
+ for (int i = 0; i < values.size(); i++) {
+ scaledLongs[i] = Math.round(values.get(i) * scalingFactor);
+ }
+
+ return new KClusterEncoder.ProcessingResult(scaledLongs,
maxDecimalPlaces);
+ }
+
+ @Override
+ public void clear() { values.clear(); }
+ @Override
+ public boolean isEmpty() { return values.isEmpty(); }
+ @Override
+ public int size() { return values.size(); }
+ }
+
+ private static class ProcessingResult {
+ final long[] scaledLongs;
+ final int scalingExponent; // e.g., 3 for a scaling factor of 1000
+
+ ProcessingResult(long[] scaledLongs, int scalingExponent) {
+ this.scaledLongs = scaledLongs;
+ this.scalingExponent = scalingExponent;
+ }
+
+ long[] getScaledLongs(){
+ return this.scaledLongs;
+ }
+
+ int getScalingExponent(){
+ return this.scalingExponent;
+ }
+ }
+
+
+ @Override
+ public void flush(ByteArrayOutputStream out) throws IOException {
+ if (buffer.isEmpty()) {
+ return;
+ }
+
+ ProcessingResult procResult = buffer.processAndGet();
+ long[] originalData = procResult.getScaledLongs();
+ int scalingExponent = procResult.getScalingExponent();
+ if (originalData.length == 0) return;
+ long minVal = findMin(originalData);
+ long[] data = new long[originalData.length];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = originalData[i] - minVal;
+ }
+
+ Object[] clusterResult = KClusterAlgorithm.run(data,k);
+ long[] sortedMedoids = (long[]) clusterResult[0];
+ int[] clusterAssignments = (int[]) clusterResult[1];
+ long[] clusterFrequencies = (long[]) clusterResult[2];
+
+ long[] sortedZigzagResiduals = calculateSortedZigzagResiduals(data,
sortedMedoids, clusterAssignments, clusterFrequencies);
+
+ encodeResults(out, scalingExponent, minVal, sortedMedoids,
clusterAssignments, sortedZigzagResiduals);
+
+ buffer.clear();
+ }
+
+
+ @Override
+ public void encode(int value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(long value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(float value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ @Override
+ public void encode(double value, ByteArrayOutputStream out) {
+ buffer.add(value);
+ }
+
+ private long[] calculateSortedZigzagResiduals(long[] data, long[] medoids,
int[] assignments, long[] frequencies) {
+ int n = data.length;
+ int k = medoids.length;
+ if (n == 0) return new long[0];
+
+ long[] sortedResiduals = new long[n];
+ int[] writePointers = new int[k];
+ int cumulativeCount = 0;
+ for (int i = 0; i < k; i++) {
+ writePointers[i] = cumulativeCount;
+ cumulativeCount += (int) frequencies[i];
+ }
+
+ for (int i = 0; i < n; i++) {
+ int clusterId = assignments[i];
+ long medoid = medoids[clusterId];
+ long residual = data[i] - medoid;
+ long zigzagResidual = (residual << 1) ^ (residual >> 63); //
Zigzag Encoding
+
+ int targetIndex = writePointers[clusterId];
+ sortedResiduals[targetIndex] = zigzagResidual;
+ writePointers[clusterId]++;
+ }
+ return sortedResiduals;
+ }
+
+ private void encodeResults(
+ ByteArrayOutputStream out,
+ int scalingExponent,
+ long minVal,
+ long[] medoids,
+ int[] frequencies,
+ long[] residuals)
+ throws IOException {
+
+ ClusterSupport writer = new ClusterSupport(out);
+ int numPoints = frequencies.length;
+ int k = medoids.length;
+
+ writer.write(scalingExponent, 8);
+ writer.write(k, 16);
+ writer.write(numPoints, 16);
+ writer.write(DEFAULT_PACK_SIZE, 16);
+
+ int minValBit = ClusterSupport.bitsRequired(minVal);
+ writer.write(minValBit, 8);
+ writer.write(minVal>=0?0:1,1);
+ writer.write(minVal, minValBit);
+
+ if (k == 0) {
+ writer.flush();
+ return;
+ }
+
+ long minMedoid = findMin(medoids);
+ long[] medoidOffsets = new long[k];
+ int maxMedoidOffsetBits = 0;
+ for (int i = 0; i < k; i++) {
+ medoidOffsets[i] = medoids[i] - minMedoid;
+ maxMedoidOffsetBits = Math.max(maxMedoidOffsetBits,
ClusterSupport.bitsRequired(medoidOffsets[i]));
+ }
+ int minMedoidBit = ClusterSupport.bitsRequired(minMedoid);
+ writer.write(minMedoidBit,8);
+ writer.write(minMedoid>0?0:1,1);
+ writer.write(minMedoid, minMedoidBit);
+
+ writer.write(maxMedoidOffsetBits, 8);
+ for (long offset : medoidOffsets) {
+ writer.write(offset, maxMedoidOffsetBits);
+ }
+
+ long[] freqDeltas = new long[k];
+ if (k > 0) {
+ freqDeltas[0] = frequencies[0];
+ for (int i = 1; i < k; i++) {
+ freqDeltas[i] = frequencies[i] - frequencies[i - 1];
+ }
+ }
+ int numFreqBlocks = (k + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+ writer.write(numFreqBlocks, 16);
+
+ int[] freqBlockMaxBits = new int[numFreqBlocks];
+ // Metadata pass for frequencies
+ for (int i = 0; i < numFreqBlocks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+ long maxDelta = 0;
+ for (int j = start; j < end; j++) {
+ maxDelta = Math.max(maxDelta, freqDeltas[j]);
+ }
+ freqBlockMaxBits[i] = ClusterSupport.bitsRequired(maxDelta);
+ writer.write(freqBlockMaxBits[i], 8);
+ }
+ // Data pass for frequencies
+ for (int i = 0; i < numFreqBlocks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, k);
+ int bitsForBlock = freqBlockMaxBits[i];
+ for (int j = start; j < end; j++) {
+ writer.write(freqDeltas[j], bitsForBlock);
+ }
+ }
+
+ int numPacks = (numPoints + DEFAULT_PACK_SIZE - 1) / DEFAULT_PACK_SIZE;
+ writer.write(numPacks, 32);
+
+ int[] resPackMaxBits = new int[numPacks];
+ // Metadata pass for residuals
+ for (int i = 0; i < numPacks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+ long maxOffset = 0;
+ for (int j = start; j < end; j++) {
+ maxOffset = Math.max(maxOffset, residuals[j]);
+ }
+ resPackMaxBits[i] = ClusterSupport.bitsRequired(maxOffset);
+ writer.write(resPackMaxBits[i], 8);
+ }
+ // Data pass for residuals
+ for (int i = 0; i < numPacks; i++) {
+ int start = i * DEFAULT_PACK_SIZE;
+ int end = Math.min(start + DEFAULT_PACK_SIZE, numPoints);
+ int bitsForPack = resPackMaxBits[i];
+ if (bitsForPack > 0) {
+ for (int j = start; j < end; j++) {
+ writer.write(residuals[j], bitsForPack);
+ }
+ }
+ }
+
+ writer.flush();
+ }
+
+ private long findMin(long[] data) {
+ if (data == null || data.length == 0) {
+ throw new IllegalArgumentException("Data array cannot be null or
empty.");
+ }
+ long min = data[0];
+ for (int i = 1; i < data.length; i++) {
+ if (data[i] < min) {
+ min = data[i];
+ }
+ }
+ return min;
+ }
+
+ @Override
+ public int getOneItemMaxSize() {
+ return 8;
+ }
+
+ @Override
+ public long getMaxByteSize() {
+ if (this.buffer.isEmpty()) {
+ return 0;
+ }
+ return (long) this.buffer.size() * getOneItemMaxSize() * 3 / 2;
+ }
+
+ @Override
+ public void encode(boolean value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support
boolean values.");
+ }
+
+ @Override
+ public void encode(short value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support
short values.");
+ }
+
+ @Override
+ public void encode(Binary value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support
Binary values.");
+ }
+
+ @Override
+ public void encode(BigDecimal value, ByteArrayOutputStream out) {
+ throw new TsFileEncodingException("AClusterEncoder does not support
BigDecimal values.");
+ }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
index 68c7e56b..4a25c88d 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
@@ -78,6 +78,10 @@ public abstract class TSEncodingBuilder {
return new Sprintz();
case RLBE:
return new RLBE();
+ case KCLUSTER:
+ return new KCluster();
+ case ACLUSTER:
+ return new ACluster();
default:
throw new UnsupportedOperationException(type.toString());
}
@@ -189,6 +193,79 @@ public abstract class TSEncodingBuilder {
}
}
+ public static class ACluster extends TSEncodingBuilder {
+
+ @Override
+ public Encoder getEncoder(TSDataType type) {
+ switch (type) {
+ case INT32:
+ return new AClusterEncoder(type);
+ case DATE:
+ case INT64:
+ return new AClusterEncoder(type);
+ case TIMESTAMP:
+ case FLOAT:
+ return new AClusterEncoder(type);
+ case DOUBLE:
+ return new AClusterEncoder(type);
+ default:
+ throw new UnSupportedDataTypeException("ACLUSTER doesn't support
data type: " + type);
+ }
+ }
+
+ @Override
+ public void initFromProps(Map<String, String> props) {
+
+ }
+ }
+
+ public static class KCluster extends TSEncodingBuilder {
+
+ private int k;
+ private static final int KCLUSTER_DEFAULT_K = 1000;
+ private static final String K_KEY = "k";
+
+ public KCluster() {
+ this.k = KCLUSTER_DEFAULT_K;
+ }
+
+ @Override
+ public Encoder getEncoder(TSDataType type) {
+ switch (type) {
+ case INT32:
+ return new KClusterEncoder(type, this.k);
+ case DATE:
+ case INT64:
+ return new KClusterEncoder(type, this.k);
+ case TIMESTAMP:
+ case FLOAT:
+ return new KClusterEncoder(type, this.k);
+ case DOUBLE:
+ return new KClusterEncoder(type, this.k);
+ default:
+ throw new UnSupportedDataTypeException("KCLUSTER doesn't support
data type: " + type);
+ }
+ }
+
+ @Override
+ public void initFromProps(Map<String, String> props) {
+ if (props != null && props.containsKey(K_KEY)) {
+ String kStr = props.get(K_KEY);
+ try {
+ int parsedK = Integer.parseInt(kStr);
+ if (parsedK <= 0) {
+ throw new IllegalArgumentException(
+ "KCLUSTER parameter k must be a positive integer, but was
" + parsedK);
+ }
+ this.k = parsedK;
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "KCLUSTER parameter k must be an integer, but was " + kStr);
+ }
+ }
+ }
+ }
+
/** for INT32, INT64, FLOAT, DOUBLE. */
public static class Ts2Diff extends TSEncodingBuilder {
@@ -199,10 +276,9 @@ public abstract class TSEncodingBuilder {
switch (type) {
case INT32:
case DATE:
- return new DeltaBinaryEncoder.IntDeltaEncoder();
case INT64:
case TIMESTAMP:
- return new DeltaBinaryEncoder.LongDeltaEncoder();
+ // return new DeltaBinaryEncoder.LongDeltaEncoder();
case FLOAT:
case DOUBLE:
return new FloatEncoder(TSEncoding.TS_2DIFF, type, maxPointNumber);
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
index 20374376..e436a562 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
@@ -41,7 +41,9 @@ public enum TSEncoding {
FREQ((byte) 10),
CHIMP((byte) 11),
SPRINTZ((byte) 12),
- RLBE((byte) 13);
+ RLBE((byte) 13),
+ KCLUSTER((byte) 14),
+ ACLUSTER((byte) 15);
private final byte type;
@SuppressWarnings("java:S2386") // used by other projects
@@ -63,6 +65,8 @@ public enum TSEncoding {
intSet.add(TSEncoding.CHIMP);
intSet.add(TSEncoding.SPRINTZ);
intSet.add(TSEncoding.RLBE);
+ intSet.add(TSEncoding.KCLUSTER);
+ intSet.add(TSEncoding.ACLUSTER);
TYPE_SUPPORTED_ENCODINGS.put(TSDataType.INT32, intSet);
TYPE_SUPPORTED_ENCODINGS.put(TSDataType.INT64, intSet);
@@ -78,6 +82,8 @@ public enum TSEncoding {
floatSet.add(TSEncoding.CHIMP);
floatSet.add(TSEncoding.SPRINTZ);
floatSet.add(TSEncoding.RLBE);
+ floatSet.add(TSEncoding.KCLUSTER);
+ floatSet.add(TSEncoding.ACLUSTER);
TYPE_SUPPORTED_ENCODINGS.put(TSDataType.FLOAT, floatSet);
TYPE_SUPPORTED_ENCODINGS.put(TSDataType.DOUBLE, floatSet);
@@ -135,6 +141,10 @@ public enum TSEncoding {
return TSEncoding.SPRINTZ;
case 13:
return TSEncoding.RLBE;
+ case 14:
+ return TSEncoding.KCLUSTER;
+ case 15:
+ return TSEncoding.ACLUSTER;
default:
throw new IllegalArgumentException("Invalid input: " + encoding);
}
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
new file mode 100644
index 00000000..fcc4dca1
--- /dev/null
+++
b/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/AClusterEncoderDecoderTest.java
@@ -0,0 +1,210 @@
+package org.apache.tsfile.encoding.decoder;
+
+import org.apache.tsfile.encoding.encoder.AClusterEncoder;
+import org.apache.tsfile.encoding.encoder.Encoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test suite for AClusterEncoder and AClusterDecoder.
+ * <p>
+ * This test validates the end-to-end encoding and decoding process.
+ * Since ACluster algorithm reorders data based on clusters, the validation
+ * cannot compare original and decoded lists element by element. Instead, it
+ * verifies that the set of unique values and their frequencies are identical
+ * before and after the process.
+ * </p>
+ * <p>
+ * The test structure is adapted to the iterator-style Decoder interface
+ * (hasNext(buffer), readXXX(buffer)).
+ * </p>
+ */
+public class AClusterEncoderDecoderTest {
+
+ private static final int ROW_NUM = 1000;
+ private final Random ran = new Random();
+
+ //
=================================================================================
+ // Integer Tests
+ //
=================================================================================
+
+ @Test
+ public void testIntBasicClusters() throws IOException {
+ List<Integer> data = new ArrayList<>();
+ // Three distinct clusters
+ for (int i = 0; i < 300; i++) data.add(100 + ran.nextInt(10)); //
Cluster around 100
+ for (int i = 0; i < 400; i++) data.add(5000 + ran.nextInt(20)); //
Cluster around 5000
+ for (int i = 0; i < 300; i++) data.add(100000 + ran.nextInt(5)); //
Cluster around 100000
+ shouldReadAndWrite(data, TSDataType.INT32);
+ }
+
+
+ //
=================================================================================
+ // Long Tests
+ //
=================================================================================
+
+ @Test
+ public void testLongBasic() throws IOException {
+ List<Long> data = new ArrayList<>();
+ for (int i = 0; i < ROW_NUM; i++) {
+ data.add((long) i * i * i);
+ }
+ shouldReadAndWrite(data, TSDataType.INT64);
+ }
+
+ //
=================================================================================
+ // Double Tests
+ //
=================================================================================
+
+ @Test
+ public void testDoubleWithPrecision() throws IOException {
+ List<Double> data = new ArrayList<>();
+ final int precision = 6;
+
+ System.out.println("Testing double with controlled precision (max " +
precision + " decimal places)...");
+
+ for (int i = 0; i < ROW_NUM / 2; i++) {
+ double randomPart = nextRandomDoubleWithPrecision(ran, precision);
+ double rawValue = 123.456 + randomPart;
+
+ double cleanValue = cleanDouble(rawValue, precision + 3);
+ data.add(cleanValue);
+ }
+
+ for (int i = 0; i < ROW_NUM / 2; i++) {
+ double randomPart = nextRandomDoubleWithPrecision(ran, precision);
+ double rawValue = 9999.0 + randomPart;
+
+ double cleanValue = cleanDouble(rawValue, precision);
+ data.add(cleanValue);
+ }
+
+ if (!data.isEmpty()) {
+ System.out.println("Sample generated data point (after cleaning):
" + data.get(0));
+ }
+
+ shouldReadAndWrite(data, TSDataType.DOUBLE);
+ }
+
+ private double cleanDouble(double value, int maxPrecision) {
+ BigDecimal bd = new BigDecimal(value);
+ BigDecimal roundedBd = bd.setScale(maxPrecision, RoundingMode.HALF_UP);
+ return roundedBd.doubleValue();
+ }
+ //
=================================================================================
+ // Edge Case Tests
+ //
=================================================================================
+
+ @Test
+ public void testSingleValue() throws IOException {
+ shouldReadAndWrite(Arrays.asList(123.00000001,123.00000002,
29.0001,29.0002,29.000001), TSDataType.DOUBLE);
+ }
+
+ @Test
+ public void testAllSameValues() throws IOException {
+ List<Integer> data = new ArrayList<>();
+ for(int i = 0; i < 100; i++) data.add(777);
+ shouldReadAndWrite(data, TSDataType.INT32);
+ }
+
+ //
=================================================================================
+ // Core Test Logic and Helpers
+ //
=================================================================================
+
+ private double nextRandomDoubleWithPrecision(Random random, int precision)
{
+ if (precision < 0) {
+ throw new IllegalArgumentException("Precision must be
non-negative.");
+ }
+ double factor = Math.pow(10, precision);
+
+ double scaled = random.nextDouble() * factor;
+ long rounded = Math.round(scaled);
+ return rounded / factor;
+ }
+
+ /**
+ * Generic helper to write a list of data using the appropriate encoder
method.
+ */
+ private <T extends Number> void writeData(List<T> data, Encoder encoder,
ByteArrayOutputStream out) throws IOException {
+ if (data.isEmpty()) {
+ return;
+ }
+ // Use instanceof to call the correct overloaded encode method
+ if (data.get(0) instanceof Integer) {
+ data.forEach(val -> encoder.encode((Integer) val, out));
+ } else if (data.get(0) instanceof Long) {
+ data.forEach(val -> encoder.encode((Long) val, out));
+ } else if (data.get(0) instanceof Float) {
+ data.forEach(val -> encoder.encode((Float) val, out));
+ } else if (data.get(0) instanceof Double) {
+ data.forEach(val -> encoder.encode((Double) val, out));
+ }
+ encoder.flush(out);
+ }
+
+ /**
+ * The main validation method. It encodes the given data, then decodes it,
+ * and finally compares the frequency maps of the original and decoded
data.
+ */
+ private <T extends Number> void shouldReadAndWrite(List<T> originalData,
TSDataType dataType) throws IOException {
+ // 1. Prepare for encoding
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Encoder encoder = new AClusterEncoder(dataType);
+
+ // 2. Encode the data
+ writeData(originalData, encoder, out);
+ ByteBuffer buffer = ByteBuffer.wrap(out.toByteArray());
+
+ // 3. Decode the data using the iterator-style interface
+ Decoder decoder = new ClusterDecoder(dataType);
+ List<T> decodedData = new ArrayList<>();
+
+ while (decoder.hasNext(buffer)) {
+ switch (dataType) {
+ case INT32:
+ decodedData.add((T)
Integer.valueOf(decoder.readInt(buffer)));
+ break;
+ case INT64:
+ decodedData.add((T)
Long.valueOf(decoder.readLong(buffer)));
+ break;
+ case FLOAT:
+ decodedData.add((T)
Float.valueOf(decoder.readFloat(buffer)));
+ break;
+ case DOUBLE:
+ decodedData.add((T)
Double.valueOf(decoder.readDouble(buffer)));
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported data
type for test");
+ }
+ }
+
+ // 4. Validate the results
+ // First, a quick check on the total count
+ assertEquals("Decoded data size should match original data size",
originalData.size(), decodedData.size());
+
+ // Second, the robust check using frequency maps
+ Map<T, Long> originalFrequencies = getFrequencyMap(originalData);
+ Map<T, Long> decodedFrequencies = getFrequencyMap(decodedData);
+
+ assertEquals("Frequency maps of original and decoded data should be
identical", originalFrequencies, decodedFrequencies);
+ }
+
+ /**
+ * Helper method to count frequencies of elements in a list.
+ */
+ private <T extends Number> Map<T, Long> getFrequencyMap(List<T> list) {
+ return list.stream()
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
+ }
+}
\ No newline at end of file