bigbytest opened a new issue, #12945:
URL: https://github.com/apache/druid/issues/12945

   
   ### Affected Version
   
   Druid versions where the problem was encountered: 0.21.x, 0.22.x, 0.23.x
   
   ### Description
   
   When we query the variance of a field, the query fails with an unknown exception. The details are as follows:
   ```
   Error: Unknown exception
   
   java.util.concurrent.ExecutionException: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.druid.query.aggregation.variance.VarianceAggregatorCollector;
   
   java.lang.RuntimeException
   ```
   
   <img width="1146" alt="image" 
src="https://user-images.githubusercontent.com/51145107/186090716-150615f5-350c-4bc5-9658-ebd763bfeea4.png";>
   
   The following demonstrates how to reproduce the issue. First, we load some data into Apache Druid from a file using Druid's native batch ingestion feature. The ingestion spec looks like this:
   ```json
   {
       "type":"index_parallel",
       "spec":{
           "dataSchema":{
               "dataSource":"sample_data",
               "dimensionsSpec":{
                   "dimensions":[
                   ]
               },
               "timestampSpec":{
                   "column":"timestamp",
                   "format":"millis"
               },
               "metricsSpec":[
                   {
                       "type":"longSum",
                       "name":"totalCount",
                       "fieldName":"count"
                   },
                   {
                       "type":"doubleSum",
                       "name":"valueSum",
                       "fieldName":"value"
                   },
                   {
                       "name": "resp_time_var",
                       "fieldName": "value",
                       "type": "varianceFold",
                       "inputType": "long"
                   }
               ],
               "granularitySpec":{
                   "type":"uniform",
                   "segmentGranularity":"hour",
                   "queryGranularity":"minute",
                   "intervals":[
                       "2022-08-01/2022-08-23"
                   ],
                   "rollup":false
               }
           },
           "ioConfig":{
               "type":"index_parallel",
               "inputSource":{
                   "type":"local",
                   "baseDir":"/Users/junges/Downloads/",
                   "filter":"sample_data.json"
               },
               "inputFormat":{
                   "type":"json"
               },
               "appendToExisting":false
           },
           "tuningConfig":{
               "type":"index_parallel",
               "maxRowsPerSegment":1000000,
               "maxRowsInMemory":10000,
               "maxNumConcurrentSubTasks":1,
               "forceGuaranteedRollup":true,
               "partitionsSpec":{
                   "type":"single_dim",
                   "partitionDimension":"metric_key",
                   "maxRowsPerSegment":1000000
               }
           }
       }
   }
   ```
   The sample data looks like this:
   ```json
   
{"timestamp":1659286947472,"metric_key":"resp_time","state":"安徽省","city":"亳州","zipCode":"928061","country":"China","value":304,"count":38}
   ```
   When the data is ingested, the variance is accumulated into the `resp_time_var` metric.
   The Druid query statement is:
   ```json
   {
       "queryType": "groupBy",
       "dataSource": {
           "type": "table",
           "name": "sample_data"
       },
       "intervals": {
           "type": "LegacySegmentSpec",
           "intervals": [
               "2022-08-01T00:00:00.000Z/2022-08-10T00:00:00.000Z"
           ]
        },
       "granularity": {
           "type": "all"
       },
       "dimensions": [
           {
               "type": "default",
               "dimension": "country",
               "outputName": "country",
               "outputType": "STRING"
           }
       ],
       "aggregations": [
           {
               "type": "varianceFold",
               "name": "respTimeVar",
               "fieldName": "resp_time_var",
               "estimator": null,
               "inputType": "variance"
           }
       ]
   }
   ```
   
   ### Proposed solution
   The following exception was found by examining `history.log`:
   ```
   {"itvl":"2022-08-09T23:00:00.000Z/2022-08-10T00:00:00.000Z","ver":"2022-08-23T03:18:53.232Z","part":0}]},"virtualColumns":[],"filter":null,"granularity":{"type":"all"},"dimensions":[{"type":"default","dimension":"country","outputName":"country","outputType":"STRING"},{"type":"default","dimension":"state","outputName":"state","outputType":"STRING"}],"aggregations":[{"type":"longSum","name":"valueSum","fieldName":"valueSum","expression":null},{"type":"varianceFold","name":"respTimeVar","fieldName":"resp_time_var","estimator":null,"inputType":"variance"}],"postAggregations":[],"having":null,"limitSpec":{"type":"NoopLimitSpec"},"context":{"applyLimitPushDown":false,"defaultTimeout":300000,"finalize":false,"fudgeTimestamp":"1659312000000","groupByOutermost":false,"groupByStrategy":"v2","maxQueuedBytes":5242880,"maxScatterGatherBytes":9223372036854775807,"queryFailTime":1661233899391,"queryId":"957339fa-1d52-427e-ae1a-f50fd8ca55a0","resultAsArray":true,"timeout":299983},"descending":false}, peer=127.0.0.1}
   java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.druid.query.aggregation.variance.VarianceAggregatorCollector;
        at org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2.waitForFutureCompletion(GroupByMergingQueryRunnerV2.java:385) ~[druid-processing-0.23.0.jar:0.23.0]
   ...
   Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.druid.query.aggregation.variance.VarianceAggregatorCollector;
        at org.apache.druid.query.aggregation.variance.VarianceObjectVectorAggregator.aggregate(VarianceObjectVectorAggregator.java:70) ~[?:?]
        at org.apache.druid.query.aggregation.AggregatorAdapters.aggregateVector(AggregatorAdapters.java:200) ~[druid-processing-0.23.0.jar:0.23.0]
        at org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper.doAggregateVector(HashVectorGrouper.java:301) ~[druid-processing-0.23.0.jar:0.23.0]
        at org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper.aggregateVector(HashVectorGrouper.java:184) ~[druid-processing-0.23.0.jar:0.23.0]
        at org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine$VectorGroupByEngineIterator.initNewDelegate(VectorGroupByEngine.java:409) ~[druid-processing-0.23.0.jar:0.23.0]
        at org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine$VectorGroupByEngineIterator.hasNext(VectorGroupByEngine.java:316) ~[druid-processing-0.23.0.jar:0.23.0]
        at org.apache.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:43) ~[druid-core-0.23.0.jar:0.23.0]
   ```
   From the exception above, the problem can be traced to this code in [org.apache.druid.query.aggregation.variance.VarianceObjectVectorAggregator](https://github.com/apache/druid/blob/master/extensions-core/stats/src/main/java/org/apache/druid/query/aggregation/variance/VarianceObjectVectorAggregator.java):
   ```java
     @Override
     public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
     {
       // problem code
       VarianceAggregatorCollector[] vector = (VarianceAggregatorCollector[]) selector.getObjectVector();
       VarianceAggregatorCollector previous = VarianceBufferAggregator.getVarianceCollector(buf, position);
       for (int i = startRow; i < endRow; i++) {
         previous.fold(vector[i]);
       }
       VarianceBufferAggregator.writeNVariance(buf, position, previous.count, previous.sum, previous.nvariance);
     }

     @Override
     public void aggregate(
         ByteBuffer buf,
         int numRows,
         int[] positions,
         @Nullable int[] rows,
         int positionOffset
     )
     {
       // problem code
       VarianceAggregatorCollector[] vector = (VarianceAggregatorCollector[]) selector.getObjectVector();
       for (int i = 0; i < numRows; i++) {
         int position = positions[i] + positionOffset;
         int row = rows != null ? rows[i] : i;
         VarianceAggregatorCollector previous = VarianceBufferAggregator.getVarianceCollector(buf, position);
         previous.fold(vector[row]);
         VarianceBufferAggregator.writeNVariance(buf, position, previous.count, previous.sum, previous.nvariance);
       }
     }
   ```
   Reading the code confirmed that the ClassCastException is caused by the runtime type of the object array returned by the selector:
   
[org.apache.druid.segment.column.ComplexColumn](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java)
   ```java
     default VectorObjectSelector makeVectorObjectSelector(ReadableVectorOffset offset)
     {
       return new VectorObjectSelector()
       {
         final Object[] vector = new Object[offset.getMaxVectorSize()];

         private int id = ReadableVectorInspector.NULL_ID;

         @Override
         public Object[] getObjectVector()
         {
          ...
           return vector;
         }
        ...
       };
     }
   ```
   So I modified this code in [org.apache.druid.query.aggregation.variance.VarianceObjectVectorAggregator](https://github.com/apache/druid/blob/master/extensions-core/stats/src/main/java/org/apache/druid/query/aggregation/variance/VarianceObjectVectorAggregator.java), and it solved the problem:
   ```java
     @Override
     public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
     {
       VarianceAggregatorCollector current;
       VarianceAggregatorCollector previous = VarianceBufferAggregator.getVarianceCollector(buf, position);
       for (int i = startRow; i < endRow; i++) {
         // solution code
         current = (VarianceAggregatorCollector) selector.getObjectVector()[i];
         previous.fold(current);
       }
       VarianceBufferAggregator.writeNVariance(buf, position, previous.count, previous.sum, previous.nvariance);
     }

     @Override
     public void aggregate(
         ByteBuffer buf,
         int numRows,
         int[] positions,
         @Nullable int[] rows,
         int positionOffset
     )
     {
       VarianceAggregatorCollector current;
       for (int i = 0; i < numRows; i++) {
         int position = positions[i] + positionOffset;
         int row = rows != null ? rows[i] : i;
         VarianceAggregatorCollector previous = VarianceBufferAggregator.getVarianceCollector(buf, position);
         // solution code
         current = (VarianceAggregatorCollector) selector.getObjectVector()[row];
         previous.fold(current);
         VarianceBufferAggregator.writeNVariance(buf, position, previous.count, previous.sum, previous.nvariance);
       }
     }
   ```
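   The essence of the fix can be shown in a self-contained sketch: fetch the untyped `Object[]` from the selector and cast each element inside the loop instead of casting the whole array. The `Collector` class and `aggregate` helper below are mock stand-ins, not Druid's types:
   ```java
   import java.util.function.Supplier;

   // Mock stand-in for VarianceAggregatorCollector.
   class Collector {
       long count;
       void fold(Collector other) { count += other.count; }
   }

   public class FoldDemo {
       // Folds rows [startRow, endRow) from a selector returning Object[],
       // casting per element as in the fixed aggregate() above.
       static long aggregate(Supplier<Object[]> selector, int startRow, int endRow) {
           Collector previous = new Collector();
           Object[] vector = selector.get(); // runtime type is Object[]
           for (int i = startRow; i < endRow; i++) {
               previous.fold((Collector) vector[i]); // per-element cast, no ClassCastException
           }
           return previous.count;
       }

       public static void main(String[] args) {
           Object[] data = new Object[3];
           for (int i = 0; i < 3; i++) {
               Collector c = new Collector();
               c.count = i + 1;
               data[i] = c;
           }
           System.out.println(aggregate(() -> data, 0, 3)); // prints 6
       }
   }
   ```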
   <img width="868" alt="image" 
src="https://user-images.githubusercontent.com/51145107/186120346-823b035f-cf0c-4f7f-834f-36b163d45f26.png";>
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

