This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e92e8f5441a7 [SPARK-47744] Add support for negative-valued bytes in range encoder e92e8f5441a7 is described below commit e92e8f5441a702021e3cbcb282c172f6697f7118 Author: Neil Ramaswamy <neil.ramasw...@databricks.com> AuthorDate: Mon Apr 8 05:48:51 2024 +0900 [SPARK-47744] Add support for negative-valued bytes in range encoder ### What changes were proposed in this pull request? The RocksDBStateEncoder now encodes negative-valued bytes correctly. ### Why are the changes needed? Components that use the state encoder might want to use the full range of values of the Scala (signed) `Byte` type. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? An existing UT was modified to cover negative byte values. All existing UTs should pass. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45906 from neilramaswamy/spark-47744. Authored-by: Neil Ramaswamy <neil.ramasw...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/execution/streaming/state/RocksDBStateEncoder.scala | 10 ++++++++-- .../sql/execution/streaming/state/RocksDBStateStoreSuite.scala | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala index 06c3940af127..e9b910a76148 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala @@ -323,8 +323,14 @@ class RangeKeyScanStateEncoder( field.dataType match { case BooleanType => case ByteType => - bbuf.put(positiveValMarker) - bbuf.put(value.asInstanceOf[Byte]) + val byteVal = value.asInstanceOf[Byte] + val signCol = if (byteVal < 0) { + negativeValMarker + } else { 
positiveValMarker + } + bbuf.put(signCol) + bbuf.put(byteVal) writer.write(idx, bbuf.array()) case ShortType => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala index 1e5f664c980c..16a5935e04f4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala @@ -625,7 +625,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid val timerTimestamps: Seq[(Byte, Int)] = Seq((0x33, 10), (0x1A, 40), (0x1F, 1), (0x01, 68), (0x7F, 2000), (0x01, 27), (0x01, 394), (0x01, 5), (0x03, 980), (0x35, 2112), (0x11, -190), (0x1A, -69), (0x01, -344245), (0x31, -901), - (0x06, 90118), (0x09, 95118), (0x06, 87210)) + (-0x01, 90118), (-0x7F, 95118), (-0x80, 87210)) timerTimestamps.foreach { ts => // order by byte col first and then by int col val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org