jingz-db commented on code in PR #48000:
URL: https://github.com/apache/spark/pull/48000#discussion_r1746141061


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala:
##########
@@ -82,6 +86,112 @@ object SchemaUtil {
     row
   }
 
+  /**
+   * For map state variables, state rows are stored as composite key.
+   * To return grouping key -> Map{user key -> value} as one state reader row to
+   * the users, we need to perform grouping on state rows by their grouping key,
+   * and construct a map for that grouping key.
+   *
+   * We traverse the iterator returned from state store,
+   * and will return a row for `next()` only if the grouping key in the next row
+   * from state store is different (or there are no more rows).
+   *
+   * Note that as state rows in RocksDB are ordered, all state rows with the
+   * same grouping key will appear consecutively during the iterator traversal.
+   */
+  def unifyMapStateRowPair(
+      stateRows: Iterator[UnsafeRowPair],
+      compositeKeySchema: StructType,
+      partitionId: Int): Iterator[InternalRow] = {
+    val groupingKeySchema = SchemaUtil.getSchemaAsDataType(
+      compositeKeySchema, "key"
+    ).asInstanceOf[StructType]
+    val userKeySchema = SchemaUtil.getSchemaAsDataType(
+      compositeKeySchema, "userKey"
+    ).asInstanceOf[StructType]
+
+    def appendKVPairToMap(
+        curMap: mutable.Map[Any, Any],
+        stateRowPair: UnsafeRowPair): Unit = {
+      curMap += (
+        stateRowPair.key.get(1, userKeySchema)
+          .asInstanceOf[UnsafeRow].copy() ->
+          stateRowPair.value.copy()
+        )
+    }
+
+    def updateDataRow(
+        groupingKey: Any,
+        curMap: mutable.Map[Any, Any]): GenericInternalRow = {
+      val row = new GenericInternalRow(3)
+      val mapData = new ArrayBasedMapData(
+        ArrayData.toArrayData(curMap.keys.toArray),

Review Comment:
   Yeah, the mapping is based on index. The constructor will do something like this:
   ```
     def apply(...): ArrayBasedMapData = {
         ...
         while (iterator.hasNext) {
           val entry = iterator.next()
           keys(i) = keyConverter(entry.getKey)
           values(i) = valueConverter(entry.getValue)
           i += 1
         }
       ...
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to