This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 471aee16c [core] Enable zorder compact configuring how many bytes
should be devoted for type VARCHAR/CHAR/BINARY/VARBINARY (#2109)
471aee16c is described below
commit 471aee16c5b64b451538613f34d3538728eef56e
Author: YeJunHao <[email protected]>
AuthorDate: Tue Oct 10 18:17:36 2023 +0800
[core] Enable zorder compact configuring how many bytes should be devoted
for type VARCHAR/CHAR/BINARY/VARBINARY (#2109)
---
.../shortcodes/generated/core_configuration.html | 31 +++++++------
.../main/java/org/apache/paimon/CoreOptions.java | 11 +++++
.../org/apache/paimon/sort/zorder/ZIndexer.java | 54 ++++++++++++++++------
.../apache/paimon/sort/zorder/ZIndexerTest.java | 45 ++++++++++++++++++
.../apache/paimon/flink/sorter/ZorderSorter.java | 3 +-
.../SortCompactActionForDynamicBucketITCase.java | 49 +++++++++++++++++++-
6 files changed, 163 insertions(+), 30 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6ade8c0b5..ee89b81ee 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -473,6 +473,18 @@ This config option does not affect the default filesystem
metastore.</td>
<td>String</td>
<td>The time zone to parse the long watermark value to TIMESTAMP
value. The default value is 'UTC', which means the watermark is defined on
TIMESTAMP column or not defined. If the watermark is defined on TIMESTAMP_LTZ
column, the time zone of watermark is user configured time zone, the value
should be the user configured local time zone. The option value is either a
full name such as 'America/Los_Angeles', or a custom timezone id such as
'GMT-08:00'.</td>
</tr>
+ <tr>
+ <td><h5>snapshot.expire.execution-mode</h5></td>
+ <td style="word-wrap: break-word;">sync</td>
+ <td><p>Enum</p></td>
+ <td>Specifies the execution mode of expire.<br /><br />Possible
values:<ul><li>"sync": Execute expire synchronously. If there are too many
files, it may take a long time and block stream processing.</li><li>"async":
Execute expire asynchronously. If the generation of snapshots is greater than
the deletion, there will be a backlog of files.</li></ul></td>
+ </tr>
+ <tr>
+ <td><h5>snapshot.expire.limit</h5></td>
+ <td style="word-wrap: break-word;">10</td>
+ <td>Integer</td>
+ <td>The maximum number of snapshots allowed to expire at a
time.</td>
+ </tr>
<tr>
<td><h5>snapshot.num-retained.max</h5></td>
<td style="word-wrap: break-word;">2147483647</td>
@@ -491,19 +503,6 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Duration</td>
<td>The maximum time of completed snapshots to retain.</td>
</tr>
- <tr>
- <td><h5>snapshot.expire.execution-mode</h5></td>
- <td style="word-wrap: break-word;">sync</td>
- <td>Enum</td>
- <td>Specifies the execution mode of expire.<br /><br />Possible
values:<ul><li>"sync": Execute expire synchronously. If there are too many
files, it may take a long time and block stream processing.</li><li>"async":
Execute expire asynchronously. If the generation of snapshots is greater than
the deletion, there will be a backlog of files.</li></ul></td>
- </tr>
- </tr>
- <tr>
- <td><h5>snapshot.expire.limit</h5></td>
- <td style="word-wrap: break-word;">10</td>
- <td>Integer</td>
- <td>The maximum number of snapshots allowed to expire at a
time.</td>
- </tr>
<tr>
<td><h5>sort-engine</h5></td>
<td style="word-wrap: break-word;">loser-tree</td>
@@ -612,5 +611,11 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Boolean</td>
<td>If set to true, compactions and snapshot expiration will be
skipped. This option is used along with dedicated compact jobs.</td>
</tr>
+ <tr>
+ <td><h5>zorder.var-length-contribution</h5></td>
+ <td style="word-wrap: break-word;">8</td>
+ <td>Integer</td>
+ <td>The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote
to the zorder sort.</td>
+ </tr>
</tbody>
</table>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 607772001..1935e9dd7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -897,6 +897,13 @@ public class CoreOptions implements Serializable {
+ "This can reduce job startup time and
excessive initialization of index, "
+ "but please note that this may also
cause data duplication.");
+ public static final ConfigOption<Integer> ZORDER_VAR_LENGTH_CONTRIBUTION =
+ key("zorder.var-length-contribution")
+ .intType()
+ .defaultValue(8)
+ .withDescription(
+ "The bytes of types (CHAR, VARCHAR, BINARY,
VARBINARY) devote to the zorder sort.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -1335,6 +1342,10 @@ public class CoreOptions implements Serializable {
return options.get(CROSS_PARTITION_UPSERT_BOOTSTRAP_MIN_PARTITION);
}
+ public int varTypeSize() {
+ return options.get(ZORDER_VAR_LENGTH_CONTRIBUTION);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
index 9701dd90e..7f31f8d29 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
@@ -67,8 +67,13 @@ public class ZIndexer implements Serializable {
private transient ByteBuffer reuse;
public ZIndexer(RowType rowType, List<String> orderColumns) {
+ this(rowType, orderColumns, PRIMITIVE_BUFFER_SIZE);
+ }
+
+ public ZIndexer(RowType rowType, List<String> orderColumns, int
varTypeSize) {
List<String> fields = rowType.getFieldNames();
fieldsIndex = new int[orderColumns.size()];
+ int varTypeCount = 0;
for (int i = 0; i < fieldsIndex.length; i++) {
int index = fields.indexOf(orderColumns.get(i));
if (index == -1) {
@@ -79,9 +84,22 @@ public class ZIndexer implements Serializable {
+ fields);
}
fieldsIndex[i] = index;
+
+ if (isVarType(rowType.getFieldTypes().get(index))) {
+ varTypeCount++;
+ }
}
- this.functionSet = constructFunctionMap(rowType.getFields());
- this.totalBytes = PRIMITIVE_BUFFER_SIZE * this.fieldsIndex.length;
+ this.functionSet = constructFunctionMap(rowType.getFields(),
varTypeSize);
+ this.totalBytes =
+ PRIMITIVE_BUFFER_SIZE * (this.fieldsIndex.length -
varTypeCount)
+ + varTypeSize * varTypeCount;
+ }
+
+ private static boolean isVarType(DataType dataType) {
+ return dataType instanceof CharType
+ || dataType instanceof VarCharType
+ || dataType instanceof BinaryType
+ || dataType instanceof VarBinaryType;
}
public void open() {
@@ -104,28 +122,32 @@ public class ZIndexer implements Serializable {
return ZOrderByteUtils.interleaveBits(columnBytes, totalBytes, reuse);
}
- public Set<RowProcessor> constructFunctionMap(List<DataField> fields) {
+ public Set<RowProcessor> constructFunctionMap(List<DataField> fields, int
varTypeSize) {
Set<RowProcessor> zorderFunctionSet = new LinkedHashSet<>();
// Construct zorderFunctionSet and fill dataTypes, rowFields
for (int index : fieldsIndex) {
DataField field = fields.get(index);
- zorderFunctionSet.add(zmapColumnToCalculator(field, index));
+ zorderFunctionSet.add(zmapColumnToCalculator(field, index,
varTypeSize));
}
return zorderFunctionSet;
}
- public static RowProcessor zmapColumnToCalculator(DataField field, int
index) {
+ public static RowProcessor zmapColumnToCalculator(DataField field, int
index, int varTypeSize) {
DataType type = field.type();
- return new RowProcessor(type.accept(new TypeVisitor(index)));
+ return new RowProcessor(
+ type.accept(new TypeVisitor(index, varTypeSize)),
+ isVarType(type) ? varTypeSize : PRIMITIVE_BUFFER_SIZE);
}
/** Type Visitor to generate function map from row column to z-index. */
public static class TypeVisitor implements
DataTypeVisitor<ZProcessFunction>, Serializable {
private final int fieldIndex;
+ private final int varTypeSize;
- public TypeVisitor(int index) {
+ public TypeVisitor(int index, int varTypeSize) {
this.fieldIndex = index;
+ this.varTypeSize = varTypeSize;
}
@Override
@@ -140,9 +162,9 @@ public class ZIndexer implements Serializable {
binaryString.getSegments(),
binaryString.getOffset(),
Math.min(
- PRIMITIVE_BUFFER_SIZE,
+ varTypeSize,
binaryString.getSizeInBytes())),
- PRIMITIVE_BUFFER_SIZE,
+ varTypeSize,
reuse)
.array();
};
@@ -160,9 +182,9 @@ public class ZIndexer implements Serializable {
binaryString.getSegments(),
binaryString.getOffset(),
Math.min(
- PRIMITIVE_BUFFER_SIZE,
+ varTypeSize,
binaryString.getSizeInBytes())),
- PRIMITIVE_BUFFER_SIZE,
+ varTypeSize,
reuse)
.array();
};
@@ -186,7 +208,7 @@ public class ZIndexer implements Serializable {
row.isNullAt(fieldIndex)
? NULL_BYTES
: ZOrderByteUtils.byteTruncateOrFill(
- row.getBinary(fieldIndex),
PRIMITIVE_BUFFER_SIZE, reuse)
+ row.getBinary(fieldIndex),
varTypeSize, reuse)
.array();
}
@@ -196,7 +218,7 @@ public class ZIndexer implements Serializable {
row.isNullAt(fieldIndex)
? NULL_BYTES
: ZOrderByteUtils.byteTruncateOrFill(
- row.getBinary(fieldIndex),
PRIMITIVE_BUFFER_SIZE, reuse)
+ row.getBinary(fieldIndex),
varTypeSize, reuse)
.array();
}
@@ -342,13 +364,15 @@ public class ZIndexer implements Serializable {
private transient ByteBuffer reuse;
private final ZProcessFunction process;
+ private final int byteSize;
- public RowProcessor(ZProcessFunction process) {
+ public RowProcessor(ZProcessFunction process, int byteSize) {
this.process = process;
+ this.byteSize = byteSize;
}
public void open() {
- reuse = ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE);
+ reuse = ByteBuffer.allocate(byteSize);
}
public byte[] zvalue(InternalRow o) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
index 4e9c08993..4beb682e7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
@@ -18,11 +18,13 @@
package org.apache.paimon.sort.zorder;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.ZOrderByteUtils;
import org.assertj.core.api.Assertions;
@@ -67,4 +69,47 @@ public class ZIndexerTest {
}
}
}
+
+ @Test
+ public void testZIndexerForVarchar() {
+ RowType rowType = RowType.of(new VarCharType(), new VarCharType());
+
+ int varTypeSize = 10;
+ ZIndexer zIndexer = new ZIndexer(rowType, Arrays.asList("f0", "f1"),
varTypeSize);
+ zIndexer.open();
+
+ for (int i = 0; i < 1000; i++) {
+ BinaryString a = BinaryString.fromString(randomString(varTypeSize
+ 1));
+ BinaryString b =
BinaryString.fromString(randomString(varTypeSize));
+
+ InternalRow internalRow = GenericRow.of(a, b);
+
+ byte[] zOrder = zIndexer.index(internalRow);
+
+ byte[][] zCache = new byte[2][];
+ ByteBuffer byteBuffer = ByteBuffer.allocate(varTypeSize);
+ ZOrderByteUtils.stringToOrderedBytes(a.toString(), varTypeSize,
byteBuffer);
+ zCache[0] = Arrays.copyOf(byteBuffer.array(), varTypeSize);
+
+ ZOrderByteUtils.stringToOrderedBytes(b.toString(), varTypeSize,
byteBuffer);
+ zCache[1] = Arrays.copyOf(byteBuffer.array(), varTypeSize);
+
+ byte[] expectedZOrder =
+ ZOrderByteUtils.interleaveBits(zCache, zCache.length *
varTypeSize);
+
+ for (int j = 0; j < zCache.length * varTypeSize; j++) {
+ Assertions.assertThat(zOrder[j]).isEqualTo(expectedZOrder[j]);
+ }
+ }
+ }
+
+ public static String randomString(int length) {
+ byte[] buffer = new byte[length];
+
+ for (int i = 0; i < length; i += 1) {
+ buffer[i] = (byte) ('a' + RANDOM.nextInt(26));
+ }
+
+ return new String(buffer);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
index 66051dc03..bc3c7f5c5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
@@ -70,7 +70,8 @@ public class ZorderSorter extends TableSorter {
*/
private DataStream<RowData> sortStreamByZOrder(
DataStream<RowData> inputStream, FileStoreTable table) {
- final ZIndexer zIndexer = new ZIndexer(table.rowType(), orderColNames);
+ final ZIndexer zIndexer =
+ new ZIndexer(table.rowType(), orderColNames,
table.coreOptions().varTypeSize());
return SortUtils.sortStreamByKey(
inputStream,
table,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index 4d219e0a9..104962f3a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -18,7 +18,9 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.ManifestEntry;
@@ -114,6 +116,42 @@ public class SortCompactActionForDynamicBucketITCase
extends ActionITCaseBase {
.isLessThan(filesFilter.size() / (double) files.size());
}
+ @Test
+ public void testDynamicBucketSortWithStringType() throws Exception {
+ createTable();
+
+ commit(writeData(100));
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(getTable().rowType());
+ Predicate predicate =
+ predicateBuilder.between(
+ 4,
+ BinaryString.fromString("000000000" + 100),
+ BinaryString.fromString("000000000" + 200));
+
+ List<ManifestEntry> files = ((FileStoreTable)
getTable()).store().newScan().plan().files();
+ List<ManifestEntry> filesFilter =
+ ((ChangelogWithKeyFileStoreTable) getTable())
+ .store()
+ .newScan()
+ .withValueFilter(predicate)
+ .plan()
+ .files();
+
+ zorder(Arrays.asList("f4"));
+
+ List<ManifestEntry> filesZorder =
+ ((FileStoreTable) getTable()).store().newScan().plan().files();
+ List<ManifestEntry> filesFilterZorder =
+ ((ChangelogWithKeyFileStoreTable) getTable())
+ .store()
+ .newScan()
+ .withValueFilter(predicate)
+ .plan()
+ .files();
+ Assertions.assertThat(filesFilterZorder.size() / (double)
filesZorder.size())
+ .isLessThan(filesFilter.size() / (double) files.size());
+ }
+
private void zorder(List<String> columns) throws Exception {
if (RANDOM.nextBoolean()) {
new SortCompactAction(
@@ -162,10 +200,12 @@ public class SortCompactActionForDynamicBucketITCase
extends ActionITCaseBase {
schemaBuilder.column("f1", DataTypes.BIGINT());
schemaBuilder.column("f2", DataTypes.BIGINT());
schemaBuilder.column("f3", DataTypes.BIGINT());
+ schemaBuilder.column("f4", DataTypes.STRING());
schemaBuilder.option("bucket", "-1");
schemaBuilder.option("scan.parallelism", "6");
schemaBuilder.option("sink.parallelism", "3");
schemaBuilder.option("dynamic-bucket.target-row-num", "100");
+ schemaBuilder.option(CoreOptions.ZORDER_VAR_LENGTH_CONTRIBUTION.key(),
"14");
schemaBuilder.primaryKey("f0");
return schemaBuilder.build();
}
@@ -204,12 +244,19 @@ public class SortCompactActionForDynamicBucketITCase
extends ActionITCaseBase {
}
private static InternalRow data(int bucket) {
+ String in = String.valueOf(Math.abs(RANDOM.nextInt(10000)));
+ int count = 4 - in.length();
+ for (int i = 0; i < count; i++) {
+ in = "0" + in;
+ }
+ assert in.length() == 4;
GenericRow row =
GenericRow.of(
RANDOM.nextLong(),
(long) RANDOM.nextInt(10000),
(long) RANDOM.nextInt(10000),
- (long) RANDOM.nextInt(10000));
+ (long) RANDOM.nextInt(10000),
+ BinaryString.fromString("00000000" + in));
return new DynamicBucketRow(row, bucket);
}
}