This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-http.git


The following commit(s) were added to refs/heads/main by this push:
     new 0879c6e33 Create EventStreamParserCrlfSpec.scala‎ (#1038)
0879c6e33 is described below

commit 0879c6e332b474b70d694b6be4830b444d0a92ae
Author: PJ Fanning <[email protected]>
AuthorDate: Tue May 12 19:09:24 2026 +0100

    Create EventStreamParserCrlfSpec.scala‎ (#1038)
    
    * Create EventStreamParserCrlfSpec.scala‎
    
    * Update EventStreamParserCrlfSpec.scala‎
---
 .../EventStreamParserCrlfSpec.scala\342\200\216"   | 239 +++++++++++++++++++++
 1 file changed, 239 insertions(+)

diff --git "a/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/EventStreamParserCrlfSpec.scala\342\200\216" "b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/EventStreamParserCrlfSpec.scala\342\200\216"
new file mode 100644
index 000000000..f9e91ba59
--- /dev/null
+++ "b/http-tests/src/test/scala/org/apache/pekko/http/scaladsl/unmarshalling/sse/EventStreamParserCrlfSpec.scala\342\200\216"
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.http
+package scaladsl
+package unmarshalling
+package sse
+
+import org.apache.pekko
+import pekko.http.scaladsl.model.sse.ServerSentEvent
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.util.ByteString
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AsyncWordSpec
+
+/**
+ * Tests for CRLF, CR-only, mixed and chunk-split line-ending handling via the public [[EventStreamParser]] API.
+ * Covers issue https://github.com/apache/pekko-http/issues/797.
+ */
+final class EventStreamParserCrlfSpec extends AsyncWordSpec with Matchers with BaseUnmarshallingSpec {
+
+  private val maxLineSize = 1048576 // 1024 * 1024
+  private val maxEventSize = 1048576 // 1024 * 1024
+
+  "EventStreamParser" when {
+
+    "receiving a stream with CRLF line endings" should {
+
+      "parse a single event with CRLF-terminated data line" in {
+        val input = ByteString("data: hello\r\n\r\n")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(ServerSentEvent("hello")))
+      }
+
+      "parse multiple events all using CRLF line endings" in {
+        val input = ByteString("data: event1\r\n\r\ndata: event2\r\n\r\ndata: event3\r\n\r\n")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(
+            ServerSentEvent("event1"),
+            ServerSentEvent("event2"),
+            ServerSentEvent("event3")))
+      }
+
+      "parse all SSE field types with CRLF line endings" in {
+        val input = ByteString(
+          "data: the data\r\n" +
+          "event: my-event\r\n" +
+          "id: 99\r\n" +
+          "retry: 3000\r\n" +
+          "\r\n")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(
+            ServerSentEvent("the data", Some("my-event"), Some("99"), Some(3000))))
+      }
+
+      "parse multi-line data fields with CRLF line endings" in {
+        val input = ByteString(
+          "data: line1\r\n" +
+          "data: line2\r\n" +
+          "data: line3\r\n" +
+          "\r\n")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(ServerSentEvent("line1\nline2\nline3")))
+      }
+
+      "ignore comment lines with CRLF endings" in {
+        val input = ByteString(
+          "data: event1\r\n" +
+          ":this is a comment\r\n" +
+          "\r\n")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(ServerSentEvent("event1")))
+      }
+
+      "not emit events with no data field when emitEmptyEvents is false" in {
+        val input = ByteString(
+          "data: real\r\n" +
+          "\r\n" +
+          "\r\n" +
+          "data: also real\r\n" +
+          "\r\n")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize, emitEmptyEvents = false))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(
+            ServerSentEvent("real"),
+            ServerSentEvent("also real")))
+      }
+
+      "emit empty events (heartbeats) when emitEmptyEvents is true with CRLF" in {
+        // A heartbeat is a data field with an empty value (i.e. "data: " or "data:"),
+        // not merely a blank separator line.
+        val input = ByteString(
+          "data: before\r\n" +
+          "\r\n" +
+          "data: \r\n" +
+          "\r\n" +
+          "data: after\r\n" +
+          "\r\n")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize, emitEmptyEvents = true))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(
+            ServerSentEvent("before"),
+            ServerSentEvent.heartbeat,
+            ServerSentEvent("after")))
+      }
+    }
+
+    "receiving a stream with CR-only (\\r) line endings" should {
+
+      "parse a single event with CR-only line endings" in {
+        val input = ByteString("data: hello\r\r")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(ServerSentEvent("hello")))
+      }
+
+      "parse multiple events with CR-only line endings" in {
+        val input = ByteString("data: event1\r\rdata: event2\r\r")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(
+            ServerSentEvent("event1"),
+            ServerSentEvent("event2")))
+      }
+
+      "parse all SSE field types with CR-only line endings" in {
+        val input = ByteString("data: the data\revent: my-event\rid: 42\rretry: 1000\r\r")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(
+            ServerSentEvent("the data", Some("my-event"), Some("42"), Some(1000))))
+      }
+    }
+
+    "receiving a stream with mixed line endings" should {
+
+      "parse events correctly when line endings vary within the stream" in {
+        // Mix of LF-only, CR-only, and CRLF
+        val input = ByteString(
+          "data: lf-event\n" +
+          "\n" +
+          "data: cr-event\r" +
+          "\r" +
+          "data: crlf-event\r\n" +
+          "\r\n")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(
+            ServerSentEvent("lf-event"),
+            ServerSentEvent("cr-event"),
+            ServerSentEvent("crlf-event")))
+      }
+
+      "parse a single event whose fields use different line endings" in {
+        // Field lines alternate between CRLF and LF terminators; the event is terminated by a lone LF
+        val input = ByteString(
+          "data: the data\r\n" +
+          "event: my-event\n" +
+          "id: 7\r\n" +
+          "\n")
+        Source.single(input)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(
+            ServerSentEvent("the data", Some("my-event"), Some("7"))))
+      }
+    }
+
+    "receiving a CRLF stream delivered in multiple small chunks" should {
+
+      "parse events correctly when CRLF is split across chunk boundaries" in {
+        // The \r and \n of the CRLF pair for event1 arrive in separate ByteString chunks
+        val chunks = Vector(
+          ByteString("data: event1\r"),     // ends with \r
+          ByteString("\n\r\n"),             // \n completes the CRLF; \r\n is the event separator
+          ByteString("data: event2\r\n\r\n"))
+        Source(chunks)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(
+            ServerSentEvent("event1"),
+            ServerSentEvent("event2")))
+      }
+
+      "parse events correctly when data arrives byte by byte with CRLF" in {
+        val bytes = ByteString("data: hello\r\n\r\n")
+        Source(bytes.map(ByteString(_)))
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(ServerSentEvent("hello")))
+      }
+
+      "reassemble a multi-event CRLF stream delivered in arbitrary chunks" in {
+        val fullStream =
+          "data: first\r\nevent: alpha\r\nid: 1\r\n\r\n" +
+          "data: second\r\nid: 2\r\n\r\n"
+        // split into 5-byte chunks; grouped on a ByteString already yields ByteString slices
+        val chunks = ByteString(fullStream).grouped(5).toVector
+        Source(chunks)
+          .via(EventStreamParser(maxLineSize, maxEventSize))
+          .runWith(Sink.seq)
+          .map(_ shouldBe Vector(
+            ServerSentEvent("first", Some("alpha"), Some("1")),
+            ServerSentEvent("second", None, Some("2"))))
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to