This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a46d561 Fix byte calculation for maxBytesInMemory to take into
account of Sink/Hydrant Object overhead (#10740)
a46d561 is described below
commit a46d561bd7e2b045a08a2e475847d4a7505a1c93
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Wed Jan 27 00:34:56 2021 -0800
Fix byte calculation for maxBytesInMemory to take into account of
Sink/Hydrant Object overhead (#10740)
* Fix byte calculation for maxBytesInMemory to take into account of
Sink/Hydrant Object overhead
* Fix byte calculation for maxBytesInMemory to take into account of
Sink/Hydrant Object overhead
* Fix byte calculation for maxBytesInMemory to take into account of
Sink/Hydrant Object overhead
* Fix byte calculation for maxBytesInMemory to take into account of
Sink/Hydrant Object overhead
* fix checkstyle
* Fix byte calculation for maxBytesInMemory to take into account of
Sink/Hydrant Object overhead
* Fix byte calculation for maxBytesInMemory to take into account of
Sink/Hydrant Object overhead
* fix test
* fix test
* add log
* Fix byte calculation for maxBytesInMemory to take into account of
Sink/Hydrant Object overhead
* address comments
* fix checkstyle
* fix checkstyle
* add config to skip overhead memory calculation
* add test for the skipBytesInMemoryOverheadCheck config
* add docs
* fix checkstyle
* fix checkstyle
* fix spelling
* address comments
* fix travis
* address comments
---
docs/ingestion/hadoop.md | 2 +-
docs/ingestion/index.md | 3 +-
docs/ingestion/native-batch.md | 4 +-
.../indexing/kafka/KafkaIndexTaskTuningConfig.java | 4 +
.../supervisor/KafkaSupervisorTuningConfig.java | 5 +
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 1 +
.../kafka/KafkaIndexTaskTuningConfigTest.java | 3 +
.../kafka/supervisor/KafkaSupervisorTest.java | 5 +
.../TestModifiedKafkaIndexTaskTuningConfig.java | 2 +
.../kinesis/KinesisIndexTaskTuningConfig.java | 4 +
.../supervisor/KinesisSupervisorTuningConfig.java | 5 +
.../kinesis/KinesisIndexTaskSerdeTest.java | 1 +
.../indexing/kinesis/KinesisIndexTaskTest.java | 1 +
.../kinesis/KinesisIndexTaskTuningConfigTest.java | 3 +
.../kinesis/supervisor/KinesisSupervisorTest.java | 3 +
.../TestModifiedKinesisIndexTaskTuningConfig.java | 3 +
.../index/RealtimeAppenderatorTuningConfig.java | 12 +
.../druid/indexing/common/task/CompactionTask.java | 1 +
.../druid/indexing/common/task/IndexTask.java | 20 +-
.../parallel/ParallelIndexSupervisorTask.java | 1 +
.../batch/parallel/ParallelIndexTuningConfig.java | 4 +
.../SeekableStreamIndexTaskTuningConfig.java | 13 +
.../AppenderatorDriverRealtimeIndexTaskTest.java | 1 +
.../task/ClientCompactionTaskQuerySerdeTest.java | 1 +
.../common/task/CompactionTaskRunTest.java | 1 +
.../indexing/common/task/CompactionTaskTest.java | 6 +
.../indexing/common/task/IndexTaskSerdeTest.java | 6 +
.../druid/indexing/common/task/IndexTaskTest.java | 4 +
.../common/task/RealtimeIndexTaskTest.java | 1 +
.../druid/indexing/common/task/TaskSerdeTest.java | 3 +
.../AbstractParallelIndexSupervisorTaskTest.java | 2 +
.../ParallelIndexSupervisorTaskKillTest.java | 1 +
.../ParallelIndexSupervisorTaskResourceTest.java | 1 +
.../ParallelIndexSupervisorTaskSerdeTest.java | 1 +
.../parallel/ParallelIndexSupervisorTaskTest.java | 1 +
.../parallel/ParallelIndexTestingFactory.java | 1 +
.../parallel/ParallelIndexTuningConfigTest.java | 7 +
.../parallel/SinglePhaseParallelIndexingTest.java | 1 +
.../druid/indexing/overlord/TaskLifecycleTest.java | 5 +
.../SeekableStreamSupervisorStateTest.java | 1 +
.../docker/environment-configs/indexer | 2 +-
.../segment/indexing/RealtimeTuningConfig.java | 14 +
.../druid/segment/indexing/TuningConfig.java | 1 +
.../apache/druid/segment/realtime/FireHydrant.java | 22 ++
.../realtime/appenderator/AppenderatorConfig.java | 2 +
.../realtime/appenderator/AppenderatorImpl.java | 80 +++-
.../UnifiedIndexerAppenderatorsManager.java | 6 +
.../appenderator/AppenderatorPlumberTest.java | 1 +
.../realtime/appenderator/AppenderatorTest.java | 402 ++++++++++++++++++++-
.../realtime/appenderator/AppenderatorTester.java | 15 +-
.../DefaultOfflineAppenderatorFactoryTest.java | 1 +
.../plumber/RealtimePlumberSchoolTest.java | 1 +
.../druid/segment/realtime/plumber/SinkTest.java | 2 +
.../druid/cli/validate/DruidJsonValidatorTest.java | 1 +
website/.spelling | 1 +
55 files changed, 681 insertions(+), 14 deletions(-)
diff --git a/docs/ingestion/hadoop.md b/docs/ingestion/hadoop.md
index 2e81bc0..37f491c 100644
--- a/docs/ingestion/hadoop.md
+++ b/docs/ingestion/hadoop.md
@@ -320,7 +320,7 @@ The tuningConfig is optional and default parameters will be
used if no tuningCon
|version|String|The version of created segments. Ignored for HadoopIndexTask
unless useExplicitVersion is set to true|no (default == datetime that indexing
starts at)|
|partitionsSpec|Object|A specification of how to partition each time bucket
into segments. Absence of this property means no partitioning will occur. See
[`partitionsSpec`](#partitionsspec) below.|no (default == 'hashed')|
|maxRowsInMemory|Integer|The number of rows to aggregate before persisting.
Note that this is the number of post-aggregation rows which may not be equal to
the number of input events due to roll-up. This is used to manage the required
JVM heap size. Normally user does not need to set this, but depending on the
nature of data, if rows are short in terms of bytes, user may not want to store
a million rows in memory and this value should be set.|no (default == 1000000)|
-|maxBytesInMemory|Long|The number of bytes to aggregate in heap memory before
persisting. Normally this is computed internally and user does not need to set
it. This is based on a rough estimate of memory usage and not actual usage. The
maximum heap memory usage for indexing is maxBytesInMemory * (2 +
maxPendingPersists).|no (default == One-sixth of max JVM memory)|
+|maxBytesInMemory|Long|The number of bytes to aggregate in heap memory before
persisting. Normally this is computed internally and user does not need to set
it. This is based on a rough estimate of memory usage and not actual usage. The
maximum heap memory usage for indexing is maxBytesInMemory * (2 +
maxPendingPersists). Note that `maxBytesInMemory` also includes heap usage of
artifacts created from intermediary persists. This means that after every
persist, the amount of `maxBytesInMem [...]
|leaveIntermediate|Boolean|Leave behind intermediate files (for debugging) in
the workingPath when a job completes, whether it passes or fails.|no (default
== false)|
|cleanupOnFailure|Boolean|Clean up intermediate files when a job fails (unless
leaveIntermediate is on).|no (default == true)|
|overwriteFiles|Boolean|Override existing files found during indexing.|no
(default == false)|
diff --git a/docs/ingestion/index.md b/docs/ingestion/index.md
index 61ad679..75ea703 100644
--- a/docs/ingestion/index.md
+++ b/docs/ingestion/index.md
@@ -720,7 +720,8 @@ is:
|-----|-----------|-------|
|type|Each ingestion method has its own tuning type code. You must specify the
type code that matches your ingestion method. Common options are `index`,
`hadoop`, `kafka`, and `kinesis`.||
|maxRowsInMemory|The maximum number of records to store in memory before
persisting to disk. Note that this is the number of rows post-rollup, and so it
may not be equal to the number of input records. Ingested records will be
persisted to disk when either `maxRowsInMemory` or `maxBytesInMemory` are
reached (whichever happens first).|`1000000`|
-|maxBytesInMemory|The maximum aggregate size of records, in bytes, to store in
the JVM heap before persisting. This is based on a rough estimate of memory
usage. Ingested records will be persisted to disk when either `maxRowsInMemory`
or `maxBytesInMemory` are reached (whichever happens first).<br /><br />Setting
maxBytesInMemory to -1 disables this check, meaning Druid will rely entirely on
maxRowsInMemory to control memory usage. Setting it to zero means the default
value will be used [...]
+|maxBytesInMemory|The maximum aggregate size of records, in bytes, to store in
the JVM heap before persisting. This is based on a rough estimate of memory
usage. Ingested records will be persisted to disk when either `maxRowsInMemory`
or `maxBytesInMemory` are reached (whichever happens first). `maxBytesInMemory`
also includes heap usage of artifacts created from intermediary persists. This
means that after every persist, the amount of `maxBytesInMemory` until the next
persist will decrease [...]
+|skipBytesInMemoryOverheadCheck|The calculation of maxBytesInMemory takes into
account overhead objects created during ingestion and each intermediate
persist. Setting this to true excludes the bytes of these overhead objects
from the maxBytesInMemory check.|false|
|indexSpec|Tune how data is indexed. See below for more information.|See table
below|
|Other properties|Each ingestion method has its own list of additional tuning
properties. See the documentation for each method for a full list: [Kafka
indexing
service](../development/extensions-core/kafka-ingestion.md#tuningconfig),
[Kinesis indexing
service](../development/extensions-core/kinesis-ingestion.md#tuningconfig),
[Native batch](native-batch.md#tuningconfig), and
[Hadoop-based](hadoop.md#tuningconfig).||
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 73515dd..6e5620d 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -203,7 +203,7 @@ The tuningConfig is optional and default parameters will be
used if no tuningCon
|type|The task type, this should always be `index_parallel`.|none|yes|
|maxRowsPerSegment|Deprecated. Use `partitionsSpec` instead. Used in sharding.
Determines how many rows are in each segment.|5000000|no|
|maxRowsInMemory|Used in determining when intermediate persists to disk should
occur. Normally user does not need to set this, but depending on the nature of
data, if rows are short in terms of bytes, user may not want to store a million
rows in memory and this value should be set.|1000000|no|
-|maxBytesInMemory|Used in determining when intermediate persists to disk
should occur. Normally this is computed internally and user does not need to
set it. This value represents number of bytes to aggregate in heap memory
before persisting. This is based on a rough estimate of memory usage and not
actual usage. The maximum heap memory usage for indexing is maxBytesInMemory *
(2 + maxPendingPersists)|1/6 of max JVM memory|no|
+|maxBytesInMemory|Used in determining when intermediate persists to disk
should occur. Normally this is computed internally and user does not need to
set it. This value represents number of bytes to aggregate in heap memory
before persisting. This is based on a rough estimate of memory usage and not
actual usage. The maximum heap memory usage for indexing is maxBytesInMemory *
(2 + maxPendingPersists). Note that `maxBytesInMemory` also includes heap usage
of artifacts created from interm [...]
|maxColumnsToMerge|A parameter that limits how many segments can be merged in
a single phase when merging segments for publishing. This limit is imposed on
the total number of columns present in a set of segments being merged. If the
limit is exceeded, segment merging will occur in multiple phases. At least 2
segments will be merged in a single phase, regardless of this setting.|-1
(unlimited)|no|
|maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows
in segments waiting for being pushed. Used in determining when intermediate
pushing should occur.|20000000|no|
|numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the
number of shards to create when using a `hashed` `partitionsSpec`. If this is
specified and `intervals` is specified in the `granularitySpec`, the index task
can skip the determine intervals/partitions pass through the data. `numShards`
cannot be specified if `maxRowsPerSegment` is set.|null|no|
@@ -729,7 +729,7 @@ The tuningConfig is optional and default parameters will be
used if no tuningCon
|type|The task type, this should always be "index".|none|yes|
|maxRowsPerSegment|Deprecated. Use `partitionsSpec` instead. Used in sharding.
Determines how many rows are in each segment.|5000000|no|
|maxRowsInMemory|Used in determining when intermediate persists to disk should
occur. Normally user does not need to set this, but depending on the nature of
data, if rows are short in terms of bytes, user may not want to store a million
rows in memory and this value should be set.|1000000|no|
-|maxBytesInMemory|Used in determining when intermediate persists to disk
should occur. Normally this is computed internally and user does not need to
set it. This value represents number of bytes to aggregate in heap memory
before persisting. This is based on a rough estimate of memory usage and not
actual usage. The maximum heap memory usage for indexing is maxBytesInMemory *
(2 + maxPendingPersists)|1/6 of max JVM memory|no|
+|maxBytesInMemory|Used in determining when intermediate persists to disk
should occur. Normally this is computed internally and user does not need to
set it. This value represents number of bytes to aggregate in heap memory
before persisting. This is based on a rough estimate of memory usage and not
actual usage. The maximum heap memory usage for indexing is maxBytesInMemory *
(2 + maxPendingPersists). Note that `maxBytesInMemory` also includes heap usage
of artifacts created from interm [...]
|maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows
in segments waiting for being pushed. Used in determining when intermediate
pushing should occur.|20000000|no|
|numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the
number of shards to create. If this is specified and `intervals` is specified
in the `granularitySpec`, the index task can skip the determine
intervals/partitions pass through the data. `numShards` cannot be specified if
`maxRowsPerSegment` is set.|null|no|
|partitionDimensions|Deprecated. Use `partitionsSpec` instead. The dimensions
to partition on. Leave blank to select all dimensions. Only used with
`forceGuaranteedRollup` = true, will be ignored otherwise.|null|no|
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
index 8dc9a2f..2a7b077 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
@@ -37,6 +37,7 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") @Nullable Period
intermediatePersistPeriod,
@@ -60,6 +61,7 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
@@ -87,6 +89,7 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
+ isSkipBytesInMemoryOverheadCheck(),
getMaxRowsPerSegment(),
getMaxTotalRows(),
getIntermediatePersistPeriod(),
@@ -115,6 +118,7 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", maxBytesInMemory=" + getMaxBytesInMemory() +
+ ", skipBytesInMemoryOverheadCheck=" +
isSkipBytesInMemoryOverheadCheck() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
index a9e0bfe..04cc74f 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
@@ -68,6 +68,7 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
null,
null,
null,
+ null,
null
);
}
@@ -76,6 +77,7 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
@@ -105,6 +107,7 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
@@ -197,6 +200,7 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
+ ", skipBytesInMemoryOverheadCheck=" +
isSkipBytesInMemoryOverheadCheck() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
@@ -225,6 +229,7 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
+ isSkipBytesInMemoryOverheadCheck(),
getMaxRowsPerSegment(),
getMaxTotalRows(),
getIntermediatePersistPeriod(),
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 3ca1c0b..18faf2e 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2674,6 +2674,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
null,
1000,
null,
+ null,
maxRowsPerSegment,
maxTotalRows,
new Period("P1Y"),
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 8888a69..e27d7d0 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -123,6 +123,7 @@ public class KafkaIndexTaskTuningConfigTest
null,
1,
null,
+ null,
2,
10L,
new Period("PT3S"),
@@ -168,6 +169,7 @@ public class KafkaIndexTaskTuningConfigTest
null,
1,
null,
+ null,
2,
10L,
new Period("PT3S"),
@@ -218,6 +220,7 @@ public class KafkaIndexTaskTuningConfigTest
null,
1,
null,
+ null,
2,
10L,
new Period("PT3S"),
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 1bded68..1dbabe8 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -272,6 +272,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
null
),
null
@@ -3075,6 +3076,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
1000,
null,
+ null,
50000,
null,
new Period("P1Y"),
@@ -3115,6 +3117,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
42, // This is different
null,
+ null,
50000,
null,
new Period("P1Y"),
@@ -3411,6 +3414,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
1000,
null,
+ null,
50000,
null,
new Period("P1Y"),
@@ -3522,6 +3526,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
1000,
null,
+ null,
50000,
null,
new Period("P1Y"),
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
index 06550e1..2c1a684 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
@@ -41,6 +41,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends
KafkaIndexTaskTuning
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") @Nullable Period
intermediatePersistPeriod,
@@ -65,6 +66,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends
KafkaIndexTaskTuning
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index 428f54f..2bbfffe 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -54,6 +54,7 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
@@ -83,6 +84,7 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
@@ -160,6 +162,7 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
+ isSkipBytesInMemoryOverheadCheck(),
getMaxRowsPerSegment(),
getMaxTotalRows(),
getIntermediatePersistPeriod(),
@@ -227,6 +230,7 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
return "KinesisIndexTaskTuningConfig{" +
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxBytesInMemory=" + getMaxBytesInMemory() +
+ ", skipBytesInMemoryOverheadCheck=" +
isSkipBytesInMemoryOverheadCheck() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
index 7cf49a3..56c9456 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
@@ -77,6 +77,7 @@ public class KinesisSupervisorTuningConfig extends
KinesisIndexTaskTuningConfig
null,
null,
null,
+ null,
null
);
}
@@ -85,6 +86,7 @@ public class KinesisSupervisorTuningConfig extends
KinesisIndexTaskTuningConfig
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
@@ -121,6 +123,7 @@ public class KinesisSupervisorTuningConfig extends
KinesisIndexTaskTuningConfig
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
@@ -218,6 +221,7 @@ public class KinesisSupervisorTuningConfig extends
KinesisIndexTaskTuningConfig
return "KinesisSupervisorTuningConfig{" +
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxBytesInMemory=" + getMaxBytesInMemory() +
+ ", skipBytesInMemoryOverheadCheck=" +
isSkipBytesInMemoryOverheadCheck() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
@@ -255,6 +259,7 @@ public class KinesisSupervisorTuningConfig extends
KinesisIndexTaskTuningConfig
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
+ isSkipBytesInMemoryOverheadCheck(),
getMaxRowsPerSegment(),
getMaxTotalRows(),
getIntermediatePersistPeriod(),
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index f6b3582..c279a18 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -73,6 +73,7 @@ public class KinesisIndexTaskSerdeTest
null,
null,
null,
+ null,
null
);
private static final KinesisIndexTaskIOConfig IO_CONFIG = new
KinesisIndexTaskIOConfig(
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 37d699a..f6a8542 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2758,6 +2758,7 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
null,
maxRowsInMemory,
null,
+ null,
maxRowsPerSegment,
maxTotalRows,
new Period("P1Y"),
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 69a70a4..3600671 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -144,6 +144,7 @@ public class KinesisIndexTaskTuningConfigTest
null,
1,
3L,
+ null,
2,
100L,
new Period("PT3S"),
@@ -205,6 +206,7 @@ public class KinesisIndexTaskTuningConfigTest
null,
1,
3L,
+ null,
2,
100L,
new Period("PT3S"),
@@ -294,6 +296,7 @@ public class KinesisIndexTaskTuningConfigTest
null,
1,
(long) 3,
+ null,
2,
100L,
new Period("PT3S"),
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 77c30d4..3f35204 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -171,6 +171,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
1000,
null,
+ null,
50000,
null,
new Period("P1Y"),
@@ -3694,6 +3695,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
1000,
null,
+ null,
50000,
null,
new Period("P1Y"),
@@ -4747,6 +4749,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
1000,
null,
+ null,
50000,
null,
new Period("P1Y"),
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
index 5b2e2bd..9278c10 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
@@ -41,6 +41,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends
KinesisIndexTaskTu
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
@@ -71,6 +72,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends
KinesisIndexTaskTu
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
@@ -104,6 +106,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig
extends KinesisIndexTaskTu
base.getAppendableIndexSpec(),
base.getMaxRowsInMemory(),
base.getMaxBytesInMemory(),
+ base.isSkipBytesInMemoryOverheadCheck(),
base.getMaxRowsPerSegment(),
base.getMaxTotalRows(),
base.getIntermediatePersistPeriod(),
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
index 1538e15..172d8eb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
@@ -56,6 +56,7 @@ public class RealtimeAppenderatorTuningConfig implements
AppenderatorConfig
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
+ private final boolean skipBytesInMemoryOverheadCheck;
private final DynamicPartitionsSpec partitionsSpec;
private final Period intermediatePersistPeriod;
private final File basePersistDirectory;
@@ -78,6 +79,7 @@ public class RealtimeAppenderatorTuningConfig implements
AppenderatorConfig
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
@@ -100,6 +102,8 @@ public class RealtimeAppenderatorTuningConfig implements
AppenderatorConfig
// initializing this to 0, it will be lazily initialized to a value
// @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
+ this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck ==
null ?
+
DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK : skipBytesInMemoryOverheadCheck;
this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment,
maxTotalRows);
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
@@ -159,6 +163,13 @@ public class RealtimeAppenderatorTuningConfig implements
AppenderatorConfig
return maxBytesInMemory;
}
+ @JsonProperty
+ @Override
+ public boolean isSkipBytesInMemoryOverheadCheck()
+ {
+ return skipBytesInMemoryOverheadCheck;
+ }
+
@Override
@JsonProperty
public Integer getMaxRowsPerSegment()
@@ -273,6 +284,7 @@ public class RealtimeAppenderatorTuningConfig implements
AppenderatorConfig
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
partitionsSpec.getMaxRowsPerSegment(),
partitionsSpec.getMaxTotalRows(),
intermediatePersistPeriod,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 5cbaa35..530a400 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -222,6 +222,7 @@ public class CompactionTask extends AbstractBatchIndexTask
indexTuningConfig.getAppendableIndexSpec(),
indexTuningConfig.getMaxRowsPerSegment(),
indexTuningConfig.getMaxBytesInMemory(),
+ indexTuningConfig.isSkipBytesInMemoryOverheadCheck(),
indexTuningConfig.getMaxTotalRows(),
indexTuningConfig.getNumShards(),
null,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 78d4699..ce9da23 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -1123,6 +1123,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
+ private final boolean skipBytesInMemoryOverheadCheck;
private final int maxColumnsToMerge;
// null if all partitionsSpec related params are null. see
getDefaultPartitionsSpec() for details.
@@ -1196,6 +1197,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@JsonProperty("rowFlushBoundary") @Deprecated @Nullable Integer
rowFlushBoundary_forBackCompatibility,
@JsonProperty("numShards") @Deprecated @Nullable Integer numShards,
@@ -1220,6 +1222,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
appendableIndexSpec,
maxRowsInMemory != null ? maxRowsInMemory :
rowFlushBoundary_forBackCompatibility,
maxBytesInMemory != null ? maxBytesInMemory : 0,
+ skipBytesInMemoryOverheadCheck != null ?
skipBytesInMemoryOverheadCheck : DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK,
getPartitionsSpec(
forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP :
forceGuaranteedRollup,
partitionsSpec,
@@ -1250,13 +1253,14 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
private IndexTuningConfig()
{
- this(null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null);
+ this(null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null);
}
private IndexTuningConfig(
@Nullable AppendableIndexSpec appendableIndexSpec,
@Nullable Integer maxRowsInMemory,
@Nullable Long maxBytesInMemory,
+ @Nullable Boolean skipBytesInMemoryOverheadCheck,
@Nullable PartitionsSpec partitionsSpec,
@Nullable IndexSpec indexSpec,
@Nullable IndexSpec indexSpecForIntermediatePersists,
@@ -1277,6 +1281,8 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
// initializing this to 0, it will be lazily initialized to a value
// @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
+ this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck ==
null ?
+
DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK : skipBytesInMemoryOverheadCheck;
this.maxColumnsToMerge = maxColumnsToMerge == null
? IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE
: maxColumnsToMerge;
@@ -1317,6 +1323,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
partitionsSpec,
indexSpec,
indexSpecForIntermediatePersists,
@@ -1339,6 +1346,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
partitionsSpec,
indexSpec,
indexSpecForIntermediatePersists,
@@ -1377,6 +1385,13 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
}
@JsonProperty
+ @Override
+ public boolean isSkipBytesInMemoryOverheadCheck()
+ {
+ return skipBytesInMemoryOverheadCheck;
+ }
+
+ @JsonProperty
@Nullable
@Override
public PartitionsSpec getPartitionsSpec()
@@ -1549,6 +1564,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
maxRowsInMemory == that.maxRowsInMemory &&
maxBytesInMemory == that.maxBytesInMemory &&
+ skipBytesInMemoryOverheadCheck ==
that.skipBytesInMemoryOverheadCheck &&
maxColumnsToMerge == that.maxColumnsToMerge &&
maxPendingPersists == that.maxPendingPersists &&
forceGuaranteedRollup == that.forceGuaranteedRollup &&
@@ -1571,6 +1587,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
maxColumnsToMerge,
partitionsSpec,
indexSpec,
@@ -1593,6 +1610,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
return "IndexTuningConfig{" +
"maxRowsInMemory=" + maxRowsInMemory +
", maxBytesInMemory=" + maxBytesInMemory +
+ ", skipBytesInMemoryOverheadCheck=" +
skipBytesInMemoryOverheadCheck +
", maxColumnsToMerge=" + maxColumnsToMerge +
", partitionsSpec=" + partitionsSpec +
", indexSpec=" + indexSpec +
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index c9989e9..038e788 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -976,6 +976,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getMaxBytesInMemory(),
+ tuningConfig.isSkipBytesInMemoryOverheadCheck(),
null,
null,
null,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index 8e2cd8a..065504e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -100,6 +100,7 @@ public class ParallelIndexTuningConfig extends
IndexTuningConfig
null,
null,
null,
+ null,
null
);
}
@@ -111,6 +112,7 @@ public class ParallelIndexTuningConfig extends
IndexTuningConfig
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@JsonProperty("numShards") @Deprecated @Nullable Integer numShards,
@JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec,
@@ -142,6 +144,7 @@ public class ParallelIndexTuningConfig extends
IndexTuningConfig
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
maxTotalRows,
null,
numShards,
@@ -258,6 +261,7 @@ public class ParallelIndexTuningConfig extends
IndexTuningConfig
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
+ isSkipBytesInMemoryOverheadCheck(),
null,
null,
getSplitHintSpec(),
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index b56d8e5..bad2f79 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -41,6 +41,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
+ private final boolean skipBytesInMemoryOverheadCheck;
private final DynamicPartitionsSpec partitionsSpec;
private final Period intermediatePersistPeriod;
private final File basePersistDirectory;
@@ -64,6 +65,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
@Nullable AppendableIndexSpec appendableIndexSpec,
@Nullable Integer maxRowsInMemory,
@Nullable Long maxBytesInMemory,
+ @Nullable Boolean skipBytesInMemoryOverheadCheck,
@Nullable Integer maxRowsPerSegment,
@Nullable Long maxTotalRows,
@Nullable Period intermediatePersistPeriod,
@@ -93,6 +95,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
// initializing this to 0, it will be lazily initialized to a value
// @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
+ this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck ==
null ?
+
DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK : skipBytesInMemoryOverheadCheck;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaults.getIntermediatePersistPeriod()
: intermediatePersistPeriod;
@@ -155,6 +159,13 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
return maxBytesInMemory;
}
+ @JsonProperty
+ @Override
+ public boolean isSkipBytesInMemoryOverheadCheck()
+ {
+ return skipBytesInMemoryOverheadCheck;
+ }
+
@Override
@JsonProperty
public Integer getMaxRowsPerSegment()
@@ -295,6 +306,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
maxRowsInMemory == that.maxRowsInMemory &&
maxBytesInMemory == that.maxBytesInMemory &&
+ skipBytesInMemoryOverheadCheck ==
that.skipBytesInMemoryOverheadCheck &&
maxPendingPersists == that.maxPendingPersists &&
reportParseExceptions == that.reportParseExceptions &&
handoffConditionTimeout == that.handoffConditionTimeout &&
@@ -319,6 +331,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
partitionsSpec,
intermediatePersistPeriod,
basePersistDirectory,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index bfe7f40..08ea6fa 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1397,6 +1397,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
extends InitializedNullHand
null,
1000,
null,
+ null,
maxRowsPerSegment,
maxTotalRows,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index a25ae37..ef3cf68 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -209,6 +209,7 @@ public class ClientCompactionTaskQuerySerdeTest
2000L,
null,
null,
+ null,
new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10),
new DynamicPartitionsSpec(100, 30000L),
new IndexSpec(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index b466f35..c3f30ec 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -280,6 +280,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
null,
null,
null,
+ null,
new HashedPartitionsSpec(null, 3, null),
null,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 6eddb65..21b7f73 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -312,6 +312,7 @@ public class CompactionTaskTest
null,
null,
null,
+ null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@@ -448,6 +449,7 @@ public class CompactionTaskTest
null,
null,
null,
+ null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@@ -609,6 +611,7 @@ public class CompactionTaskTest
null,
null,
null,
+ null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@@ -674,6 +677,7 @@ public class CompactionTaskTest
null,
500000,
1000000L,
+ null,
1000000L,
null,
null,
@@ -746,6 +750,7 @@ public class CompactionTaskTest
null,
null,
null,
+ null,
new HashedPartitionsSpec(null, 3, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@@ -1133,6 +1138,7 @@ public class CompactionTaskTest
null,
500000,
1000000L,
+ null,
Long.MAX_VALUE,
null,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java
index a65a7c6..52b6c72 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java
@@ -66,6 +66,7 @@ public class IndexTaskSerdeTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(1000, 2000L),
new IndexSpec(
new RoaringBitmapSerdeFactory(false),
@@ -101,6 +102,7 @@ public class IndexTaskSerdeTest
null,
null,
null,
+ null,
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim1", "dim2")),
new IndexSpec(
new RoaringBitmapSerdeFactory(false),
@@ -132,6 +134,7 @@ public class IndexTaskSerdeTest
null,
100,
2000L,
+ null,
3000L,
null,
null,
@@ -169,6 +172,7 @@ public class IndexTaskSerdeTest
2000L,
null,
null,
+ null,
10,
ImmutableList.of("dim1", "dim2"),
null,
@@ -208,6 +212,7 @@ public class IndexTaskSerdeTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(1000, 2000L),
new IndexSpec(
new RoaringBitmapSerdeFactory(false),
@@ -244,6 +249,7 @@ public class IndexTaskSerdeTest
null,
null,
null,
+ null,
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim1", "dim2")),
new IndexSpec(
new RoaringBitmapSerdeFactory(false),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 2baece1..a7ccd47 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -1124,6 +1124,7 @@ public class IndexTaskTest extends IngestionTestBase
null,
null,
null,
+ null,
new HashedPartitionsSpec(2, null, null),
INDEX_SPEC,
null,
@@ -1254,6 +1255,7 @@ public class IndexTaskTest extends IngestionTestBase
null,
null,
null,
+ null,
new DynamicPartitionsSpec(2, null),
INDEX_SPEC,
null,
@@ -1376,6 +1378,7 @@ public class IndexTaskTest extends IngestionTestBase
null,
null,
null,
+ null,
new HashedPartitionsSpec(2, null, null),
INDEX_SPEC,
null,
@@ -1818,6 +1821,7 @@ public class IndexTaskTest extends IngestionTestBase
null,
maxRowsInMemory,
maxBytesInMemory,
+ null,
maxTotalRows,
null,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 2038441..79c0099 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -833,6 +833,7 @@ public class RealtimeIndexTaskTest extends
InitializedNullHandlingTest
null,
1000,
null,
+ null,
new Period("P1Y"),
new Period("PT10M"),
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index 3facbb6..59bc833 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -253,6 +253,7 @@ public class TaskSerdeTest
10,
null,
null,
+ null,
9999,
null,
null,
@@ -338,6 +339,7 @@ public class TaskSerdeTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(10000, null),
indexSpec,
null,
@@ -403,6 +405,7 @@ public class TaskSerdeTest
null,
1,
10L,
+ null,
new Period("PT10M"),
null,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 6d0585f..f3cd4f5 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -158,6 +158,7 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
null,
null,
null,
+ null,
2,
null,
null,
@@ -230,6 +231,7 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
null,
null,
null,
+ null,
new MaxSizeSplitHintSpec(null, 1),
partitionsSpec,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index ee231b5..162c86e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -179,6 +179,7 @@ public class ParallelIndexSupervisorTaskKillTest extends
AbstractParallelIndexSu
null,
null,
null,
+ null,
numTotalSubTasks,
null,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index a493777..f4b1746 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -428,6 +428,7 @@ public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelInd
null,
null,
null,
+ null,
NUM_SUB_TASKS,
null,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index 70c34f9..0d61e6c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -253,6 +253,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 46663c6..b4ee013 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -201,6 +201,7 @@ public class ParallelIndexSupervisorTaskTest
null,
null,
null,
+ null,
new HashedPartitionsSpec(null, 10, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
index aacb6b0..e663022 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
@@ -161,6 +161,7 @@ class ParallelIndexTestingFactory
null,
3,
4L,
+ null,
5L,
6,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
index cf862ff..8434e2f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
@@ -76,6 +76,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(100, 100L),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@@ -120,6 +121,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(100, 100L),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@@ -164,6 +166,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(100, 100L),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@@ -210,6 +213,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(100, 100L),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@@ -253,6 +257,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
+ null,
new HashedPartitionsSpec(null, 10, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@@ -296,6 +301,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
+ null,
new SingleDimensionPartitionsSpec(100, null, "dim", false),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
@@ -339,6 +345,7 @@ public class ParallelIndexTuningConfigTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(100, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index d27c19b..42c24ee 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -353,6 +353,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
null,
null,
null,
+ null,
1,
null,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index d48520a..8bb8e3e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -754,6 +754,7 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
indexSpec,
null,
3,
@@ -836,6 +837,7 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
indexSpec,
null,
3,
@@ -1263,6 +1265,7 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
indexSpec,
null,
null,
@@ -1372,6 +1375,7 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
indexSpec,
null,
3,
@@ -1479,6 +1483,7 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
null,
1000,
null,
+ null,
new Period("P1Y"),
null, //default window period of 10 minutes
null, // base persist dir ignored by Realtime Index task
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index d0b8107..6adf4c8 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -900,6 +900,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
null,
+ null,
null
)
{
diff --git a/integration-tests/docker/environment-configs/indexer
b/integration-tests/docker/environment-configs/indexer
index 906fe70..21d2519 100644
--- a/integration-tests/docker/environment-configs/indexer
+++ b/integration-tests/docker/environment-configs/indexer
@@ -21,7 +21,7 @@ DRUID_SERVICE=indexer
DRUID_LOG_PATH=/shared/logs/indexer.log
# JAVA OPTS
-SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx1g -Xms512m -XX:+UseG1GC
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008
# Druid configs
druid_host=druid-indexer
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
index 48fb592..e7b4525 100644
---
a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
+++
b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
@@ -68,6 +68,7 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
DEFAULT_APPENDABLE_INDEX,
DEFAULT_MAX_ROWS_IN_MEMORY,
0L,
+ DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK,
DEFAULT_INTERMEDIATE_PERSIST_PERIOD,
DEFAULT_WINDOW_PERIOD,
basePersistDirectory == null ? createNewBasePersistDirectory() :
basePersistDirectory,
@@ -91,6 +92,7 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
+ private final boolean skipBytesInMemoryOverheadCheck;
private final Period intermediatePersistPeriod;
private final Period windowPeriod;
private final File basePersistDirectory;
@@ -115,6 +117,7 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
@JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@@ -140,6 +143,8 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
// initializing this to 0, it will be lazily initialized to a value
// @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
+ this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck ==
null ?
+
DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK : skipBytesInMemoryOverheadCheck;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
: intermediatePersistPeriod;
@@ -191,6 +196,13 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
return maxBytesInMemory;
}
+ @JsonProperty
+ @Override
+ public boolean isSkipBytesInMemoryOverheadCheck()
+ {
+ return skipBytesInMemoryOverheadCheck;
+ }
+
@Override
@JsonProperty
public Period getIntermediatePersistPeriod()
@@ -318,6 +330,7 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
intermediatePersistPeriod,
windowPeriod,
basePersistDirectory,
@@ -345,6 +358,7 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
intermediatePersistPeriod,
windowPeriod,
dir,
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
index 760494e..156063e 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
@@ -39,6 +39,7 @@ public interface TuningConfig
int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
+ boolean DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK = false;
/**
* The incremental index implementation to use
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
index 137280e..9c59387 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
@@ -20,12 +20,14 @@
package org.apache.druid.segment.realtime;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
@@ -70,6 +72,26 @@ public class FireHydrant
return adapter.get().getId();
}
+ public int getSegmentNumDimensionColumns()
+ {
+ final Segment segment = adapter.get().getBaseSegment();
+ if (segment != null) {
+ final StorageAdapter storageAdapter = segment.asStorageAdapter();
+ return storageAdapter.getAvailableDimensions().size();
+ }
+ return 0;
+ }
+
+ public int getSegmentNumMetricColumns()
+ {
+ final Segment segment = adapter.get().getBaseSegment();
+ if (segment != null) {
+ final StorageAdapter storageAdapter = segment.asStorageAdapter();
+ return Iterables.size(storageAdapter.getAvailableMetrics());
+ }
+ return 0;
+ }
+
public Interval getSegmentDataInterval()
{
return adapter.get().getDataInterval();
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
index 685d6ec..fff3466 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
@@ -32,6 +32,8 @@ public interface AppenderatorConfig extends TuningConfig
int getMaxPendingPersists();
+ boolean isSkipBytesInMemoryOverheadCheck();
+
/**
* Maximum number of rows in a single segment before pushing to deep storage
*/
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 211a354..5d7d8e0 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -101,6 +101,15 @@ import java.util.stream.Collectors;
public class AppenderatorImpl implements Appenderator
{
+ // Rough estimate of memory footprint of a ColumnHolder based on actual heap
dumps
+ public static final int ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER = 1000;
+ public static final int ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER = 700;
+ public static final int ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER = 600;
+ // Rough estimate of memory footprint of empty Sink based on actual heap
dumps
+ public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
+ // Rough estimate of memory footprint of empty FireHydrant based on actual
heap dumps
+ public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
+
private static final EmittingLogger log = new
EmittingLogger(AppenderatorImpl.class);
private static final int WARN_DELAY = 1000;
private static final String IDENTIFIER_FILE_NAME = "identifier.json";
@@ -125,6 +134,7 @@ public class AppenderatorImpl implements Appenderator
private final Set<SegmentIdWithShardSpec> droppingSinks =
Sets.newConcurrentHashSet();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
private final long maxBytesTuningConfig;
+ private final boolean skipBytesInMemoryOverheadCheck;
private final QuerySegmentWalker texasRanger;
// This variable updated in add(), persist(), and drop()
@@ -199,6 +209,7 @@ public class AppenderatorImpl implements Appenderator
}
maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
+ skipBytesInMemoryOverheadCheck =
tuningConfig.isSkipBytesInMemoryOverheadCheck();
}
@Override
@@ -408,6 +419,7 @@ public class AppenderatorImpl implements Appenderator
maxBytesTuningConfig,
null
);
+ bytesCurrentlyInMemory.addAndGet(calculateSinkMemoryInUsed(retVal));
try {
segmentAnnouncer.announceSegment(retVal.getSegment());
@@ -501,7 +513,7 @@ public class AppenderatorImpl implements Appenderator
public ListenableFuture<Object> persistAll(@Nullable final Committer
committer)
{
throwPersistErrorIfExists();
-
+ long bytesInMemoryBeforePersist = bytesCurrentlyInMemory.get();
final Map<String, Integer> currentHydrants = new HashMap<>();
final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist =
new ArrayList<>();
int numPersistedRows = 0;
@@ -527,7 +539,13 @@ public class AppenderatorImpl implements Appenderator
}
if (sink.swappable()) {
+ // After swapping the sink, we use memory mapped segment instead.
However, the memory mapped segment still consumes memory.
+ // These memory mapped segments are held in memory throughout the
ingestion phase and permanently add to the bytesCurrentlyInMemory
+ int memoryStillInUse =
calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
+ bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
+
indexesToPersist.add(Pair.of(sink.swap(), identifier));
+
}
}
@@ -617,6 +635,36 @@ public class AppenderatorImpl implements Appenderator
// NB: The rows are still in memory until they're done persisting, but we
only count rows in active indexes.
rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
bytesCurrentlyInMemory.addAndGet(-bytesPersisted);
+
+ log.info("Persisted rows[%,d] and bytes[%,d]", numPersistedRows,
bytesPersisted);
+
+ // bytesCurrentlyInMemory can change while persisting due to concurrent
ingestion.
+ // Hence, we use bytesInMemoryBeforePersist to determine the change of
this persist
+ if (!skipBytesInMemoryOverheadCheck && bytesInMemoryBeforePersist -
bytesPersisted > maxBytesTuningConfig) {
+ // We are still over maxBytesTuningConfig even after persisting.
+ // This means that we ran out of all available memory to ingest (due to
overheads created as part of ingestion)
+ final String alertMessage = StringUtils.format(
+ "Task has exceeded safe estimated heap usage limits, failing "
+ + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])",
+ sinks.size(),
+ sinks.values().stream().mapToInt(Iterables::size).sum(),
+ getTotalRowCount()
+ );
+ final String errorMessage = StringUtils.format(
+ "%s.\nThis can occur when the overhead from too many intermediary
segment persists becomes too "
+ "great to have enough space to process additional input rows. This
check, along with metering the overhead "
+ + "of these objects to factor into the 'maxBytesInMemory'
computation, can be disabled by setting "
+ + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so
might allow the task to naturally encounter "
+ + "a 'java.lang.OutOfMemoryError'). Alternatively,
'maxBytesInMemory' can be increased which will cause an "
+ + "increase in heap footprint, but will allow for more intermediary
segment persists to occur before "
+ + "reaching this condition.",
+ alertMessage
+ );
+ log.makeAlert(alertMessage)
+ .addData("dataSource", schema.getDataSource())
+ .emit();
+ throw new RuntimeException(errorMessage);
+ }
return future;
}
@@ -1173,6 +1221,13 @@ public class AppenderatorImpl implements Appenderator
// i.e. those that haven't been persisted for *InMemory counters, or
pushed to deep storage for the total counter.
rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory());
+ bytesCurrentlyInMemory.addAndGet(-calculateSinkMemoryInUsed(sink));
+ for (FireHydrant hydrant : sink) {
+ // Decrement memory used by all memory-mapped hydrants
+ if (!hydrant.equals(sink.getCurrHydrant())) {
+
bytesCurrentlyInMemory.addAndGet(-calculateMMappedHydrantMemoryInUsed(hydrant));
+ }
+ }
totalRows.addAndGet(-sink.getNumRows());
}
@@ -1382,4 +1437,27 @@ public class AppenderatorImpl implements Appenderator
}
}
}
+
+ private int calculateMMappedHydrantMemoryInUsed(FireHydrant hydrant)
+ {
+ if (skipBytesInMemoryOverheadCheck) {
+ return 0;
+ }
+ // These calculations are approximated from actual heap dumps.
+ // Memory footprint includes count integer in FireHydrant, shorts in
ReferenceCountingSegment,
+ // Objects in SimpleQueryableIndex (such as SmooshedFileMapper, each
ColumnHolder in column map, etc.)
+ return Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT +
+ (hydrant.getSegmentNumDimensionColumns() *
ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
+ (hydrant.getSegmentNumMetricColumns() *
ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+ ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
+ }
+
+ private int calculateSinkMemoryInUsed(Sink sink)
+ {
+ if (skipBytesInMemoryOverheadCheck) {
+ return 0;
+ }
+ // Rough estimate of memory footprint of empty Sink based on actual heap
dumps
+ return ROUGH_OVERHEAD_PER_SINK;
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index d7609a3..00cef0d 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -413,6 +413,12 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
}
@Override
+ public boolean isSkipBytesInMemoryOverheadCheck()
+ {
+ return baseConfig.isSkipBytesInMemoryOverheadCheck();
+ }
+
+ @Override
public int getMaxPendingPersists()
{
return baseConfig.getMaxPendingPersists();
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
index 87df0e8..7cba86e 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
@@ -68,6 +68,7 @@ public class AppenderatorPlumberTest
null,
null,
null,
+ null,
new IntervalStartVersioningPolicy(),
new NoopRejectionPolicyFactory(),
null,
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
index 09ffb64..a6c74e6 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
@@ -154,9 +154,18 @@ public class AppenderatorTest extends
InitializedNullHandlingTest
}
@Test
- public void testMaxBytesInMemory() throws Exception
+ public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig()
throws Exception
{
- try (final AppenderatorTester tester = new AppenderatorTester(100, 1024,
true)) {
+ try (
+ final AppenderatorTester tester = new AppenderatorTester(
+ 100,
+ 1024,
+ null,
+ true,
+ new SimpleRowIngestionMeters(),
+ true
+ )
+ ) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -197,9 +206,18 @@ public class AppenderatorTest extends
InitializedNullHandlingTest
}
@Test
- public void testMaxBytesInMemoryInMultipleSinks() throws Exception
+ public void
testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig()
throws Exception
{
- try (final AppenderatorTester tester = new AppenderatorTester(100, 1024,
true)) {
+ try (
+ final AppenderatorTester tester = new AppenderatorTester(
+ 100,
+ 1024,
+ null,
+ true,
+ new SimpleRowIngestionMeters(),
+ true
+ )
+ ) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -237,6 +255,377 @@ public class AppenderatorTest extends
InitializedNullHandlingTest
}
@Test
+ public void testMaxBytesInMemory() throws Exception
+ {
+ try (final AppenderatorTester tester = new AppenderatorTester(100, 10000,
true)) {
+ final Appenderator appenderator = tester.getAppenderator();
+ final AtomicInteger eventCount = new AtomicInteger(0);
+ final Supplier<Committer> committerSupplier = () -> {
+ final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
+
+ return new Committer()
+ {
+ @Override
+ public Object getMetadata()
+ {
+ return metadata;
+ }
+
+ @Override
+ public void run()
+ {
+ //Do nothing
+ }
+ };
+ };
+
+ appenderator.startJob();
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1),
committerSupplier);
+ // Still under maxSizeInBytes after the add. Hence, we do not persist yet
+ //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is
enabled
+ int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
+ int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ int sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+ // currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+ );
+ Assert.assertEquals(
+ currentInMemoryIndexSize + sinkSizeOverhead,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+
+ // We do multiple more adds to the same sink to cause persist.
+ for (int i = 0; i < 26; i++) {
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1),
committerSupplier);
+ }
+ sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+ // currHydrant size is 0 since we just persist all indexes to disk.
+ currentInMemoryIndexSize = 0;
+ // We are now over maxSizeInBytes after the add. Hence, we do a persist.
+ // currHydrant in the sink has 0 bytesInMemory since we just did a
persist
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+ );
+ // Mapped index size is the memory still needed after we persisted
indexes. Note that the segments have
+ // 1 dimension column, 2 metric columns, 1 time column.
+ int mappedIndexSize = 1012 + (2 *
AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+
AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
+
AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
+ Assert.assertEquals(
+ currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+
+ // Add a single row after persisted
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1),
committerSupplier);
+ // currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
+ currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+ );
+ Assert.assertEquals(
+ currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+
+ // We do multiple more adds to the same sink to cause persist.
+ for (int i = 0; i < 5; i++) {
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1),
committerSupplier);
+ }
+ // currHydrant size is 0 since we just persist all indexes to disk.
+ currentInMemoryIndexSize = 0;
+ // We are now over maxSizeInBytes after the add. Hence, we do a persist.
+ // currHydrant in the sink has 0 bytesInMemory since we just did a
persist
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+ );
+ // Mapped index size is the memory still needed after we persisted
indexes. Note that the segments have
+ // 1 dimension columns, 2 metric column, 1 time column. However, we have
two indexes now from the two pervious
+ // persists.
+ mappedIndexSize = 2 * (1012 + (2 *
AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+
AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
+
AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
+ Assert.assertEquals(
+ currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+ appenderator.close();
+ Assert.assertEquals(0, ((AppenderatorImpl)
appenderator).getRowsInMemory());
+ Assert.assertEquals(0, ((AppenderatorImpl)
appenderator).getBytesCurrentlyInMemory());
+ }
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
+ {
+ try (final AppenderatorTester tester = new AppenderatorTester(100, 10,
true)) {
+ final Appenderator appenderator = tester.getAppenderator();
+ final AtomicInteger eventCount = new AtomicInteger(0);
+ final Supplier<Committer> committerSupplier = () -> {
+ final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
+
+ return new Committer()
+ {
+ @Override
+ public Object getMetadata()
+ {
+ return metadata;
+ }
+
+ @Override
+ public void run()
+ {
+ //Do nothing
+ }
+ };
+ };
+
+ appenderator.startJob();
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1),
committerSupplier);
+ }
+ }
+
+ @Test
+ public void
testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig()
throws Exception
+ {
+ try (
+ final AppenderatorTester tester = new AppenderatorTester(
+ 100,
+ 10,
+ null,
+ true,
+ new SimpleRowIngestionMeters(),
+ true
+ )
+ ) {
+ final Appenderator appenderator = tester.getAppenderator();
+ final AtomicInteger eventCount = new AtomicInteger(0);
+ final Supplier<Committer> committerSupplier = () -> {
+ final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
+
+ return new Committer()
+ {
+ @Override
+ public Object getMetadata()
+ {
+ return metadata;
+ }
+
+ @Override
+ public void run()
+ {
+ //Do nothing
+ }
+ };
+ };
+
+ appenderator.startJob();
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1),
committerSupplier);
+ // Expected 0 since we persisted after the add
+ Assert.assertEquals(
+ 0,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1),
committerSupplier);
+ // Expected 0 since we persisted after the add
+ Assert.assertEquals(
+ 0,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+ }
+ }
+
+ @Test
+ public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws
Exception
+ {
+ try (final AppenderatorTester tester = new AppenderatorTester(100, 10000,
true)) {
+ final Appenderator appenderator = tester.getAppenderator();
+ final AtomicInteger eventCount = new AtomicInteger(0);
+ final Supplier<Committer> committerSupplier = () -> {
+ final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
+
+ return new Committer()
+ {
+ @Override
+ public Object getMetadata()
+ {
+ return metadata;
+ }
+
+ @Override
+ public void run()
+ {
+ //Do nothing
+ }
+ };
+ };
+
+ appenderator.startJob();
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1),
committerSupplier);
+
+ // Still under maxSizeInBytes after the add. Hence, we do not persist yet
+ int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
+ int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ int sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+ Assert.assertEquals(
+ currentInMemoryIndexSize + sinkSizeOverhead,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+
+ // Close with row still in memory (no persist)
+ appenderator.close();
+
+ Assert.assertEquals(0, ((AppenderatorImpl)
appenderator).getRowsInMemory());
+ Assert.assertEquals(0, ((AppenderatorImpl)
appenderator).getBytesCurrentlyInMemory());
+ }
+ }
+
+ @Test
+ public void testMaxBytesInMemoryInMultipleSinks() throws Exception
+ {
+ try (final AppenderatorTester tester = new AppenderatorTester(100, 31100,
true)) {
+ final Appenderator appenderator = tester.getAppenderator();
+ final AtomicInteger eventCount = new AtomicInteger(0);
+ final Supplier<Committer> committerSupplier = () -> {
+ final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
+
+ return new Committer()
+ {
+ @Override
+ public Object getMetadata()
+ {
+ return metadata;
+ }
+
+ @Override
+ public void run()
+ {
+ //Do nothing
+ }
+ };
+ };
+
+ appenderator.startJob();
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1),
committerSupplier);
+ appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1),
committerSupplier);
+
+ // Still under maxSizeInBytes after the add. Hence, we do not persist yet
+ //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) +
56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is
enabled
+ int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
+ int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ int sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+ // currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+ );
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+ );
+ Assert.assertEquals(
+ (2 * currentInMemoryIndexSize) + sinkSizeOverhead,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+
+ // We do multiple more adds to the same sink to cause persist.
+ for (int i = 0; i < 49; i++) {
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1),
committerSupplier);
+ appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar_" + i, 1),
committerSupplier);
+ }
+ sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+ // currHydrant size is 0 since we just persist all indexes to disk.
+ currentInMemoryIndexSize = 0;
+ // We are now over maxSizeInBytes after the add. Hence, we do a persist.
+ // currHydrant in the sink has 0 bytesInMemory since we just did a
persist
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+ );
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+ );
+ // Mapped index size is the memory still needed after we persisted
indexes. Note that the segments have
+ // 1 dimension column, 2 metric columns, 1 time column.
+ int mappedIndexSize = 2 * (1012 + (2 *
AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+
AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
+
AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
+ Assert.assertEquals(
+ currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+
+ // Add a single row after persisted to sink 0
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1),
committerSupplier);
+ // currHydrant in the sink still has > 0 bytesInMemory since we do not
persist yet
+ currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+ );
+ Assert.assertEquals(
+ 0,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+ );
+ Assert.assertEquals(
+ currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+ // Now add a single row to sink 1
+ appenderator.add(IDENTIFIERS.get(1), ir("2000", "bob", 1),
committerSupplier);
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+ );
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+ );
+ Assert.assertEquals(
+ (2 * currentInMemoryIndexSize) + sinkSizeOverhead + mappedIndexSize,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+
+ // We do multiple more adds to both sinks to cause persist.
+ for (int i = 0; i < 34; i++) {
+ appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1),
committerSupplier);
+ appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar_" + i, 1),
committerSupplier);
+ }
+ // currHydrant size is 0 since we just persist all indexes to disk.
+ currentInMemoryIndexSize = 0;
+ // We are now over maxSizeInBytes after the add. Hence, we do a persist.
+ // currHydrant in the sink has 0 bytesInMemory since we just did a
persist
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+ );
+ Assert.assertEquals(
+ currentInMemoryIndexSize,
+ ((AppenderatorImpl)
appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+ );
+ // Mapped index size is the memory still needed after we persisted
indexes. Note that the segments have
+ // 1 dimension column, 2 metric columns, 1 time column. However, we have
two indexes now from the two previous
+ // persists.
+ mappedIndexSize = 2 * (2 * (1012 + (2 *
AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+
AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
+
AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER));
+ Assert.assertEquals(
+ currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
+ ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+ );
+ appenderator.close();
+ Assert.assertEquals(0, ((AppenderatorImpl)
appenderator).getRowsInMemory());
+ Assert.assertEquals(0, ((AppenderatorImpl)
appenderator).getBytesCurrentlyInMemory());
+ }
+ }
+
+ @Test
public void testIgnoreMaxBytesInMemory() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(100, -1,
true)) {
@@ -273,8 +662,9 @@ public class AppenderatorTest extends
InitializedNullHandlingTest
);
Assert.assertEquals(1, ((AppenderatorImpl)
appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1),
committerSupplier);
+ int sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals(
- 364 + 2 * nullHandlingOverhead,
+ (364 + 2 * nullHandlingOverhead) + sinkSizeOverhead,
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
);
Assert.assertEquals(2, ((AppenderatorImpl)
appenderator).getRowsInMemory());
@@ -486,7 +876,7 @@ public class AppenderatorTest extends
InitializedNullHandlingTest
public void testVerifyRowIngestionMetrics() throws Exception
{
final RowIngestionMeters rowIngestionMeters = new
SimpleRowIngestionMeters();
- try (final AppenderatorTester tester = new AppenderatorTester(5, 1000L,
null, false, rowIngestionMeters)) {
+ try (final AppenderatorTester tester = new AppenderatorTester(5, 10000L,
null, false, rowIngestionMeters)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"),
Committers.nilSupplier());
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
index c77d429..1231b77 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -124,7 +124,7 @@ public class AppenderatorTester implements AutoCloseable
final boolean enablePushFailure
)
{
- this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory,
enablePushFailure, new SimpleRowIngestionMeters());
+ this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory,
enablePushFailure, new SimpleRowIngestionMeters(), false);
}
public AppenderatorTester(
@@ -135,6 +135,18 @@ public class AppenderatorTester implements AutoCloseable
final RowIngestionMeters rowIngestionMeters
)
{
+ this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory,
enablePushFailure, rowIngestionMeters, false);
+ }
+
+ public AppenderatorTester(
+ final int maxRowsInMemory,
+ final long maxSizeInBytes,
+ final File basePersistDirectory,
+ final boolean enablePushFailure,
+ final RowIngestionMeters rowIngestionMeters,
+ final boolean skipBytesInMemoryOverheadCheck
+ )
+ {
objectMapper = new DefaultObjectMapper();
objectMapper.registerSubtypes(LinearShardSpec.class);
@@ -165,6 +177,7 @@ public class AppenderatorTester implements AutoCloseable
null,
maxRowsInMemory,
maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
+ skipBytesInMemoryOverheadCheck,
null,
null,
basePersistDirectory,
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
index 6e38c1c..491f4b2 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
@@ -139,6 +139,7 @@ public class DefaultOfflineAppenderatorFactoryTest
null,
null,
null,
+ null,
temporaryFolder.newFolder(),
null,
null,
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index 9b73cbe..deb80b3 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -205,6 +205,7 @@ public class RealtimePlumberSchoolTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
new IntervalStartVersioningPolicy(),
rejectionPolicy,
null,
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
index 477c3bd..9c79251 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
@@ -68,6 +68,7 @@ public class SinkTest extends InitializedNullHandlingTest
null,
100,
null,
+ null,
new Period("P1Y"),
null,
null,
@@ -224,6 +225,7 @@ public class SinkTest extends InitializedNullHandlingTest
null,
100,
null,
+ null,
new Period("P1Y"),
null,
null,
diff --git
a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java
b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java
index 8dec453..391d47f 100644
---
a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java
+++
b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java
@@ -159,6 +159,7 @@ public class DruidJsonValidatorTest
null,
1,
null,
+ null,
new Period("PT10M"),
null,
null,
diff --git a/website/.spelling b/website/.spelling
index 4184c67..6aa8025 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1797,6 +1797,7 @@ expr
jackson-jq
missingValue
schemaless
+skipBytesInMemoryOverheadCheck
spatialDimensions
useFieldDiscovery
- ../docs/tutorials/index.md
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]