This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch research/encoding-periodic
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/research/encoding-periodic by
this push:
new c5584be1 update FLEA #720
c5584be1 is described below
commit c5584be1977509495dff8c0a3542087e33e447ef
Author: Mr-Spade <[email protected]>
AuthorDate: Wed Feb 4 11:48:41 2026 +0800
update FLEA #720
---
java/tsfile/pom.xml | 2 +-
.../tsfile/encoding/decoder/FleaDecoder.java | 79 +++-
.../tsfile/encoding/decoder/LaminarDecoder.java | 14 +-
.../tsfile/encoding/encoder/FleaEncoder.java | 490 ++++++++++++++++-----
.../tsfile/encoding/encoder/LaminarEncoder.java | 6 +-
5 files changed, 455 insertions(+), 136 deletions(-)
diff --git a/java/tsfile/pom.xml b/java/tsfile/pom.xml
index 866472db..9204e326 100644
--- a/java/tsfile/pom.xml
+++ b/java/tsfile/pom.xml
@@ -88,7 +88,7 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.github.wendykierp</groupId>
+ <groupId>com.github.wendykierp</groupId>
<artifactId>JTransforms</artifactId>
<version>3.1</version>
</dependency>
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/FleaDecoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/FleaDecoder.java
index 8333f8d6..8ec2c00a 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/FleaDecoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/FleaDecoder.java
@@ -30,6 +30,38 @@ public class FleaDecoder extends Decoder {
private int numberRemainingInCurrentBlock = 0, totalInCurrentBlock = 0;
private long[] currentBlockValues = null;
+ private static final boolean DEBUG_TIMING =
Boolean.getBoolean("flea.debug.timing");
+ private static long totalLaminarRealNs = 0, totalLaminarImagNs = 0;
+ private static long totalIfftNs = 0, totalResidualNs = 0, totalCombineNs = 0;
+ private static int blockCount = 0;
+
+ public static void resetTiming() {
+ totalLaminarRealNs = totalLaminarImagNs = totalIfftNs = totalResidualNs =
totalCombineNs = 0;
+ blockCount = 0;
+ }
+
+ public static void printTiming() {
+ if (DEBUG_TIMING && blockCount > 0) {
+ double total = totalLaminarRealNs + totalLaminarImagNs + totalIfftNs +
totalResidualNs + totalCombineNs;
+ System.err.printf("[FLEA Decode Timing] blocks=%d\n", blockCount);
+ System.err.printf(
+ " LaminarReal: %8.3f ms (%5.1f%%)\n",
+ totalLaminarRealNs / 1e6, 100.0 * totalLaminarRealNs / total);
+ System.err.printf(
+ " LaminarImag: %8.3f ms (%5.1f%%)\n",
+ totalLaminarImagNs / 1e6, 100.0 * totalLaminarImagNs / total);
+ System.err.printf(
+ " IFFT: %8.3f ms (%5.1f%%)\n", totalIfftNs / 1e6, 100.0 *
totalIfftNs / total);
+ System.err.printf(
+ " Residual: %8.3f ms (%5.1f%%)\n",
+ totalResidualNs / 1e6, 100.0 * totalResidualNs / total);
+ System.err.printf(
+ " Combine: %8.3f ms (%5.1f%%)\n",
+ totalCombineNs / 1e6, 100.0 * totalCombineNs / total);
+ System.err.printf(" Total: %8.3f ms\n", total / 1e6);
+ }
+ }
+
public FleaDecoder() {
super(TSEncoding.FLEA);
}
@@ -38,35 +70,68 @@ public class FleaDecoder extends Decoder {
int n = ReadWriteIOUtils.readInt(buffer);
if (n > 0) {
+ int paddingAmount = buffer.get() & 0xFF;
+ int paddedN = n + paddingAmount;
+ int freqLen = paddedN / 2 + 1;
+
int beta = ReadWriteIOUtils.readInt(buffer);
+ long startNs, endNs;
+
+ startNs = DEBUG_TIMING ? System.nanoTime() : 0;
LaminarDecoder laminarDecoder = new LaminarDecoder();
- long[] quantizedReal = new long[n / 2 + 1], quantizedImag = new long[n /
2 + 1];
- for (int i = 0; i <= n / 2; i++) {
+ long[] quantizedReal = new long[freqLen], quantizedImag = new
long[freqLen];
+ for (int i = 0; i < freqLen; i++) {
quantizedReal[i] = laminarDecoder.readLong(buffer);
}
+ if (DEBUG_TIMING) {
+ endNs = System.nanoTime();
+ totalLaminarRealNs += (endNs - startNs);
+ }
+
+ startNs = DEBUG_TIMING ? System.nanoTime() : 0;
laminarDecoder = new LaminarDecoder();
- for (int i = 0; i <= n / 2; i++) {
+ for (int i = 0; i < freqLen; i++) {
quantizedImag[i] = laminarDecoder.readLong(buffer);
}
+ if (DEBUG_TIMING) {
+ endNs = System.nanoTime();
+ totalLaminarImagNs += (endNs - startNs);
+ }
- double[][] dequantized = new double[2][n / 2 + 1];
- for (int i = 0; i <= n / 2; i++) {
+ startNs = DEBUG_TIMING ? System.nanoTime() : 0;
+ double[][] dequantized = new double[2][freqLen];
+ for (int i = 0; i < freqLen; i++) {
dequantized[0][i] = FleaEncoder.dequantize(quantizedReal[i], beta);
dequantized[1][i] = FleaEncoder.dequantize(quantizedImag[i], beta);
}
- double[] reconstructed = FleaEncoder.inverseRealFFT(dequantized, n);
+ double[] reconstructedFull = FleaEncoder.inverseRealFFT(dequantized,
paddedN);
+ if (DEBUG_TIMING) {
+ endNs = System.nanoTime();
+ totalIfftNs += (endNs - startNs);
+ }
+ startNs = DEBUG_TIMING ? System.nanoTime() : 0;
long[] residuals = new long[n];
SeparateStorageDecoder separateStorageDecoder = new
SeparateStorageDecoder();
for (int i = 0; i < n; i++) {
residuals[i] = separateStorageDecoder.readLong(buffer);
}
+ if (DEBUG_TIMING) {
+ endNs = System.nanoTime();
+ totalResidualNs += (endNs - startNs);
+ }
+ startNs = DEBUG_TIMING ? System.nanoTime() : 0;
this.currentBlockValues = new long[n];
this.numberRemainingInCurrentBlock = this.totalInCurrentBlock = n;
for (int i = 0; i < n; i++) {
- this.currentBlockValues[i] = residuals[i] +
Math.round(reconstructed[i]);
+ this.currentBlockValues[i] = residuals[i] +
Math.round(reconstructedFull[i]);
+ }
+ if (DEBUG_TIMING) {
+ endNs = System.nanoTime();
+ totalCombineNs += (endNs - startNs);
+ blockCount++;
}
} else {
this.currentBlockValues = new long[0];
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/LaminarDecoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/LaminarDecoder.java
index 17233c52..35999c0b 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/LaminarDecoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/LaminarDecoder.java
@@ -54,7 +54,8 @@ public class LaminarDecoder extends Decoder {
}
int totalBits = 0;
- for (int width : laminarBitWidths) totalBits += width;
+ for (int width : laminarBitWidths)
+ totalBits += width;
int encodingLength = DescendingBitPackingDecoder.bitsToBytes(totalBits);
byte[] currentBuffer = new byte[encodingLength];
buffer.get(currentBuffer);
@@ -80,18 +81,17 @@ public class LaminarDecoder extends Decoder {
int p = ReadWriteIOUtils.readInt(buffer);
long[] denseValues = loadDecodeArray(buffer);
- for (int i = 0; i < p; i++) this.currentBlockValues[i] = denseValues[i];
+ for (int i = 0; i < p; i++)
+ this.currentBlockValues[i] = denseValues[i];
long[] sparseValues = loadDecodeArray(buffer);
int indexBitWidth = DescendingBitPackingDecoder.getValueWidth(n - 1);
- int encodingLength =
- DescendingBitPackingDecoder.bitsToBytes(indexBitWidth *
sparseValues.length);
+ int encodingLength =
DescendingBitPackingDecoder.bitsToBytes(indexBitWidth * sparseValues.length);
currentBuffer = new byte[encodingLength];
buffer.get(currentBuffer);
for (int i = 0; i < sparseValues.length; i++) {
- int currentIndex =
- Math.toIntExact(
- BytesUtils.bytesToLong(currentBuffer, indexBitWidth * i,
indexBitWidth));
+ int currentIndex = Math.toIntExact(
+ BytesUtils.bytesToLong(currentBuffer, indexBitWidth * i,
indexBitWidth));
this.currentBlockValues[currentIndex + p] = sparseValues[i];
}
} else {
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/FleaEncoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/FleaEncoder.java
index 086db35d..7f1afde3 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/FleaEncoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/FleaEncoder.java
@@ -34,50 +34,123 @@ public class FleaEncoder extends Encoder {
private List<Long> buffer = new ArrayList<>();
private int minBeta, maxBeta;
+ private static final double MAX_PADDING_RATIO = 0.10;
+
+ private static final int MIN_LENGTH_FOR_PADDING = 100;
+
+ private static final boolean USE_FFT_PADDING =
Boolean.parseBoolean(System.getProperty("flea.fft.padding", "true"));
+
public FleaEncoder() {
super(TSEncoding.FLEA);
this.minBeta = 11;
this.maxBeta = 20;
}
- private long[] laminarEstimateLength(long[] values) {
- int n = values.length;
- long[] result = new long[this.maxBeta - this.minBeta + 1];
+ private static boolean isFFTFriendly(int n) {
+ if (n <= 0)
+ return false;
+ while (n % 2 == 0)
+ n /= 2;
+ while (n % 3 == 0)
+ n /= 3;
+ while (n % 5 == 0)
+ n /= 5;
+ while (n % 7 == 0)
+ n /= 7;
+ return n == 1;
+ }
- if (n > 0) {
- int groupSize =
- 1 +
DescendingBitPackingEncoder.getValueWidth(TSFileConfig.RLE_MAX_REPEATED_NUM);
- int[] laminarBitWidths = LaminarEncoder.getLaminarBitWidths(values);
- List<Integer> repeatValues = new ArrayList<>(), repeatCounts = new
ArrayList<>();
- int currentValue = -1, currentCount = 0;
- for (int width : laminarBitWidths) {
- if (width == currentValue) {
- currentCount++;
- } else {
- if (currentValue != -1) {
- repeatValues.add(currentValue);
- repeatCounts.add(currentCount);
+ private static int nextFFTFriendly(int n) {
+ if (n <= 1)
+ return 1;
+ if (isFFTFriendly(n))
+ return n;
+
+ int best = n * 2;
+
+ for (int a = 1; a <= best; a *= 2) {
+ for (int b = a; b <= best; b *= 3) {
+ for (int c = b; c <= best; c *= 5) {
+ for (int d = c; d <= best; d *= 7) {
+ if (d >= n && d < best) {
+ best = d;
+ }
}
- currentValue = width;
- currentCount = 1;
}
}
- if (currentValue != -1) {
- repeatValues.add(currentValue);
- repeatCounts.add(currentCount);
+ }
+ return best;
+ }
+
+ private static int getFFTLength(int n) {
+ if (!USE_FFT_PADDING)
+ return n;
+ if (n < MIN_LENGTH_FOR_PADDING)
+ return n;
+
+ if (isFFTFriendly(n)) {
+ return n;
+ }
+
+ int padded = nextFFTFriendly(n);
+ double ratio = (double) padded / n;
+
+ if (ratio <= 1 + MAX_PADDING_RATIO) {
+ return padded;
+ }
+
+ return n;
+ }
+
+ @SuppressWarnings("unused")
+ private static int largestPrimeFactor(int n) {
+ if (n <= 1)
+ return n;
+ int largest = 1;
+ for (int d = 2; d * d <= n; d++) {
+ while (n % d == 0) {
+ largest = d;
+ n /= d;
+ }
+ }
+ if (n > 1)
+ largest = n;
+ return largest;
+ }
+
+ private static int[] getLaminarBitWidthsAbs(long[] values) {
+ int n = values.length;
+ int[] laminarBitWidths = new int[n];
+ for (int i = n - 1; i >= 0; i--) {
+ laminarBitWidths[i] =
DescendingBitPackingEncoder.getValueWidth(Math.abs(values[i]));
+ if (i < n - 1) {
+ laminarBitWidths[i] = Math.max(laminarBitWidths[i], laminarBitWidths[i
+ 1]);
}
+ }
+ return laminarBitWidths;
+ }
- for (int i = 0; i < repeatValues.size(); i++) {
- int width = repeatValues.get(i), count = repeatCounts.get(i);
- for (int beta = this.minBeta; beta <= this.maxBeta; beta++) {
- if (width > beta) {
- int groupCount =
- (1
- + (count + TSFileConfig.RLE_MAX_REPEATED_NUM - 1)
- / TSFileConfig.RLE_MAX_REPEATED_NUM);
- result[beta - this.minBeta] += groupCount * groupSize;
- }
+ private long[] laminarEstimateLength(int[] laminarBitWidths) {
+ int n = laminarBitWidths.length;
+ long[] result = new long[this.maxBeta - this.minBeta + 1];
+
+ if (n > 0) {
+ int groupSize = 1 +
DescendingBitPackingEncoder.getValueWidth(TSFileConfig.RLE_MAX_REPEATED_NUM);
+
+ int i = 0;
+ while (i < n) {
+ int width = laminarBitWidths[i];
+ int count = 1;
+ while (i + count < n && laminarBitWidths[i + count] == width) {
+ count++;
}
+ for (int beta = this.minBeta; beta < width && beta <= this.maxBeta;
beta++) {
+ int groupCount = 1
+ + (count + TSFileConfig.RLE_MAX_REPEATED_NUM - 1)
+ / TSFileConfig.RLE_MAX_REPEATED_NUM;
+ result[beta - this.minBeta] += groupCount * groupSize;
+ }
+ i += count;
}
}
return result;
@@ -92,12 +165,10 @@ public class FleaEncoder extends Encoder {
resultD2[1] += d * (-1 - (1 + groupBitWidth - minBeta +
indexBitWidth));
}
if (realBitWidth <= maxBeta) {
- resultD2[realBitWidth - minBeta] +=
- d * (1 - (1 + groupBitWidth - (realBitWidth - 1) + indexBitWidth));
+ resultD2[realBitWidth - minBeta] += d * (1 - (1 + groupBitWidth -
(realBitWidth - 1) + indexBitWidth));
}
if (realBitWidth + 1 <= maxBeta) {
- resultD2[realBitWidth + 1 - minBeta] +=
- d * (1 + groupBitWidth - (realBitWidth - 1) + indexBitWidth);
+ resultD2[realBitWidth + 1 - minBeta] += d * (1 + groupBitWidth -
(realBitWidth - 1) + indexBitWidth);
}
}
}
@@ -119,61 +190,86 @@ public class FleaEncoder extends Encoder {
}
}
- private long[] laminarCalculateResult(long[] resultD2) {
- long[] resultD1 = new long[this.maxBeta - this.minBeta + 1];
- long[] result = new long[this.maxBeta - this.minBeta + 1];
+ private void laminarCalculateResultInPlace(long[] resultD2, long[] resultD1,
long[] result) {
resultD1[0] = resultD2[0];
result[0] = resultD1[0];
for (int beta = this.minBeta + 1; beta <= this.maxBeta; beta++) {
- resultD1[beta - this.minBeta] =
- resultD1[beta - this.minBeta - 1] + resultD2[beta - this.minBeta];
- result[beta - this.minBeta] = result[beta - this.minBeta - 1] +
resultD1[beta - this.minBeta];
+ int idx = beta - this.minBeta;
+ resultD1[idx] = resultD1[idx - 1] + resultD2[idx];
+ result[idx] = result[idx - 1] + resultD1[idx];
}
- return result;
}
- private long[] laminarEstimateValue(long[] values) {
- int n = values.length;
-
- long[] resultD2 = new long[this.maxBeta - this.minBeta + 1];
- int[] bitWidths = new int[n];
- for (int i = 0; i < n; i++) {
- bitWidths[i] = DescendingBitPackingEncoder.getValueWidth(values[i]);
- }
- int[] laminarBitWidths = LaminarEncoder.getLaminarBitWidths(values);
+ private long[] laminarEstimateValue(int n, int[] bitWidths, int[]
laminarBitWidths) {
+ int betaRange = this.maxBeta - this.minBeta + 1;
+ long[] resultD2 = new long[betaRange];
int indexBitWidth = DescendingBitPackingEncoder.getValueWidth(n - 1);
for (int i = 0; i < n; i++) {
laminarAddSparseMode(resultD2, bitWidths[i], laminarBitWidths[i],
indexBitWidth, false);
}
- long[] result = new long[this.maxBeta - this.minBeta + 1];
+ long[] result = new long[betaRange];
for (int beta = this.minBeta; beta <= this.maxBeta; beta++) {
result[beta - this.minBeta] = Long.MAX_VALUE;
}
- int k = Math.min(n, 2 * (maxBeta - minBeta + 1));
+
+ long[] tempResultD1 = new long[betaRange];
+ long[] tempResult = new long[betaRange];
+
+ int k = Math.min(n, 2 * betaRange);
for (int i = 0; i < n; i++) {
if (i % k == 0) {
- long[] currentResult = laminarCalculateResult(resultD2);
+ laminarCalculateResultInPlace(resultD2, tempResultD1, tempResult);
for (int beta = this.minBeta; beta <= this.maxBeta; beta++) {
- result[beta - this.minBeta] =
- Math.min(result[beta - this.minBeta], currentResult[beta -
this.minBeta]);
+ int idx = beta - this.minBeta;
+ result[idx] = Math.min(result[idx], tempResult[idx]);
}
+ java.util.Arrays.fill(tempResultD1, 0);
}
laminarAddSparseMode(resultD2, bitWidths[i], laminarBitWidths[i],
indexBitWidth, true);
laminarAddDenseMode(resultD2, bitWidths[i], laminarBitWidths[i], false);
}
- long[] currentResult = laminarCalculateResult(resultD2);
+ laminarCalculateResultInPlace(resultD2, tempResultD1, tempResult);
for (int beta = this.minBeta; beta <= this.maxBeta; beta++) {
- result[beta - this.minBeta] =
- Math.min(result[beta - this.minBeta], currentResult[beta -
this.minBeta]);
+ int idx = beta - this.minBeta;
+ result[idx] = Math.min(result[idx], tempResult[idx]);
}
return result;
}
private long[] estimateFrequency(long[] values) {
- long[] result1 = laminarEstimateLength(values);
- long[] result2 = laminarEstimateValue(values);
+ int n = values.length;
+ int[] bitWidths = new int[n];
+ for (int i = 0; i < n; i++) {
+ bitWidths[i] =
DescendingBitPackingEncoder.getValueWidth(Math.abs(values[i]));
+ }
+ int[] laminarBitWidths = new int[n];
+ if (n > 0) {
+ laminarBitWidths[n - 1] = bitWidths[n - 1];
+ for (int i = n - 2; i >= 0; i--) {
+ laminarBitWidths[i] = Math.max(bitWidths[i], laminarBitWidths[i + 1]);
+ }
+ }
+
+ long[] result1 = laminarEstimateLength(laminarBitWidths);
+ long[] result2 = laminarEstimateValue(n, bitWidths, laminarBitWidths);
+ if (DEBUG) {
+ StringBuilder sb1 = new StringBuilder("[");
+ StringBuilder sb2 = new StringBuilder("[");
+ for (int i = 0; i < result1.length; i++) {
+ if (i > 0) {
+ sb1.append(", ");
+ sb2.append(", ");
+ }
+ sb1.append(result1[i]);
+ sb2.append(result2[i]);
+ }
+ sb1.append("]");
+ sb2.append("]");
+ System.out.println(
+ "[FLEA EST] n=" + values.length + " est_length=" + sb1 + "
est_value=" + sb2);
+ }
long[] result = new long[this.maxBeta - this.minBeta + 1];
for (int beta = this.minBeta; beta <= this.maxBeta; beta++) {
result[beta - this.minBeta] = result1[beta - this.minBeta] +
result2[beta - this.minBeta];
@@ -184,9 +280,10 @@ public class FleaEncoder extends Encoder {
private long[] estimateResidual(int n, long[] frequencyReal, long[]
frequencyImag) {
double[] squareSumDiff = new double[maxBeta - minBeta + 1];
long[] partialCountDiff = new long[maxBeta - minBeta + 1];
+ int freqLen = frequencyReal.length;
- for (int i = 0; i < n; i++) {
- long real = i < frequencyReal.length ? frequencyReal[i] :
frequencyReal[n - i];
+ for (int i = 0; i < freqLen; i++) {
+ long real = frequencyReal[i];
int realBitLength =
DescendingBitPackingEncoder.getValueWidth(Math.abs(real));
if (realBitLength - 1 >= minBeta) {
partialCountDiff[0] += 1;
@@ -198,7 +295,34 @@ public class FleaEncoder extends Encoder {
squareSumDiff[realBitLength - minBeta] += ((double) real * real);
}
- long imag = i < frequencyImag.length ? frequencyImag[i] :
-frequencyImag[n - i];
+ long imag = frequencyImag[i];
+ int imagBitLength =
DescendingBitPackingEncoder.getValueWidth(Math.abs(imag));
+ if (imagBitLength - 1 >= minBeta) {
+ partialCountDiff[0] += 1;
+ if (imagBitLength <= maxBeta) {
+ partialCountDiff[imagBitLength - minBeta] -= 1;
+ }
+ }
+ if (imagBitLength >= minBeta && imagBitLength <= maxBeta) {
+ squareSumDiff[imagBitLength - minBeta] += ((double) imag * imag);
+ }
+ }
+
+ for (int i = freqLen; i < n; i++) {
+ int mirrorIdx = n - i;
+ long real = frequencyReal[mirrorIdx];
+ int realBitLength =
DescendingBitPackingEncoder.getValueWidth(Math.abs(real));
+ if (realBitLength - 1 >= minBeta) {
+ partialCountDiff[0] += 1;
+ if (realBitLength <= maxBeta) {
+ partialCountDiff[realBitLength - minBeta] -= 1;
+ }
+ }
+ if (realBitLength >= minBeta && realBitLength <= maxBeta) {
+ squareSumDiff[realBitLength - minBeta] += ((double) real * real);
+ }
+
+ long imag = -frequencyImag[mirrorIdx];
int imagBitLength =
DescendingBitPackingEncoder.getValueWidth(Math.abs(imag));
if (imagBitLength - 1 >= minBeta) {
partialCountDiff[0] += 1;
@@ -219,17 +343,14 @@ public class FleaEncoder extends Encoder {
long[] partialCount = new long[maxBeta - minBeta + 1];
partialCount[0] = partialCountDiff[0];
for (int beta = minBeta + 1; beta <= maxBeta; beta++) {
- partialCount[beta - minBeta] =
- partialCount[beta - minBeta - 1] + partialCountDiff[beta - minBeta];
+ partialCount[beta - minBeta] = partialCount[beta - minBeta - 1] +
partialCountDiff[beta - minBeta];
}
long[] result = new long[maxBeta - minBeta + 1];
for (int beta = minBeta; beta <= maxBeta; beta++) {
- double squareSumBeta =
- squareSum[beta - minBeta]
- + partialCount[beta - minBeta] * (1L << beta) * (1L << beta) / 3;
- long optimalBitWidth =
- Math.round(Math.ceil(Math.log(Math.sqrt(squareSumBeta) / n + 1) /
Math.log(2)));
+ double squareSumBeta = squareSum[beta - minBeta]
+ + partialCount[beta - minBeta] * (1L << beta) * (1L << beta) / 3;
+ long optimalBitWidth =
Math.round(Math.ceil(Math.log(Math.sqrt(squareSumBeta) / n + 1) / Math.log(2)));
result[beta - minBeta] = (optimalBitWidth + 2) * n;
}
@@ -237,12 +358,31 @@ public class FleaEncoder extends Encoder {
}
private int getOptimalBeta(int n, long[] frequencyReal, long[]
frequencyImag) {
- long[] frequency = estimateFrequency(frequencyReal);
- long[] frequency2 = estimateFrequency(frequencyImag);
+ long[] freqEstReal = estimateFrequency(frequencyReal);
+ long[] freqEstImag = estimateFrequency(frequencyImag);
+ long[] frequency = new long[maxBeta - minBeta + 1];
for (int beta = minBeta; beta <= maxBeta; beta++) {
- frequency[beta - minBeta] += frequency2[beta - minBeta];
+ frequency[beta - minBeta] = freqEstReal[beta - minBeta] +
freqEstImag[beta - minBeta];
}
long[] residual = estimateResidual(n, frequencyReal, frequencyImag);
+
+ if (DEBUG) {
+ for (int beta = minBeta; beta <= maxBeta; beta++) {
+ long total = frequency[beta - minBeta] + residual[beta - minBeta];
+ System.out.println(
+ "[FLEA BETA] beta="
+ + beta
+ + ": freq_real="
+ + freqEstReal[beta - minBeta]
+ + ", freq_imag="
+ + freqEstImag[beta - minBeta]
+ + ", residual="
+ + residual[beta - minBeta]
+ + ", total="
+ + total);
+ }
+ }
+
for (int beta = minBeta; beta <= maxBeta; beta++) {
frequency[beta - minBeta] += residual[beta - minBeta];
}
@@ -260,37 +400,66 @@ public class FleaEncoder extends Encoder {
buffer.add(value);
}
+ private static final ThreadLocal<DoubleFFT_1D> cachedFFT = new
ThreadLocal<>();
+ private static final ThreadLocal<Integer> cachedFFTSize =
ThreadLocal.withInitial(() -> 0);
+
+ private static final ThreadLocal<double[]> workArrayFFT = new
ThreadLocal<>();
+ private static final ThreadLocal<Integer> workArrayFFTSize =
ThreadLocal.withInitial(() -> 0);
+
+ private static DoubleFFT_1D getFFT(int n) {
+ if (cachedFFT.get() == null || cachedFFTSize.get() != n) {
+ cachedFFT.set(new DoubleFFT_1D(n));
+ cachedFFTSize.set(n);
+ }
+ return cachedFFT.get();
+ }
+
+ private static double[] getWorkArray(int size) {
+ if (workArrayFFT.get() == null || workArrayFFTSize.get() < size) {
+ workArrayFFT.set(new double[size]);
+ workArrayFFTSize.set(size);
+ }
+ return workArrayFFT.get();
+ }
+
public static double[][] realFFT(double[] data) {
- double[] dataComplex = new double[data.length * 2];
- for (int i = 0; i < data.length; i++) {
- dataComplex[2 * i] = data[i];
- dataComplex[2 * i + 1] = 0;
- }
- DoubleFFT_1D fft = new DoubleFFT_1D(data.length);
- fft.complexForward(dataComplex);
- double[][] result = new double[2][data.length / 2 + 1];
- for (int i = 0; i <= data.length / 2; i++) {
- result[0][i] = dataComplex[2 * i];
- result[1][i] = dataComplex[2 * i + 1];
- }
- return result; // result[0] is real part, result[1] is imaginary part
+ int n = data.length;
+ double[] workArray = getWorkArray(n * 2);
+
+ for (int i = 0; i < n; i++) {
+ workArray[2 * i] = data[i];
+ workArray[2 * i + 1] = 0;
+ }
+
+ DoubleFFT_1D fft = getFFT(n);
+ fft.complexForward(workArray);
+
+ double[][] result = new double[2][n / 2 + 1];
+ for (int i = 0; i <= n / 2; i++) {
+ result[0][i] = workArray[2 * i];
+ result[1][i] = workArray[2 * i + 1];
+ }
+ return result;
}
public static double[] inverseRealFFT(double[][] data, int n) {
- double[] dataComplex = new double[n * 2];
+ double[] workArray = getWorkArray(n * 2);
+
for (int i = 0; i <= n / 2; i++) {
- dataComplex[2 * i] = data[0][i];
- dataComplex[2 * i + 1] = data[1][i];
+ workArray[2 * i] = data[0][i];
+ workArray[2 * i + 1] = data[1][i];
}
for (int i = n / 2 + 1; i < n; i++) {
- dataComplex[2 * i] = data[0][n - i];
- dataComplex[2 * i + 1] = -data[1][n - i];
+ workArray[2 * i] = data[0][n - i];
+ workArray[2 * i + 1] = -data[1][n - i];
}
- DoubleFFT_1D fft = new DoubleFFT_1D(n);
- fft.complexInverse(dataComplex, true);
+
+ DoubleFFT_1D fft = getFFT(n);
+ fft.complexInverse(workArray, true);
+
double[] result = new double[n];
for (int i = 0; i < n; i++) {
- result[i] = dataComplex[2 * i];
+ result[i] = workArray[2 * i];
}
return result;
}
@@ -303,61 +472,146 @@ public class FleaEncoder extends Encoder {
return (double) value * (1 << beta);
}
+ private static boolean DEBUG = Boolean.getBoolean("flea.debug");
+
+ private LaminarEncoder cachedLaminarEncoder = null;
+ private SeparateStorageEncoder cachedSeparateStorageEncoder = null;
+
+ private LaminarEncoder getLaminarEncoder() {
+ if (cachedLaminarEncoder == null) {
+ cachedLaminarEncoder = new LaminarEncoder();
+ }
+ return cachedLaminarEncoder;
+ }
+
+ private SeparateStorageEncoder getSeparateStorageEncoder() {
+ if (cachedSeparateStorageEncoder == null) {
+ cachedSeparateStorageEncoder = new SeparateStorageEncoder();
+ }
+ return cachedSeparateStorageEncoder;
+ }
+
@Override
public void flush(ByteArrayOutputStream out) throws IOException {
int n = this.buffer.size();
ReadWriteIOUtils.write(n, out);
if (n > 0) {
- double[] data = new double[n];
+ int paddedN = getFFTLength(n);
+ int freqLen = paddedN / 2 + 1;
+
+ int paddingAmount = paddedN - n;
+ if (paddingAmount > 255) {
+ paddedN = n;
+ paddingAmount = 0;
+ freqLen = n / 2 + 1;
+ }
+ out.write(paddingAmount);
+
+ double[] data = new double[paddedN];
+ long lastValue = buffer.get(n - 1);
for (int i = 0; i < n; i++) {
data[i] = buffer.get(i);
}
+ for (int i = n; i < paddedN; i++) {
+ data[i] = lastValue;
+ }
double[][] fftResult = realFFT(data);
double[] frequencyReal = fftResult[0], frequencyImag = fftResult[1];
- long[] frequencyRealLong = new long[frequencyReal.length];
- long[] frequencyImagLong = new long[frequencyImag.length];
- for (int i = 0; i < frequencyReal.length; i++) {
- frequencyRealLong[i] = Math.round(frequencyReal[i]);
+ long[] quantizedReal = new long[freqLen];
+ long[] quantizedImag = new long[freqLen];
+
+ for (int i = 0; i < freqLen; i++) {
+ quantizedReal[i] = Math.round(frequencyReal[i]);
+ quantizedImag[i] = Math.round(frequencyImag[i]);
}
- for (int i = 0; i < frequencyImag.length; i++) {
- frequencyImagLong[i] = Math.round(frequencyImag[i]);
+ int beta = getOptimalBeta(paddedN, quantizedReal, quantizedImag);
+ if (DEBUG) {
+ System.err.println(
+ "[FLEA DEBUG] n="
+ + n
+ + ", paddedN="
+ + paddedN
+ + ", freq_len="
+ + freqLen
+ + ", beta="
+ + beta);
}
- int beta = getOptimalBeta(n, frequencyRealLong, frequencyImagLong);
ReadWriteIOUtils.write(beta, out);
- long[] quantizedReal = new long[frequencyReal.length],
- quantizedImag = new long[frequencyImag.length];
- for (int i = 0; i < frequencyReal.length; i++) {
+ for (int i = 0; i < freqLen; i++) {
quantizedReal[i] = quantize(frequencyReal[i], beta);
quantizedImag[i] = quantize(frequencyImag[i], beta);
}
- LaminarEncoder laminarEncoder = new LaminarEncoder();
+
+ if (DEBUG) {
+ long realNonZero = java.util.Arrays.stream(quantizedReal).filter(x ->
x != 0).count();
+ long imagNonZero = java.util.Arrays.stream(quantizedImag).filter(x ->
x != 0).count();
+ System.err.println("[FLEA DEBUG] quantized_real non-zero: " +
realNonZero + "/" + freqLen);
+ System.err.println("[FLEA DEBUG] quantized_imag non-zero: " +
imagNonZero + "/" + freqLen);
+ }
+
+ int sizeBeforeLaminarReal = out.size();
+ LaminarEncoder laminarEncoder = getLaminarEncoder();
for (long v : quantizedReal) {
laminarEncoder.encode(v, out);
}
laminarEncoder.flush(out);
- laminarEncoder = new LaminarEncoder();
+ int sizeAfterLaminarReal = out.size();
+
for (long v : quantizedImag) {
laminarEncoder.encode(v, out);
}
laminarEncoder.flush(out);
+ int sizeAfterLaminarImag = out.size();
+
+ if (DEBUG) {
+ System.err.println(
+ "[FLEA DEBUG] laminar_real bytes: " + (sizeAfterLaminarReal -
sizeBeforeLaminarReal));
+ System.err.println(
+ "[FLEA DEBUG] laminar_imag bytes: " + (sizeAfterLaminarImag -
sizeAfterLaminarReal));
+ }
- double[][] dequantized = new double[2][frequencyReal.length];
- for (int i = 0; i < frequencyReal.length; i++) {
- dequantized[0][i] = dequantize(quantizedReal[i], beta);
- dequantized[1][i] = dequantize(quantizedImag[i], beta);
+ for (int i = 0; i < freqLen; i++) {
+ fftResult[0][i] = dequantize(quantizedReal[i], beta);
+ fftResult[1][i] = dequantize(quantizedImag[i], beta);
}
- double[] reconstructed = inverseRealFFT(dequantized, n);
- long[] residuals = new long[n];
+ double[] reconstructedFull = inverseRealFFT(fftResult, paddedN);
+
+ long[] residuals = (quantizedReal.length >= n) ? quantizedReal : new
long[n];
for (int i = 0; i < n; i++) {
- residuals[i] = buffer.get(i) - Math.round(reconstructed[i]);
+ residuals[i] = buffer.get(i) - Math.round(reconstructedFull[i]);
}
- SeparateStorageEncoder separateStorageEncoder = new
SeparateStorageEncoder();
- for (long v : residuals) {
- separateStorageEncoder.encode(v, out);
+
+ if (DEBUG) {
+ long residualNonZero = 0;
+ long residualMin = Long.MAX_VALUE, residualMax = Long.MIN_VALUE;
+ for (int i = 0; i < n; i++) {
+ if (residuals[i] != 0)
+ residualNonZero++;
+ if (residuals[i] < residualMin)
+ residualMin = residuals[i];
+ if (residuals[i] > residualMax)
+ residualMax = residuals[i];
+ }
+ System.err.println(
+ "[FLEA DEBUG] residual range: [" + residualMin + ", " +
residualMax + "]");
+ System.err.println("[FLEA DEBUG] residual non-zero: " +
residualNonZero + "/" + n);
+ }
+
+ int sizeBeforeResidual = out.size();
+ SeparateStorageEncoder separateStorageEncoder =
getSeparateStorageEncoder();
+ for (int i = 0; i < n; i++) {
+ separateStorageEncoder.encode(residuals[i], out);
}
separateStorageEncoder.flush(out);
+ int sizeAfterResidual = out.size();
+
+ if (DEBUG) {
+ System.err.println(
+ "[FLEA DEBUG] residual bytes: " + (sizeAfterResidual -
sizeBeforeResidual));
+ System.err.println("[FLEA DEBUG] total bytes: " + sizeAfterResidual);
+ }
}
this.buffer.clear();
}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/LaminarEncoder.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/LaminarEncoder.java
index ec875233..ebff025b 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/LaminarEncoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/LaminarEncoder.java
@@ -103,7 +103,8 @@ public class LaminarEncoder extends Encoder {
}
int totalBits = 0;
- for (int width : laminarBitWidths) totalBits += width;
+ for (int width : laminarBitWidths)
+ totalBits += width;
int encodingLength = DescendingBitPackingEncoder.bitsToBytes(totalBits);
this.encodingBlockBuffer = new byte[encodingLength];
int offset = 0;
@@ -147,8 +148,7 @@ public class LaminarEncoder extends Encoder {
flushEncodeArray(sparseValuesArray, out);
int indexBitWidth = DescendingBitPackingEncoder.getValueWidth(n - 1);
- int encodingLength =
- DescendingBitPackingEncoder.bitsToBytes(indexBitWidth *
sparseValuesArray.length);
+ int encodingLength =
DescendingBitPackingEncoder.bitsToBytes(indexBitWidth *
sparseValuesArray.length);
this.encodingBlockBuffer = new byte[encodingLength];
for (int i = 0; i < sparseValuesArray.length; i++) {
BytesUtils.intToBytes(