This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch research/encoding-log
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/research/encoding-log by this
push:
new e39f2ff2 Add LogDelta Encoder. (#600)
e39f2ff2 is described below
commit e39f2ff24a00ee591f39de4e0df19daea8df9331
Author: Matsuzawa <[email protected]>
AuthorDate: Fri Oct 10 13:57:40 2025 +0800
Add LogDelta Encoder. (#600)
---
.../apache/tsfile/encoding/decoder/Decoder.java | 8 +
.../tsfile/encoding/decoder/LogDeltaDecoder.java | 504 ++++++++++++++++
.../tsfile/encoding/encoder/LogDeltaEncoder.java | 654 +++++++++++++++++++++
.../tsfile/encoding/encoder/TSEncodingBuilder.java | 23 +
.../tsfile/file/metadata/enums/TSEncoding.java | 7 +-
5 files changed, 1195 insertions(+), 1 deletion(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
index e0a96016..42865d41 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
@@ -184,6 +184,14 @@ public abstract class Decoder {
default:
throw new TsFileDecodingException(String.format(ERROR_MSG,
encoding, dataType));
}
+ case LOG_DELTA:
+ switch (dataType) {
+ case TEXT:
+ case STRING:
+ return new LogDeltaDecoder();
+ default:
+ throw new TsFileDecodingException(String.format(ERROR_MSG,
encoding, dataType));
+ }
default:
throw new TsFileDecodingException(String.format(ERROR_MSG, encoding,
dataType));
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/LogDeltaDecoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/LogDeltaDecoder.java
new file mode 100644
index 00000000..4efa5bca
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/LogDeltaDecoder.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.decoder;
+
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.tsfile.utils.Binary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * LogDeltaDecoder is a decoder for decompressing text data that was encoded
using LogDelta encoding.
+ * This decoder works in conjunction with LogDeltaEncoder to provide efficient
decompression of
+ * text data that was compressed using the LogDelta algorithm.
+ *
+ * The decoder reconstructs the original text by:
+ * 1. Reading record counts and method information
+ * 2. Decoding RLE-encoded method data
+ * 3. Reconstructing text strings from operations or direct storage
+ * 4. Maintaining a sliding window for reference strings
+ */
+public class LogDeltaDecoder extends Decoder {
+ private static final Logger logger =
LoggerFactory.getLogger(LogDeltaDecoder.class);
+
+ // Internal state
+ private int method0Count;
+ private int method1Count;
+ private int currentMethod0Index;
+ private int currentMethod1Index;
+ private List<Integer> methodList;
+ private int currentMethodIndex;
+
+ // Decoded data
+ private List<Integer> beginPositions;
+ private List<Integer> operationSizes;
+ private List<Integer> allLengths;
+ private List<Integer> allPositions;
+ private List<String> allStrings;
+ private List<String> slidingWindow;
+
+ // Current decoding state
+ private boolean initialized;
+ private int currentStringIndex;
+ private String currentString;
+ private int currentStringPosition;
+
+  /** Creates a decoder tagged with {@link TSEncoding#LOG_DELTA} and clears all decoding state. */
+  public LogDeltaDecoder() {
+    super(TSEncoding.LOG_DELTA);
+    reset();
+    logger.debug("tsfile-decoding LogDeltaDecoder: text compression decoder initialized");
+  }
+
+  /**
+   * Returns the next decoded value, parsing the block header first if that has not happened yet.
+   *
+   * @param buffer the encoded block
+   * @return the next string wrapped in a {@link Binary}
+   * @throws IllegalStateException if no further value can be decoded
+   */
+  @Override
+  public Binary readBinary(ByteBuffer buffer) {
+    if (!initialized) {
+      initializeDecoder(buffer);
+    }
+
+    // Use a value that is already pending, otherwise decode a fresh one.
+    String pending = (currentString != null) ? currentString : decodeNextString(buffer);
+    currentString = null; // nothing stays pending after this call
+
+    if (pending == null) {
+      throw new IllegalStateException("No more data to read");
+    }
+    return new Binary(pending);
+  }
+
+  /**
+   * Reports whether more values may be available, parsing the block header on first use.
+   *
+   * <p>NOTE(review): this also answers {@code true} while unread bytes remain in {@code buffer},
+   * even if every entry of the method list was consumed - confirm that is intended.
+   *
+   * @param buffer the encoded block
+   * @return {@code true} if unconsumed method entries or unread bytes remain
+   */
+  @Override
+  public boolean hasNext(ByteBuffer buffer) {
+    if (!initialized) {
+      initializeDecoder(buffer);
+    }
+
+    boolean recordsLeft = currentMethodIndex < methodList.size();
+    return recordsLeft || buffer.hasRemaining();
+  }
+
+  /** Drops every piece of decoded state so the next read re-parses a block header. */
+  @Override
+  public void reset() {
+    // Header and cursor state.
+    initialized = false;
+    method0Count = 0;
+    method1Count = 0;
+    currentMethod0Index = 0;
+    currentMethod1Index = 0;
+    currentMethodIndex = 0;
+    currentStringIndex = 0;
+    currentStringPosition = 0;
+    currentString = null;
+
+    // Decoded sequences and the reference window.
+    methodList = new ArrayList<>();
+    beginPositions = new ArrayList<>();
+    operationSizes = new ArrayList<>();
+    allLengths = new ArrayList<>();
+    allPositions = new ArrayList<>();
+    allStrings = new ArrayList<>();
+    slidingWindow = new ArrayList<>();
+  }
+
+  /**
+   * Parses the block once: the two record counts, the RLE method list, then the method-0 and
+   * method-1 sections when their counts are positive. Later calls are no-ops.
+   *
+   * @param buffer the encoded block, positioned at its start
+   * @throws RuntimeException wrapping any exception raised while parsing
+   */
+  private void initializeDecoder(ByteBuffer buffer) {
+    if (!initialized) {
+      try {
+        // Header: number of operation-based records, then number of directly stored records.
+        method0Count = buffer.getInt();
+        method1Count = buffer.getInt();
+
+        // One 0/1 entry per record, in original order.
+        methodList = decodeRLE(buffer);
+
+        if (method0Count > 0) {
+          decodeMethod0Records(buffer);
+        }
+        if (method1Count > 0) {
+          decodeMethod1Records(buffer);
+        }
+
+        initialized = true;
+        logger.debug(
+            "LogDeltaDecoder initialized with {} method0 and {} method1 records",
+            method0Count,
+            method1Count);
+      } catch (Exception e) {
+        logger.error("Error initializing LogDeltaDecoder", e);
+        throw new RuntimeException("Failed to initialize decoder", e);
+      }
+    }
+  }
+
+  /**
+   * Decodes the run-length encoded method list.
+   *
+   * <p>Layout read here: payload byte count (int), run count (int), then the payload. The first
+   * payload bit is the value of the first run; each following run flips between 0 and 1.
+   *
+   * @param buffer source positioned at the RLE section
+   * @return one 0/1 entry per record; empty if either header field is zero
+   */
+  private List<Integer> decodeRLE(ByteBuffer buffer) {
+    int length = buffer.getInt();
+    int intervalCount = buffer.getInt();
+
+    if (length == 0 || intervalCount == 0) {
+      return new ArrayList<>();
+    }
+
+    byte[] encodedData = new byte[length];
+    buffer.get(encodedData);
+
+    // Expand the payload into a string of '0'/'1' characters, most significant bit first.
+    StringBuilder bits = new StringBuilder();
+    for (byte b : encodedData) {
+      for (int shift = 7; shift >= 0; shift--) {
+        bits.append(((b >> shift) & 1) == 1 ? '1' : '0');
+      }
+    }
+
+    List<Integer> methods = new ArrayList<>();
+    int runValue = bits.charAt(0) - '0';
+    int cursor = 1;
+
+    for (int run = 0; run < intervalCount; run++) {
+      int runLength = decodeNumber(bits, cursor);
+      cursor = findNextNumberPosition(bits, cursor);
+
+      int remaining = runLength;
+      while (remaining-- > 0) {
+        methods.add(runValue);
+      }
+      runValue ^= 1; // alternate between 0 and 1
+    }
+
+    return methods;
+  }
+
+  /**
+   * Reads one variable-length number: a unary prefix of k '1' bits, a single '0' separator, then
+   * a (k + 2)-bit big-endian payload. Reading stops silently at the end of {@code binary}.
+   *
+   * @param binary the '0'/'1' character stream
+   * @param startPos index of the first prefix bit
+   * @return the payload value (fewer bits are used if the stream ends early)
+   */
+  private int decodeNumber(StringBuilder binary, int startPos) {
+    final int end = binary.length();
+
+    // Unary prefix.
+    int cursor = startPos;
+    for (; cursor < end && binary.charAt(cursor) == '1'; cursor++) {
+      // counting only
+    }
+    int payloadBits = (cursor - startPos) + 2;
+
+    // Separator.
+    if (cursor < end) {
+      cursor++;
+    }
+
+    // Payload, most significant bit first.
+    int value = 0;
+    int stop = cursor + payloadBits;
+    for (int i = cursor; i < stop && i < end; i++) {
+      value = (value << 1) | (binary.charAt(i) - '0');
+    }
+    return value;
+  }
+
+  /**
+   * Returns the index just past the variable-length number that starts at {@code currentPos}.
+   *
+   * <p>A number is k '1' bits, one '0' separator and a (k + 2)-bit payload, so it spans
+   * {@code k + 1 + (k + 2)} characters. The prefix length is counted during the forward scan;
+   * scanning backwards from behind the separator would always hit the separator itself and
+   * report a prefix of zero.
+   *
+   * @param binary the '0'/'1' character stream
+   * @param currentPos index of the first prefix bit of the current number
+   * @return index of the first bit of the following number (may exceed {@code binary.length()}
+   *     when the stream is truncated)
+   */
+  private int findNextNumberPosition(StringBuilder binary, int currentPos) {
+    int pos = currentPos;
+    int countOnes = 0;
+
+    // Unary prefix.
+    while (pos < binary.length() && binary.charAt(pos) == '1') {
+      countOnes++;
+      pos++;
+    }
+
+    // Separator.
+    if (pos < binary.length()) {
+      pos++;
+    }
+
+    // Payload width mirrors decodeNumber: prefix length + 2.
+    return pos + countOnes + 2;
+  }
+
+  /**
+   * Reads the method-0 section. The four delta sequences are consumed in the fixed order begin
+   * positions, operation sizes, lengths, positions; the insert strings follow them.
+   *
+   * @param buffer source positioned at the method-0 section
+   */
+  private void decodeMethod0Records(ByteBuffer buffer) {
+    // The read order is the wire order - do not rearrange.
+    beginPositions = decodeDeltaSequence(buffer);
+    operationSizes = decodeDeltaSequence(buffer);
+    allLengths = decodeDeltaSequence(buffer);
+    allPositions = decodeDeltaSequence(buffer);
+
+    // Insert strings come last in this section.
+    decodeStrings(buffer);
+  }
+
+  /**
+   * Reads {@code method1Count} newline-terminated strings and appends them to
+   * {@code allStrings}. A {@code null} entry is appended for each string that is missing
+   * because the buffer ran out.
+   *
+   * @param buffer source positioned at the method-1 section
+   */
+  private void decodeMethod1Records(ByteBuffer buffer) {
+    int outstanding = method1Count;
+    while (outstanding-- > 0) {
+      allStrings.add(readStringUntilNewline(buffer));
+    }
+  }
+
+  /**
+   * Decodes one delta-encoded integer sequence.
+   *
+   * <p>Layout: first value (int), minimum delta (int), delta count (int), bit width (1 byte),
+   * then {@code ceil(deltaCount * bitWidth / 8)} bytes of bit-packed {@code delta - minDelta}
+   * values, most significant bit first.
+   *
+   * @param buffer source positioned at the sequence header
+   * @return the reconstructed values ({@code deltaCount + 1} entries); an empty list if fewer
+   *     bytes remain than a full header needs
+   * @throws IllegalStateException if the header is inconsistent (negative count, bit width above
+   *     32, or more packed bytes announced than remain in the buffer)
+   */
+  private List<Integer> decodeDeltaSequence(ByteBuffer buffer) {
+    // Three ints plus the bit-width byte; checking for a single int would underflow below.
+    final int headerSize = 3 * Integer.BYTES + 1;
+    if (buffer.remaining() < headerSize) {
+      return new ArrayList<>();
+    }
+
+    int firstValue = buffer.getInt();
+    int minDelta = buffer.getInt();
+    int deltaCount = buffer.getInt();
+    int bitWidth = buffer.get() & 0xFF;
+
+    // Widen before multiplying so a large count cannot overflow into a bogus array size.
+    long packedBytes = ((long) deltaCount * bitWidth + 7) / 8;
+    if (deltaCount < 0 || bitWidth > Integer.SIZE || packedBytes > buffer.remaining()) {
+      throw new IllegalStateException(
+          "Corrupt delta sequence header: deltaCount=" + deltaCount + ", bitWidth=" + bitWidth);
+    }
+
+    List<Integer> result = new ArrayList<>();
+    result.add(firstValue);
+    if (deltaCount == 0) {
+      return result;
+    }
+
+    byte[] packedData = new byte[(int) packedBytes];
+    buffer.get(packedData);
+
+    int currentValue = firstValue;
+    int bitOffset = 0;
+    for (int i = 0; i < deltaCount; i++) {
+      // A bit width of zero means every normalized delta is zero.
+      int normalizedDelta = (bitWidth > 0) ? readBits(packedData, bitOffset, bitWidth) : 0;
+      bitOffset += bitWidth;
+
+      currentValue += normalizedDelta + minDelta;
+      result.add(currentValue);
+    }
+
+    return result;
+  }
+
+  /**
+   * Extracts {@code bitWidth} bits starting at absolute bit {@code bitOffset}, most significant
+   * bit first. Extraction stops early, keeping the bits gathered so far, once the end of
+   * {@code data} is reached.
+   *
+   * @param data packed bytes
+   * @param bitOffset index of the first bit to read (bit 0 is the top bit of {@code data[0]})
+   * @param bitWidth number of bits to read
+   * @return the assembled value
+   */
+  private int readBits(byte[] data, int bitOffset, int bitWidth) {
+    int value = 0;
+
+    for (int i = 0; i < bitWidth; i++) {
+      int absoluteBit = bitOffset + i;
+      int byteIndex = absoluteBit / 8;
+      if (byteIndex >= data.length) {
+        break;
+      }
+
+      int shift = 7 - (absoluteBit % 8);
+      value = (value << 1) | ((data[byteIndex] >> shift) & 1);
+    }
+
+    return value;
+  }
+
+  /**
+   * Reads the insert strings of the method-0 section into {@code allStrings}.
+   *
+   * <p>The encoder in this change writes these strings back to back as UTF-8 with no delimiter,
+   * so boundaries are taken from the already decoded lengths: for a record with k operations,
+   * {@code allLengths} holds k delete lengths followed by k insert lengths. Only inserts with a
+   * positive length are stored, matching how they are consumed during reconstruction. Reading
+   * stops after the last announced insert so that the following method-1 section is left intact.
+   *
+   * <p>NOTE(review): lengths are counted in UTF-16 code units; an insert that ends between the
+   * two halves of a surrogate pair cannot be split exactly - confirm whether that can occur.
+   *
+   * @param buffer source positioned at the first insert string
+   */
+  private void decodeStrings(ByteBuffer buffer) {
+    int lengthCursor = 0;
+
+    for (int operationCount : operationSizes) {
+      int insertStart = lengthCursor + operationCount; // skip this record's delete lengths
+      int insertEnd = insertStart + operationCount;
+
+      for (int i = insertStart; i < insertEnd && i < allLengths.size(); i++) {
+        int insertLength = allLengths.get(i);
+        if (insertLength > 0) {
+          allStrings.add(readUtf8Chars(buffer, insertLength));
+        }
+      }
+      lengthCursor = insertEnd;
+    }
+  }
+
+  /** Reads UTF-8 bytes from {@code buffer} until {@code charCount} UTF-16 code units are covered. */
+  private String readUtf8Chars(ByteBuffer buffer, int charCount) {
+    java.io.ByteArrayOutputStream bytes = new java.io.ByteArrayOutputStream();
+    int chars = 0;
+
+    while (chars < charCount && buffer.hasRemaining()) {
+      int lead = buffer.get() & 0xFF;
+      bytes.write(lead);
+
+      // The lead byte announces how many continuation bytes follow.
+      int continuation;
+      if ((lead & 0xE0) == 0xC0) {
+        continuation = 1;
+      } else if ((lead & 0xF0) == 0xE0) {
+        continuation = 2;
+      } else if ((lead & 0xF8) == 0xF0) {
+        continuation = 3;
+      } else {
+        continuation = 0;
+      }
+      for (int i = 0; i < continuation && buffer.hasRemaining(); i++) {
+        bytes.write(buffer.get());
+      }
+
+      // A four-byte sequence decodes to a surrogate pair, i.e. two code units.
+      chars += (continuation == 3) ? 2 : 1;
+    }
+
+    return new String(bytes.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Reads bytes up to (and consuming) the next {@code '\n'} or the end of the buffer and decodes
+   * them as UTF-8.
+   *
+   * <p>The bytes are collected first and decoded in one step; casting each byte to a
+   * {@code char} would corrupt every multi-byte UTF-8 sequence.
+   *
+   * @param buffer source to read from
+   * @return the decoded string without the terminator, or {@code null} if the buffer was already
+   *     exhausted
+   */
+  private String readStringUntilNewline(ByteBuffer buffer) {
+    if (!buffer.hasRemaining()) {
+      return null;
+    }
+
+    java.io.ByteArrayOutputStream collected = new java.io.ByteArrayOutputStream();
+    while (buffer.hasRemaining()) {
+      byte b = buffer.get();
+      if (b == '\n') {
+        break;
+      }
+      collected.write(b);
+    }
+
+    return new String(collected.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Produces the next record's string according to the method list and pushes it into the
+   * sliding window.
+   *
+   * <p>Directly stored strings are appended to {@code allStrings} after any method-0 insert
+   * strings, so they occupy the last {@code method1Count} entries; indexing them from zero would
+   * return insert fragments whenever method-0 records exist.
+   *
+   * @param buffer the encoded block (forwarded to the reconstruction step)
+   * @return the decoded string, or {@code null} when the method list or the matching record
+   *     section is exhausted
+   */
+  private String decodeNextString(ByteBuffer buffer) {
+    if (currentMethodIndex >= methodList.size()) {
+      return null;
+    }
+
+    int method = methodList.get(currentMethodIndex);
+    currentMethodIndex++;
+
+    String result;
+    if (method == 1) {
+      // Direct string storage.
+      if (currentMethod1Index >= method1Count) {
+        return null;
+      }
+      int directBase = allStrings.size() - method1Count;
+      result = allStrings.get(directBase + currentMethod1Index);
+      currentMethod1Index++;
+    } else {
+      // String rebuilt from edit operations against a window entry.
+      if (currentMethod0Index >= method0Count) {
+        return null;
+      }
+      result = reconstructStringFromOperations(buffer);
+      currentMethod0Index++;
+    }
+
+    updateSlidingWindow(result);
+    return result;
+  }
+
+  /**
+   * Rebuilds the current method-0 record by applying its edit operations to the referenced
+   * sliding-window entry.
+   *
+   * <p>Index layout (as written by the encoder in this change): {@code allPositions} holds one
+   * entry per operation; {@code allLengths} holds, per record with k operations, k delete
+   * lengths followed by k insert lengths. Because every operation contributes one position and
+   * two lengths, the record's lengths start at twice its position offset.
+   * {@code currentStringPosition} tracks the position offset of the next record.
+   *
+   * <p>Each operation copies the untouched reference text up to its position, skips the deleted
+   * span, and appends the next stored insert string when the insert length is positive.
+   *
+   * @param buffer unused here; all inputs were decoded during initialization
+   * @return the reconstructed string, or {@code ""} if no metadata exists for this record
+   */
+  private String reconstructStringFromOperations(ByteBuffer buffer) {
+    if (currentMethod0Index >= beginPositions.size()
+        || currentMethod0Index >= operationSizes.size()) {
+      return "";
+    }
+
+    int beginPos = beginPositions.get(currentMethod0Index);
+    int operationSize = operationSizes.get(currentMethod0Index);
+
+    // Reference string from the sliding window; fall back to empty if the index is invalid.
+    String referenceString = "";
+    if (beginPos >= 0 && beginPos < slidingWindow.size()) {
+      referenceString = slidingWindow.get(beginPos);
+    }
+    int referenceLength = referenceString.length();
+
+    int positionBase = currentStringPosition;
+    int deleteBase = 2 * positionBase;
+    int insertBase = deleteBase + operationSize;
+
+    StringBuilder result = new StringBuilder();
+    int currentPos = 0;
+
+    for (int i = 0; i < operationSize; i++) {
+      if (positionBase + i >= allPositions.size() || insertBase + i >= allLengths.size()) {
+        break; // truncated metadata: keep what was rebuilt so far
+      }
+
+      // Clamp so malformed metadata cannot push substring() out of range.
+      int position = Math.min(allPositions.get(positionBase + i), referenceLength);
+      int deleteLength = allLengths.get(deleteBase + i);
+      int insertLength = allLengths.get(insertBase + i);
+
+      // Copy the unchanged part.
+      if (position > currentPos) {
+        result.append(referenceString, currentPos, position);
+      }
+
+      // Skip the deleted part.
+      currentPos = Math.min(position + deleteLength, referenceLength);
+
+      // Append the inserted text; only non-empty inserts have a stored string.
+      if (insertLength > 0 && currentStringIndex < allStrings.size()) {
+        result.append(allStrings.get(currentStringIndex++));
+      }
+    }
+
+    // The next record's positions start after all of this record's operations.
+    currentStringPosition = positionBase + operationSize;
+
+    // Append the remaining tail of the reference.
+    if (currentPos < referenceLength) {
+      result.append(referenceString.substring(currentPos));
+    }
+
+    return result.toString();
+  }
+
+  /**
+   * Appends {@code text} to the reference window and evicts the oldest entry once more than
+   * eight entries are held.
+   *
+   * @param text the string that was just decoded
+   */
+  private void updateSlidingWindow(String text) {
+    slidingWindow.add(text);
+
+    boolean overCapacity = slidingWindow.size() > 8;
+    if (overCapacity) {
+      slidingWindow.remove(0);
+    }
+  }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/LogDeltaEncoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/LogDeltaEncoder.java
new file mode 100644
index 00000000..0685c933
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/LogDeltaEncoder.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.encoder;
+
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.tsfile.utils.Binary;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * LogDeltaEncoder is a comprehensive text compression encoder based on the
LogDelta algorithm.
+ * This encoder combines multiple compression techniques including:
+ * - Distance-based similarity matching (MinHash, Q-gram cosine distance)
+ * - Q-gram pattern matching for finding common substrings
+ * - Variable-length substitution operations
+ * - Delta encoding for positions and lengths
+ * - RLE encoding for repeated patterns
+ * - Bit packing for efficient storage
+ *
+ * The algorithm works by:
+ * 1. Maintaining a sliding window of recent text strings
+ * 2. For each new string, finding the most similar string in the window
+ * 3. If similarity is above threshold, encoding the differences as operations
+ * 4. If no good match, storing the string directly
+ * 5. Using various encoding techniques to compress the operation data
+ */
+public class LogDeltaEncoder extends Encoder {
+ private static final Logger logger =
LoggerFactory.getLogger(LogDeltaEncoder.class);
+
+ // Algorithm parameters
+ private static final int DEFAULT_WINDOW_SIZE = 8;
+ private static final double DEFAULT_THRESHOLD = 0.06;
+ private static final int DEFAULT_Q_VALUE = 3;
+ private static final int DEFAULT_NUM_HASHES = 50;
+ private static final long PRIME = 1099511628211L;
+
+ // Internal state
+ private List<String> textValues;
+ private Deque<String> slidingWindow;
+ private int windowSize;
+ private double threshold;
+ private int qValue;
+ private boolean useApproximation;
+
+ // MinHash state
+ private long[] hashCoefficientsA;
+ private long[] hashCoefficientsB;
+ private Map<String, long[]> signatureCache;
+
+  /** Creates an encoder with the default window size, threshold and q-gram length. */
+  public LogDeltaEncoder() {
+    this(DEFAULT_WINDOW_SIZE, DEFAULT_THRESHOLD, DEFAULT_Q_VALUE, true);
+  }
+
+  /**
+   * Creates an encoder with explicit tuning parameters.
+   *
+   * @param windowSize maximum number of recent strings kept as reference candidates
+   * @param threshold distance at or above which a candidate is rejected
+   * @param qValue q-gram length used for signatures and matching
+   * @param useApproximation {@code true} to derive operations by q-gram matching, {@code false}
+   *     to use the exact-matching variant
+   */
+  public LogDeltaEncoder(int windowSize, double threshold, int qValue, boolean useApproximation) {
+    super(TSEncoding.LOG_DELTA);
+
+    // Tuning parameters.
+    this.windowSize = windowSize;
+    this.threshold = threshold;
+    this.qValue = qValue;
+    this.useApproximation = useApproximation;
+
+    // Working state.
+    textValues = new ArrayList<>();
+    slidingWindow = new ArrayDeque<>();
+    signatureCache = new HashMap<>();
+
+    initializeHashFunctions();
+    logger.debug("tsfile-encoding LogDeltaEncoder: text compression encoder initialized");
+  }
+
+  /**
+   * Buffers {@code value}; nothing is written to {@code out} until {@link #flush} runs.
+   *
+   * @param value the text value to encode
+   * @param out unused by this method
+   */
+  @Override
+  public void encode(Binary value, ByteArrayOutputStream out) {
+    String text = value.toString();
+    textValues.add(text);
+  }
+
+  /**
+   * Compresses every buffered value and writes the resulting block to {@code out}. The encoder
+   * state is cleared afterwards, even when encoding fails. Does nothing when no values are
+   * buffered.
+   *
+   * @param out destination of the encoded block
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void flush(ByteArrayOutputStream out) throws IOException {
+    if (!textValues.isEmpty()) {
+      try {
+        encodeRecords(processTextValues(), out);
+      } finally {
+        reset();
+      }
+    }
+  }
+
+  /**
+   * Turns each buffered string into a compression record, in order. A string becomes a reference
+   * candidate only after its own record has been produced.
+   *
+   * @return one record per buffered string
+   */
+  private List<Record> processTextValues() {
+    List<Record> produced = new ArrayList<>();
+
+    Iterator<String> pending = textValues.iterator();
+    while (pending.hasNext()) {
+      String current = pending.next();
+      produced.add(processTextString(current));
+      updateSlidingWindow(current);
+    }
+
+    return produced;
+  }
+
+  /**
+   * Builds the compression record for one string.
+   *
+   * <p>The closest sliding-window entry is looked up first. If it is closer than the threshold
+   * and the derived operations cost less than the text length, the record stores the window
+   * index plus the operations (method 0); otherwise it stores the text itself (method 1).
+   *
+   * <p>The best reference is remembered during the scan, so the window is not copied into a
+   * list just to fetch one element by index.
+   *
+   * @param text the string to encode
+   * @return the record describing how {@code text} is stored
+   */
+  private Record processTextString(String text) {
+    Record record = new Record();
+
+    // Find the most similar string in the sliding window.
+    double minDistance = 1.0;
+    int bestMatchIndex = -1;
+    String referenceText = null;
+
+    int index = 0;
+    for (String windowText : slidingWindow) {
+      double distance = calculateDistance(windowText, text);
+      if (distance < minDistance) {
+        minDistance = distance;
+        bestMatchIndex = index;
+        referenceText = windowText;
+      }
+      index++;
+    }
+
+    // Derive operations only when an acceptable reference exists.
+    boolean hasReference = bestMatchIndex != -1 && minDistance < threshold;
+    List<OperationItem> operations =
+        hasReference ? findOperations(referenceText, text) : Collections.emptyList();
+
+    if (!operations.isEmpty() && calculateOperationCost(operations) < text.length()) {
+      record.method = 0;
+      record.begin = bestMatchIndex;
+      record.operationSize = operations.size();
+
+      for (OperationItem op : operations) {
+        record.positionList.add(op.position);
+        record.deleteLengths.add(op.deleteLength);
+        record.insertLengths.add(op.insertLength);
+        record.subStrings.add(op.insertText);
+      }
+    } else {
+      // No usable reference, or the operations are too expensive: store directly.
+      record.method = 1;
+      record.subStrings.add(text);
+    }
+
+    return record;
+  }
+
+  /**
+   * Estimates the distance between two strings from their MinHash signatures.
+   *
+   * <p>Equal strings have distance 0. If either string is empty or shorter than the q-gram
+   * length it has no q-grams, so the distance is reported as 1; without this check two different
+   * short strings would share the untouched all-maximum signature and look identical.
+   *
+   * @param str1 first string
+   * @param str2 second string
+   * @return a value in [0, 1]; 1 is also returned when the signature computation fails
+   */
+  private double calculateDistance(String str1, String str2) {
+    if (str1.equals(str2)) {
+      return 0.0;
+    }
+
+    if (str1.isEmpty() || str2.isEmpty()) {
+      return 1.0;
+    }
+
+    // No q-grams means no evidence of similarity.
+    if (str1.length() < qValue || str2.length() < qValue) {
+      return 1.0;
+    }
+
+    try {
+      long[] sig1 = getMinHashSignature(str1);
+      long[] sig2 = getMinHashSignature(str2);
+      return estimateMinHashDistance(sig1, sig2);
+    } catch (Exception e) {
+      // Best effort: a failed comparison only means this candidate is not used.
+      logger.warn("Error calculating distance, using maximum distance", e);
+      return 1.0;
+    }
+  }
+
+  /**
+   * Returns the MinHash signature of {@code str}, computing and caching it on first use.
+   *
+   * <p>Every slot starts at {@code Long.MAX_VALUE}; each q-gram of the string lowers a slot to
+   * the minimum of its hashed values. Strings shorter than the q-gram length keep the untouched
+   * signature.
+   *
+   * @param str the string to fingerprint
+   * @return the cached signature array ({@code DEFAULT_NUM_HASHES} slots)
+   */
+  private long[] getMinHashSignature(String str) {
+    long[] cached = signatureCache.get(str);
+    if (cached != null) {
+      return cached;
+    }
+
+    long[] signature = new long[DEFAULT_NUM_HASHES];
+    Arrays.fill(signature, Long.MAX_VALUE);
+
+    int lastStart = str.length() - qValue;
+    if (lastStart >= 0) {
+      for (int start = 0; start <= lastStart; start++) {
+        long baseHash = calculateBaseHash(str.substring(start, start + qValue));
+
+        for (int slot = 0; slot < DEFAULT_NUM_HASHES; slot++) {
+          long hash = (hashCoefficientsA[slot] * baseHash + hashCoefficientsB[slot]) % PRIME;
+          if (hash < signature[slot]) {
+            signature[slot] = hash;
+          }
+        }
+      }
+    }
+
+    signatureCache.put(str, signature);
+    return signature;
+  }
+
+  /**
+   * Hashes {@code str} with the DJB2 recurrence {@code hash = hash * 33 + c}, seeded with 5381
+   * and evaluated in wrapping {@code long} arithmetic.
+   *
+   * @param str the string to hash
+   * @return the hash value
+   */
+  private long calculateBaseHash(String str) {
+    long hash = 5381;
+    for (int i = 0, n = str.length(); i < n; i++) {
+      hash = hash * 33 + str.charAt(i);
+    }
+    return hash;
+  }
+
+  /**
+   * Estimates a distance as one minus the fraction of signature slots that agree.
+   *
+   * @param sig1 first signature
+   * @param sig2 second signature
+   * @return a value in [0, 1]
+   */
+  private double estimateMinHashDistance(long[] sig1, long[] sig2) {
+    int matches = 0;
+    int slot = 0;
+    while (slot < DEFAULT_NUM_HASHES) {
+      matches += (sig1[slot] == sig2[slot]) ? 1 : 0;
+      slot++;
+    }
+    // Keep this exact expression: callers compare the result against a threshold.
+    return 1.0 - (double) matches / DEFAULT_NUM_HASHES;
+  }
+
+  /**
+   * Derives the edit operations that turn {@code str1} into {@code str2}, using the strategy
+   * selected at construction time.
+   *
+   * @param str1 reference string
+   * @param str2 target string
+   * @return the operations, in application order
+   */
+  private List<OperationItem> findOperations(String str1, String str2) {
+    return useApproximation ? findQgramOperations(str1, str2) : findExactOperations(str1, str2);
+  }
+
+  /**
+   * Approximates the edit script by anchoring on the longest common substring.
+   *
+   * <p>All start pairs are scanned in row-major order; the first strictly longest run of at
+   * least {@code qValue} characters wins. Up to two operations are produced: one replacing the
+   * text before the anchor and one replacing the text after it. Without an anchor a single
+   * operation replaces the whole reference.
+   *
+   * @param str1 reference string
+   * @param str2 target string
+   * @return zero to two operations
+   */
+  private List<OperationItem> findQgramOperations(String str1, String str2) {
+    final int len1 = str1.length();
+    final int len2 = str2.length();
+
+    int anchorLength = 0;
+    int anchor1 = -1;
+    int anchor2 = -1;
+
+    for (int i = 0; i < len1; i++) {
+      for (int j = 0; j < len2; j++) {
+        int run = 0;
+        while (i + run < len1
+            && j + run < len2
+            && str1.charAt(i + run) == str2.charAt(j + run)) {
+          run++;
+        }
+
+        if (run >= qValue && run > anchorLength) {
+          anchorLength = run;
+          anchor1 = i;
+          anchor2 = j;
+        }
+      }
+    }
+
+    List<OperationItem> operations = new ArrayList<>();
+
+    if (anchorLength <= 0) {
+      // Nothing in common: swap the entire reference for the target.
+      operations.add(new OperationItem(0, len1, len2, str2));
+      return operations;
+    }
+
+    // Text in front of the anchor.
+    if (anchor1 > 0 || anchor2 > 0) {
+      operations.add(new OperationItem(0, anchor1, anchor2, str2.substring(0, anchor2)));
+    }
+
+    // Text behind the anchor.
+    int tail1 = anchor1 + anchorLength;
+    int tail2 = anchor2 + anchorLength;
+    if (tail1 < len1 || tail2 < len2) {
+      operations.add(new OperationItem(tail1, len1 - tail1, len2 - tail2, str2.substring(tail2)));
+    }
+
+    return operations;
+  }
+
+  /**
+   * Exact-matching variant: equal strings need no operation, anything else is expressed as one
+   * operation replacing the whole reference with the target.
+   *
+   * @param str1 reference string
+   * @param str2 target string
+   * @return an empty list or a single whole-string replacement
+   */
+  private List<OperationItem> findExactOperations(String str1, String str2) {
+    List<OperationItem> operations = new ArrayList<>();
+    if (str1.equals(str2)) {
+      return operations;
+    }
+
+    operations.add(new OperationItem(0, str1.length(), str2.length(), str2));
+    return operations;
+  }
+
+  /**
+   * Scores an edit script: a base cost of 5, plus 3 and the insert length for every operation.
+   *
+   * @param operations the operations to score
+   * @return the estimated cost
+   */
+  private double calculateOperationCost(List<OperationItem> operations) {
+    double cost = 5.0;
+    Iterator<OperationItem> it = operations.iterator();
+    while (it.hasNext()) {
+      cost += 3.0 + it.next().insertLength;
+    }
+    return cost;
+  }
+
+  /**
+   * Makes {@code text} the newest reference candidate, evicting the oldest one first when the
+   * window already holds {@code windowSize} entries.
+   *
+   * @param text the string that was just processed
+   */
+  private void updateSlidingWindow(String text) {
+    boolean full = slidingWindow.size() >= windowSize;
+    if (full) {
+      slidingWindow.removeFirst();
+    }
+    slidingWindow.addLast(text);
+  }
+
+  /**
+   * Serializes the records as one block: the method-0 and method-1 record counts, the RLE
+   * encoded per-record method list, then the method-0 section and the method-1 section (each
+   * only when it has records).
+   *
+   * @param records all records in original order
+   * @param out destination stream
+   * @throws IOException if writing fails
+   */
+  private void encodeRecords(List<Record> records, ByteArrayOutputStream out)
+      throws IOException {
+    // One pass: split by method and remember the original method order.
+    List<Record> withOperations = new ArrayList<>();
+    List<Record> direct = new ArrayList<>();
+    List<Integer> methodList = new ArrayList<>();
+
+    for (Record record : records) {
+      methodList.add(record.method);
+      (record.method == 0 ? withOperations : direct).add(record);
+    }
+
+    // Header counts.
+    out.write(BytesUtils.intToBytes(withOperations.size()));
+    out.write(BytesUtils.intToBytes(direct.size()));
+
+    // Method list.
+    encodeRLE(methodList, out);
+
+    // Sections.
+    if (!withOperations.isEmpty()) {
+      encodeMethod0Records(withOperations, out);
+    }
+    if (!direct.isEmpty()) {
+      encodeMethod1Records(direct, out);
+    }
+  }
+
+  /**
+   * Writes the method-0 section: four delta sequences (window indices, operation counts,
+   * lengths, positions) followed by the insert strings as UTF-8.
+   *
+   * <p>Within the lengths sequence each record contributes all of its delete lengths first and
+   * then all of its insert lengths.
+   *
+   * <p>NOTE(review): the insert strings are written back to back without any delimiter, so a
+   * reader has to recover their boundaries from the insert lengths - confirm the decoder does.
+   *
+   * @param records the method-0 records in original order
+   * @param out destination stream
+   * @throws IOException if writing fails
+   */
+  private void encodeMethod0Records(List<Record> records, ByteArrayOutputStream out)
+      throws IOException {
+    List<Integer> beginPositions = new ArrayList<>();
+    List<Integer> operationSizes = new ArrayList<>();
+    List<Integer> allLengths = new ArrayList<>();
+    List<Integer> allPositions = new ArrayList<>();
+
+    // Gather the four columns in a single pass over the records.
+    for (Record record : records) {
+      beginPositions.add(record.begin);
+      operationSizes.add(record.operationSize);
+      allLengths.addAll(record.deleteLengths);
+      allLengths.addAll(record.insertLengths);
+      allPositions.addAll(record.positionList);
+    }
+
+    // The write order is the wire order.
+    encodeDeltaSequence(beginPositions, out);
+    encodeDeltaSequence(operationSizes, out);
+    encodeDeltaSequence(allLengths, out);
+    encodeDeltaSequence(allPositions, out);
+
+    // Insert strings.
+    for (Record record : records) {
+      for (String subString : record.subStrings) {
+        out.write(subString.getBytes("UTF-8"));
+      }
+    }
+  }
+
+  /**
+   * Writes the method-1 section: every directly stored string as UTF-8 followed by one
+   * {@code '\n'} byte.
+   *
+   * <p>NOTE(review): a value that itself contains {@code '\n'} is not escaped and would be split
+   * by a newline-delimited reader - confirm such values cannot occur.
+   *
+   * @param records the method-1 records in original order
+   * @param out destination stream
+   * @throws IOException if writing fails
+   */
+  private void encodeMethod1Records(List<Record> records, ByteArrayOutputStream out)
+      throws IOException {
+    for (Record record : records) {
+      Iterator<String> texts = record.subStrings.iterator();
+      while (texts.hasNext()) {
+        out.write(texts.next().getBytes("UTF-8"));
+        out.write('\n'); // separator
+      }
+    }
+  }
+
+  /**
+   * Writes an integer sequence as first value, minimum delta, delta count, bit width (one byte)
+   * and the bit-packed {@code delta - minDelta} values.
+   *
+   * <p>NOTE(review): an empty sequence is written as a single zero int, which is shorter than
+   * the regular 13-byte header - confirm that readers treat this case specially.
+   *
+   * @param sequence values to encode
+   * @param out destination stream
+   * @throws IOException if writing fails
+   */
+  private void encodeDeltaSequence(List<Integer> sequence, ByteArrayOutputStream out)
+      throws IOException {
+    if (sequence.isEmpty()) {
+      out.write(BytesUtils.intToBytes(0));
+      return;
+    }
+
+    // Consecutive differences.
+    int deltaCount = sequence.size() - 1;
+    List<Integer> deltas = new ArrayList<>();
+    for (int i = 0; i < deltaCount; i++) {
+      deltas.add(sequence.get(i + 1) - sequence.get(i));
+    }
+
+    // Range of the differences decides the packing width (at least one bit).
+    int minDelta = 0;
+    int maxDelta = 0;
+    if (!deltas.isEmpty()) {
+      minDelta = Collections.min(deltas);
+      maxDelta = Collections.max(deltas);
+    }
+    int bitWidth = Math.max(1, 32 - Integer.numberOfLeadingZeros(maxDelta - minDelta));
+
+    // Header.
+    out.write(BytesUtils.intToBytes(sequence.get(0)));
+    out.write(BytesUtils.intToBytes(minDelta));
+    out.write(BytesUtils.intToBytes(deltas.size()));
+    out.write((byte) bitWidth);
+
+    // Payload.
+    if (deltas.isEmpty()) {
+      return;
+    }
+    byte[] packed = new byte[(deltas.size() * bitWidth + 7) / 8];
+    int bitOffset = 0;
+    for (int delta : deltas) {
+      writeBits(packed, bitOffset, delta - minDelta, bitWidth);
+      bitOffset += bitWidth;
+    }
+    out.write(packed);
+  }
+
+  /**
+   * Run-length encodes a list of 0/1 method flags.
+   *
+   * <p>Output: payload byte count (int), run count (int), then the payload. The payload starts
+   * with one bit holding the value of the FIRST run, followed by each run length in the
+   * variable-length number format, zero-padded to a whole byte. A reader starts from that bit
+   * and flips it after every run, so writing the value of the last run instead would invert the
+   * whole list whenever the number of runs is even.
+   *
+   * @param values the 0/1 flags, one per record
+   * @param out destination stream
+   * @throws IOException if writing fails
+   */
+  private void encodeRLE(List<Integer> values, ByteArrayOutputStream out) throws IOException {
+    if (values.isEmpty()) {
+      out.write(BytesUtils.intToBytes(0)); // Length
+      out.write(BytesUtils.intToBytes(0)); // Interval count
+      return;
+    }
+
+    // Collect run lengths.
+    List<Integer> intervals = new ArrayList<>();
+    int firstValue = values.get(0);
+    int currentValue = firstValue;
+    int count = 1;
+
+    for (int i = 1; i < values.size(); i++) {
+      int value = values.get(i);
+      if (value == currentValue) {
+        count++;
+      } else {
+        intervals.add(count);
+        currentValue = value;
+        count = 1;
+      }
+    }
+    intervals.add(count);
+
+    // Start bit (value of the first run), then the run lengths.
+    StringBuilder binaryString = new StringBuilder();
+    binaryString.append(firstValue);
+
+    for (int interval : intervals) {
+      binaryString.append(encodeNumber(interval));
+    }
+
+    // Pad to byte boundary.
+    while (binaryString.length() % 8 != 0) {
+      binaryString.append('0');
+    }
+
+    // Convert to bytes.
+    String binary = binaryString.toString();
+    int byteCount = binary.length() / 8;
+
+    out.write(BytesUtils.intToBytes(byteCount)); // Length
+    out.write(BytesUtils.intToBytes(intervals.size())); // Interval count
+
+    for (int i = 0; i < byteCount; i++) {
+      String byteStr = binary.substring(i * 8, (i + 1) * 8);
+      out.write(Integer.parseInt(byteStr, 2));
+    }
+  }
+
+  /**
+   * Encodes a non-negative number as a unary prefix of {@code bitLength - 2} '1' bits, a '0'
+   * separator, and the value in exactly {@code bitLength} bits, where {@code bitLength} is the
+   * number of significant bits of {@code num} but at least 2.
+   *
+   * <p>The payload is left-padded with zeros to {@code bitLength}: values 0 and 1 have fewer
+   * than two significant bits, and an unpadded payload would shift every following bit for a
+   * reader that consumes a fixed {@code prefix + 2} payload bits.
+   *
+   * @param num the value to encode
+   * @return the bit string made of '0' and '1' characters
+   */
+  private String encodeNumber(int num) {
+    int bitLength = Math.max(2, 32 - Integer.numberOfLeadingZeros(num));
+    StringBuilder result = new StringBuilder();
+
+    // Unary prefix followed by the separator.
+    for (int i = 0; i < bitLength - 2; i++) {
+      result.append('1');
+    }
+    result.append('0');
+
+    // Fixed-width binary payload.
+    String payload = Integer.toBinaryString(num);
+    for (int i = payload.length(); i < bitLength; i++) {
+      result.append('0');
+    }
+    result.append(payload);
+
+    return result.toString();
+  }
+
+  /**
+   * ORs the lowest {@code bitWidth} bits of {@code value} into {@code buffer}, most significant
+   * bit first, starting at absolute bit {@code bitOffset}. Bits that would land past the end of
+   * the buffer are dropped.
+   *
+   * @param buffer destination bytes (existing bits are preserved)
+   * @param bitOffset index of the first bit to write (bit 0 is the top bit of {@code buffer[0]})
+   * @param value the value whose low bits are written
+   * @param bitWidth number of bits to write
+   */
+  private void writeBits(byte[] buffer, int bitOffset, int value, int bitWidth) {
+    for (int i = 0; i < bitWidth; i++) {
+      int absoluteBit = bitOffset + i;
+      int byteIndex = absoluteBit / 8;
+      if (byteIndex >= buffer.length) {
+        break;
+      }
+
+      int bit = (value >> (bitWidth - 1 - i)) & 1;
+      buffer[byteIndex] |= (bit << (7 - absoluteBit % 8));
+    }
+  }
+
+  /**
+   * Fills the MinHash coefficient arrays from a fixed-seed generator so that signatures are
+   * reproducible. The A and B coefficients of one slot are drawn back to back.
+   */
+  private void initializeHashFunctions() {
+    long[] coefficientsA = new long[DEFAULT_NUM_HASHES];
+    long[] coefficientsB = new long[DEFAULT_NUM_HASHES];
+
+    Random random = new Random(12345);
+    int slot = 0;
+    while (slot < DEFAULT_NUM_HASHES) {
+      coefficientsA[slot] = random.nextLong();
+      coefficientsB[slot] = random.nextLong();
+      slot++;
+    }
+
+    hashCoefficientsA = coefficientsA;
+    hashCoefficientsB = coefficientsB;
+  }
+
+  /** Clears the buffered values, the reference window and the signature cache. */
+  private void reset() {
+    signatureCache.clear();
+    slidingWindow.clear();
+    textValues.clear();
+  }
+
+  /**
+   * Returns a fixed per-item size estimate for text data.
+   *
+   * @return 1024
+   */
+  @Override
+  public int getOneItemMaxSize() {
+    final int estimatedItemSize = 1024;
+    return estimatedItemSize;
+  }
+
+  /**
+   * Estimates an upper bound for the encoded size of the buffered values.
+   *
+   * <p>Text is written as UTF-8, where one UTF-16 code unit takes at most three bytes, so the
+   * character count is tripled; counting characters alone underestimates non-ASCII text. A flat
+   * 100 bytes per value is added for the encoding structures.
+   *
+   * @return the estimate in bytes, or 0 when nothing is buffered
+   */
+  @Override
+  public long getMaxByteSize() {
+    if (textValues == null || textValues.isEmpty()) {
+      return 0;
+    }
+
+    long totalTextBytes = 0;
+    for (String text : textValues) {
+      totalTextBytes += 3L * text.length();
+    }
+
+    // Long arithmetic keeps the per-value overhead from overflowing an int.
+    return totalTextBytes + textValues.size() * 100L;
+  }
+
+  /**
+   * Describes how one input string is stored. Method 1 records carry the whole text as their
+   * only sub-string; method 0 records carry a window index and one entry per operation in each
+   * of the four lists.
+   */
+  private static class Record {
+    /** Storage method: 0 = edit operations against a window entry, 1 = stored directly. */
+    int method;
+
+    /** Index of the reference string in the sliding window (method 0 only). */
+    int begin;
+
+    /** Number of edit operations (method 0 only). */
+    int operationSize;
+
+    /** Position of each operation in the reference string. */
+    List<Integer> positionList = new ArrayList<>();
+
+    /** Number of reference characters each operation removes. */
+    List<Integer> deleteLengths = new ArrayList<>();
+
+    /** Number of characters each operation inserts. */
+    List<Integer> insertLengths = new ArrayList<>();
+
+    /** Inserted texts (method 0) or the full text (method 1). */
+    List<String> subStrings = new ArrayList<>();
+  }
+
+  /**
+   * One edit step: at {@code position} in the reference, drop {@code deleteLength} characters
+   * and put {@code insertText} ({@code insertLength} characters) in their place.
+   */
+  private static class OperationItem {
+    final int position;
+    final int deleteLength;
+    final int insertLength;
+    final String insertText;
+
+    /**
+     * @param position offset in the reference string where the edit applies
+     * @param deleteLength number of reference characters removed
+     * @param insertLength number of characters inserted
+     * @param insertText the inserted characters
+     */
+    OperationItem(int position, int deleteLength, int insertLength, String insertText) {
+      this.insertText = insertText;
+      this.insertLength = insertLength;
+      this.deleteLength = deleteLength;
+      this.position = position;
+    }
+  }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
index 38b0731d..20c13051 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
@@ -80,6 +80,8 @@ public abstract class TSEncodingBuilder {
return new RLBE();
case CAMEL:
return new Camel();
+ case LOG_DELTA:
+ return new LogDelta();
default:
throw new UnsupportedOperationException("Unsupported encoding: " +
type);
}
@@ -454,4 +456,25 @@ public abstract class TSEncodingBuilder {
// allowed to do nothing
}
}
+
+  /** Encoding builder for {@code LOG_DELTA}; it only serves TEXT and STRING. */
+  public static class LogDelta extends TSEncodingBuilder {
+
+    /**
+     * Creates the encoder for the given data type.
+     *
+     * @param type data type of the values to encode
+     * @return a new {@link LogDeltaEncoder} when {@code type} is TEXT or STRING
+     * @throws UnSupportedDataTypeException for every other data type
+     */
+    @Override
+    public Encoder getEncoder(TSDataType type) {
+      // Guard: reject unsupported types first, so only TEXT and STRING reach the return.
+      switch (type) {
+        case TEXT:
+        case STRING:
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format(ERROR_MSG, TSEncoding.LOG_DELTA, type));
+      }
+      return new LogDeltaEncoder();
+    }
+
+    /**
+     * Intentionally a no-op: nothing is read from the properties.
+     *
+     * @param props builder properties, ignored
+     */
+    @Override
+    public void initFromProps(Map<String, String> props) {
+      // allowed to do nothing
+    }
+  }
}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
index 6c02cb1f..aefaace8 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
@@ -42,7 +42,8 @@ public enum TSEncoding {
CHIMP((byte) 11),
SPRINTZ((byte) 12),
RLBE((byte) 13),
- CAMEL((byte) 14);
+ CAMEL((byte) 14),
+ LOG_DELTA((byte) 15);
private final byte type;
@SuppressWarnings("java:S2386") // used by other projects
@@ -64,6 +65,7 @@ public enum TSEncoding {
intSet.add(TSEncoding.CHIMP);
intSet.add(TSEncoding.SPRINTZ);
intSet.add(TSEncoding.RLBE);
+ intSet.add(TSEncoding.LOG_DELTA);
TYPE_SUPPORTED_ENCODINGS.put(TSDataType.INT32, intSet);
TYPE_SUPPORTED_ENCODINGS.put(TSDataType.INT64, intSet);
@@ -90,6 +92,7 @@ public enum TSEncoding {
Set<TSEncoding> textSet = new HashSet<>();
textSet.add(TSEncoding.PLAIN);
textSet.add(TSEncoding.DICTIONARY);
+ textSet.add(TSEncoding.LOG_DELTA);
TYPE_SUPPORTED_ENCODINGS.put(TSDataType.TEXT, textSet);
TYPE_SUPPORTED_ENCODINGS.put(TSDataType.STRING, textSet);
@@ -142,6 +145,8 @@ public enum TSEncoding {
return TSEncoding.RLBE;
case 14:
return TSEncoding.CAMEL;
+ case 15:
+ return TSEncoding.LOG_DELTA;
default:
throw new IllegalArgumentException("Invalid input: " + encoding);
}