bigbytest opened a new issue, #12945: URL: https://github.com/apache/druid/issues/12945
### Affected Version

The Druid versions where the problem was encountered: 0.21.x, 0.22.x, 0.23.x

### Description

When we query the variance of a field, the query fails with an unknown exception. The specific information is as follows:

```
Error: Unknown exception
java.util.concurrent.ExecutionException: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.druid.query.aggregation.variance.VarianceAggregatorCollector;
java.lang.RuntimeException
```

<img width="1146" alt="image" src="https://user-images.githubusercontent.com/51145107/186090716-150615f5-350c-4bc5-9658-ebd763bfeea4.png">

The following demonstrates how to reproduce the issue. First, we load some data into Apache Druid from a file using Druid's native batch ingestion feature. The ingestion spec looks like this:

```json
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "sample_data",
      "dimensionsSpec": {
        "dimensions": []
      },
      "timestampSpec": {
        "column": "timestamp",
        "format": "millis"
      },
      "metricsSpec": [
        {
          "type": "longSum",
          "name": "totalCount",
          "fieldName": "count"
        },
        {
          "type": "doubleSum",
          "name": "valueSum",
          "fieldName": "value"
        },
        {
          "name": "resp_time_var",
          "fieldName": "value",
          "type": "varianceFold",
          "inputType": "long"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "hour",
        "queryGranularity": "minute",
        "intervals": ["2022-08-01/2022-08-23"],
        "rollup": false
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "baseDir": "/Users/junges/Downloads/",
        "filter": "sample_data.json"
      },
      "inputFormat": {
        "type": "json"
      },
      "appendToExisting": false
    },
    "tuningConfig": {
      "type": "index_parallel",
      "maxRowsPerSegment": 1000000,
      "maxRowsInMemory": 10000,
      "maxNumConcurrentSubTasks": 1,
      "forceGuaranteedRollup": true,
      "partitionsSpec": {
        "type": "single_dim",
        "partitionDimension": "metric_key",
        "maxRowsPerSegment": 1000000
      }
    }
  }
}
```

The sample data looks like this:

```json
{"timestamp":1659286947472,"metric_key":"resp_time","state":"安徽省","city":"亳州","zipCode":"928061","country":"China","value":304,"count":38}
```

Once the data is ingested, a variance calculation is performed on the `resp_time_var` field with the following Druid query:

```json
{
  "queryType": "groupBy",
  "dataSource": {
    "type": "table",
    "name": "sample_data"
  },
  "intervals": {
    "type": "LegacySegmentSpec",
    "intervals": ["2022-08-01T00:00:00.000Z/2022-08-10T00:00:00.000Z"]
  },
  "granularity": {
    "type": "all"
  },
  "dimensions": [
    {
      "type": "default",
      "dimension": "country",
      "outputName": "country",
      "outputType": "STRING"
    }
  ],
  "aggregations": [
    {
      "type": "varianceFold",
      "name": "respTimeVar",
      "fieldName": "resp_time_var",
      "estimator": null,
      "inputType": "variance"
    }
  ]
}
```

### Proposed solution

The following stack trace was found in the Historical log (`history.log`):

```java
{"itvl":"2022-08-09T23:00:00.000Z/2022-08-10T00:00:00.000Z","ver":"2022-08-23T03:18:53.232Z","part":0}]},"virtualColumns":[],"filter":null,"granularity":{"type":"all"},"dimensions":[{"type":"default","dimension":"country","outputName":"country","outputType":"STRING"},{"type":"default","dimension":"state","outputName":"state","outputType":"STRING"}],"aggregations":[{"type":"longSum","name":"valueSum","fieldName":"valueSum","expression":null},{"type":"varianceFold","name":"respTimeVar","fieldName":"resp_time_var","estimator":null,"inputType":"variance"}],"postAggregations":[],"having":null,"limitSpec":{"type":"NoopLimitSpec"},"context":{"applyLimitPushDown":false,"defaultTimeout":300000,"finalize":false,"fudgeTimestamp":"1659312000000","groupByOutermost":false,"groupByStrategy":"v2","maxQueuedBytes":5242880,"maxScatterGatherBytes":9223372036854775807,"queryFailTime":1661233899391,"queryId":"957339fa-1d52-427e-ae1a-f50fd8ca55a0","resultAsArray":true,"timeout":299983},"descending":false}, peer=127.0.0.1}
java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.druid.query.aggregation.variance.VarianceAggregatorCollector;
	at org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2.waitForFutureCompletion(GroupByMergingQueryRunnerV2.java:385) ~[druid-processing-0.23.0.jar:0.23.0]
	...
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.druid.query.aggregation.variance.VarianceAggregatorCollector;
	at org.apache.druid.query.aggregation.variance.VarianceObjectVectorAggregator.aggregate(VarianceObjectVectorAggregator.java:70) ~[?:?]
	at org.apache.druid.query.aggregation.AggregatorAdapters.aggregateVector(AggregatorAdapters.java:200) ~[druid-processing-0.23.0.jar:0.23.0]
	at org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper.doAggregateVector(HashVectorGrouper.java:301) ~[druid-processing-0.23.0.jar:0.23.0]
	at org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper.aggregateVector(HashVectorGrouper.java:184) ~[druid-processing-0.23.0.jar:0.23.0]
	at org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine$VectorGroupByEngineIterator.initNewDelegate(VectorGroupByEngine.java:409) ~[druid-processing-0.23.0.jar:0.23.0]
	at org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine$VectorGroupByEngineIterator.hasNext(VectorGroupByEngine.java:316) ~[druid-processing-0.23.0.jar:0.23.0]
	at org.apache.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:43) ~[druid-core-0.23.0.jar:0.23.0]
```

From this exception information, the failure can be traced to the following code in [org.apache.druid.query.aggregation.variance.VarianceObjectVectorAggregator](https://github.com/apache/druid/blob/master/extensions-core/stats/src/main/java/org/apache/druid/query/aggregation/variance/VarianceObjectVectorAggregator.java):

```java
@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
  // problem code
  VarianceAggregatorCollector[] vector = (VarianceAggregatorCollector[]) selector.getObjectVector();
  VarianceAggregatorCollector previous = VarianceBufferAggregator.getVarianceCollector(buf, position);
  for (int i = startRow; i < endRow; i++) {
    previous.fold(vector[i]);
  }
  VarianceBufferAggregator.writeNVariance(buf, position, previous.count, previous.sum, previous.nvariance);
}

@Override
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
{
  // problem code
  VarianceAggregatorCollector[] vector = (VarianceAggregatorCollector[]) selector.getObjectVector();
  for (int i = 0; i < numRows; i++) {
    int position = positions[i] + positionOffset;
    int row = rows != null ? rows[i] : i;
    VarianceAggregatorCollector previous = VarianceBufferAggregator.getVarianceCollector(buf, position);
    previous.fold(vector[row]);
    VarianceBufferAggregator.writeNVariance(buf, position, previous.count, previous.sum, previous.nvariance);
  }
}
```

Reading further, I confirmed that the cast fails because the selector returns a plain `Object[]`, allocated in [org.apache.druid.segment.column.ComplexColumn](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java):

```java
default VectorObjectSelector makeVectorObjectSelector(ReadableVectorOffset offset)
{
  return new VectorObjectSelector()
  {
    final Object[] vector = new Object[offset.getMaxVectorSize()];
    private int id = ReadableVectorInspector.NULL_ID;

    @Override
    public Object[] getObjectVector()
    {
      ...
      return vector;
    }
    ...
  };
}
```

So I modified the code in [org.apache.druid.query.aggregation.variance.VarianceObjectVectorAggregator](https://github.com/apache/druid/blob/master/extensions-core/stats/src/main/java/org/apache/druid/query/aggregation/variance/VarianceObjectVectorAggregator.java) to cast each element instead of the whole array, and it solved the problem:

```java
@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
  VarianceAggregatorCollector current;
  VarianceAggregatorCollector previous = VarianceBufferAggregator.getVarianceCollector(buf, position);
  for (int i = startRow; i < endRow; i++) {
    // solution code
    current = (VarianceAggregatorCollector) selector.getObjectVector()[i];
    previous.fold(current);
  }
  VarianceBufferAggregator.writeNVariance(buf, position, previous.count, previous.sum, previous.nvariance);
}

@Override
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
{
  VarianceAggregatorCollector current;
  for (int i = 0; i < numRows; i++) {
    int position = positions[i] + positionOffset;
    int row = rows != null ? rows[i] : i;
    VarianceAggregatorCollector previous = VarianceBufferAggregator.getVarianceCollector(buf, position);
    // solution code
    current = (VarianceAggregatorCollector) selector.getObjectVector()[row];
    previous.fold(current);
    VarianceBufferAggregator.writeNVariance(buf, position, previous.count, previous.sum, previous.nvariance);
  }
}
```

<img width="868" alt="image" src="https://user-images.githubusercontent.com/51145107/186120346-823b035f-cf0c-4f7f-834f-36b163d45f26.png">
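For anyone wondering why the element-wise cast works while the whole-array cast throws: in Java, an array's runtime type is fixed when it is allocated. An array created as `new Object[n]` has runtime type `[Ljava.lang.Object;` forever, so casting the array reference to a more specific array type fails even when every element inside it happens to be an instance of that type, whereas casting each element individually succeeds. A minimal standalone sketch of this behavior (the `Collector` class here is illustrative, not Druid's):

```java
// Demonstrates why VarianceObjectVectorAggregator's array cast fails.
// The array was allocated as Object[] (as in ComplexColumn's vector),
// so its runtime type is [Ljava.lang.Object; regardless of its contents.
public class ArrayCastDemo
{
  // Stand-in for VarianceAggregatorCollector; any class works.
  static class Collector {}

  public static void main(String[] args)
  {
    Object[] vector = new Object[]{new Collector(), new Collector()};

    // Per-element cast: fine, because each element really is a Collector.
    Collector first = (Collector) vector[0];
    System.out.println("element cast ok: " + (first != null));

    // Whole-array cast: throws ClassCastException, because the array's
    // runtime type is Object[], not Collector[].
    try {
      Collector[] typed = (Collector[]) vector;
      System.out.println("array cast ok: " + typed.length);
    } catch (ClassCastException e) {
      System.out.println("array cast failed: " + e.getMessage());
    }
  }
}
```

Note that the reverse direction is safe: an array allocated as `new Collector[n]` can be passed around as `Object[]` and cast back. The fix above sidesteps the whole question by never casting the array at all.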
