This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e92e8f5441a7 [SPARK-47744] Add support for negative-valued bytes in 
range encoder
e92e8f5441a7 is described below

commit e92e8f5441a702021e3cbcb282c172f6697f7118
Author: Neil Ramaswamy <neil.ramasw...@databricks.com>
AuthorDate: Mon Apr 8 05:48:51 2024 +0900

    [SPARK-47744] Add support for negative-valued bytes in range encoder
    
    ### What changes were proposed in this pull request?
    
    The RocksDBStateEncoder now encodes negative-valued bytes correctly.
    
    ### Why are the changes needed?
    
    Components that use the state encoder might want to use the full range of 
values of the Scala (signed) `Byte` type.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UT was modified. All existing UTs should pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45906 from neilramaswamy/spark-47744.
    
    Authored-by: Neil Ramaswamy <neil.ramasw...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDBStateEncoder.scala    | 10 ++++++++--
 .../sql/execution/streaming/state/RocksDBStateStoreSuite.scala |  2 +-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
index 06c3940af127..e9b910a76148 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
@@ -323,8 +323,14 @@ class RangeKeyScanStateEncoder(
         field.dataType match {
           case BooleanType =>
           case ByteType =>
-            bbuf.put(positiveValMarker)
-            bbuf.put(value.asInstanceOf[Byte])
+            val byteVal = value.asInstanceOf[Byte]
+            val signCol = if (byteVal < 0) {
+              negativeValMarker
+            } else {
+              positiveValMarker
+            }
+            bbuf.put(signCol)
+            bbuf.put(byteVal)
             writer.write(idx, bbuf.array())
 
           case ShortType =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 1e5f664c980c..16a5935e04f4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -625,7 +625,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
       val timerTimestamps: Seq[(Byte, Int)] = Seq((0x33, 10), (0x1A, 40), 
(0x1F, 1), (0x01, 68),
         (0x7F, 2000), (0x01, 27), (0x01, 394), (0x01, 5), (0x03, 980), (0x35, 
2112),
         (0x11, -190), (0x1A, -69), (0x01, -344245), (0x31, -901),
-        (0x06, 90118), (0x09, 95118), (0x06, 87210))
+        (-0x01, 90118), (-0x7F, 95118), (-0x80, 87210))
       timerTimestamps.foreach { ts =>
         // order by byte col first and then by int col
         val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, 
ts._2,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to