jingz-db commented on code in PR #48000: URL: https://github.com/apache/spark/pull/48000#discussion_r1746141061
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala: ########## @@ -82,6 +86,112 @@ object SchemaUtil { row } + /** + * For map state variables, state rows are stored as composite key. + * To return grouping key -> Map{user key -> value} as one state reader row to + * the users, we need to perform grouping on state rows by their grouping key, + * and construct a map for that grouping key. + * + * We traverse the iterator returned from state store, + * and will only return a row for `next()` only if the grouping key in the next row + * from state store is different (or there are no more rows) + * + * Note that as state rows in RocksDB are ordered, all state rows with the + * same grouping key will appear consecutively during the iterator traversal. + */ + def unifyMapStateRowPair( + stateRows: Iterator[UnsafeRowPair], + compositeKeySchema: StructType, + partitionId: Int): Iterator[InternalRow] = { + val groupingKeySchema = SchemaUtil.getSchemaAsDataType( + compositeKeySchema, "key" + ).asInstanceOf[StructType] + val userKeySchema = SchemaUtil.getSchemaAsDataType( + compositeKeySchema, "userKey" + ).asInstanceOf[StructType] + + def appendKVPairToMap( + curMap: mutable.Map[Any, Any], + stateRowPair: UnsafeRowPair): Unit = { + curMap += ( + stateRowPair.key.get(1, userKeySchema) + .asInstanceOf[UnsafeRow].copy() -> + stateRowPair.value.copy() + ) + } + + def updateDataRow( + groupingKey: Any, + curMap: mutable.Map[Any, Any]): GenericInternalRow = { + val row = new GenericInternalRow(3) + val mapData = new ArrayBasedMapData( + ArrayData.toArrayData(curMap.keys.toArray), Review Comment: Yes, keys and values are paired by index: the key at position i maps to the value at position i. The `ArrayBasedMapData.apply` factory fills both arrays from the same entry in a single pass, roughly like this: ``` def apply(...): ArrayBasedMapData = { ... while (iterator.hasNext) { val entry = iterator.next() keys(i) = keyConverter(entry.getKey) values(i) = valueConverter(entry.getValue) i += 1 } ... 
} ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org