This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch migrate/xml-parse-with-context-2935 in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
commit bc127ff1289a9c76a84b5f90663a32877cf4cf5c Author: 虎鸣 <[email protected]> AuthorDate: Wed Mar 11 00:46:27 2026 +0800 feat(xml): Add context-aware XML parsing (parserWithContext API) Migrated from upstream akka/alpakka commit b813e7fb (alpakka#2935), which is now Apache licensed. This adds a new parserWithContext[Ctx]() API to the XML module that attaches a context value to each emitted XML parse event. The context flows through the parsing pipeline via FlowWithContext: each parse event is emitted together with the context of the most recently pushed input ByteString, which enables use cases such as tracking line numbers, source positions, or any other metadata alongside parsed XML events. Changes: - StreamingXmlParser: Generalized to support context via a ContextHandler type class (uncontextual for backward compatibility, contextual for the new API) - scaladsl.XmlParsing: Added parserWithContext[Ctx]() method - javadsl.XmlParsing: Added parserWithContext[Ctx]() overloads (3 variants) - XmlProcessingSpec: Added test for context-aware parsing with line numbers - .gitmodules / .upstream-alpakka: Added akka/alpakka as a git submodule at .upstream-alpakka All existing parser APIs remain backward compatible. 
Upstream: https://github.com/akka/alpakka/commit/b813e7fb772153edf14f7e5997abad72f5acef07 Co-authored-by: Copilot <[email protected]> --- .gitmodules | 3 + .upstream-alpakka | 1 + .../connectors/xml/impl/StreamingXmlParser.scala | 70 +++++++++++++++++----- .../stream/connectors/xml/javadsl/XmlParsing.scala | 31 ++++++++++ .../connectors/xml/scaladsl/XmlParsing.scala | 25 +++++++- .../scala/docs/scaladsl/XmlProcessingSpec.scala | 38 +++++++++++- 6 files changed, 149 insertions(+), 19 deletions(-) diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000..d8999e6c9 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule ".upstream-alpakka"] + path = .upstream-alpakka + url = https://github.com/akka/alpakka.git diff --git a/.upstream-alpakka b/.upstream-alpakka new file mode 160000 index 000000000..b0becb5d3 --- /dev/null +++ b/.upstream-alpakka @@ -0,0 +1 @@ +Subproject commit b0becb5d3ec28aa06476a32c961e764fd488ac3b diff --git a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala index 05f12cf1f..94e5e84ee 100644 --- a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala +++ b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala @@ -28,21 +28,53 @@ import scala.annotation.tailrec private[xml] object StreamingXmlParser { lazy val withStreamingFinishedException = new IllegalStateException("Stream finished before event was fully parsed.") + + /** + * Type class that handles extracting and re-attaching context information + * during XML parsing. This enables context-aware parsing where each parse + * event can carry additional context from the input side. 
+ */ + sealed trait ContextHandler[A, B, Ctx] { + def getByteString(a: A): ByteString + def getContext(a: A): Ctx + def buildOutput(pe: ParseEvent, ctx: Ctx): B + } + + object ContextHandler { + + /** Handler for standard (context-free) parsing: ByteString in, ParseEvent out. */ + final val uncontextual: ContextHandler[ByteString, ParseEvent, Unit] = + new ContextHandler[ByteString, ParseEvent, Unit] { + def getByteString(a: ByteString): ByteString = a + def getContext(a: ByteString): Unit = () + def buildOutput(pe: ParseEvent, ctx: Unit): ParseEvent = pe + } + + /** Handler for context-aware parsing: (ByteString, Ctx) in, (ParseEvent, Ctx) out. */ + final def contextual[Ctx]: ContextHandler[(ByteString, Ctx), (ParseEvent, Ctx), Ctx] = + new ContextHandler[(ByteString, Ctx), (ParseEvent, Ctx), Ctx] { + def getByteString(a: (ByteString, Ctx)): ByteString = a._1 + def getContext(a: (ByteString, Ctx)): Ctx = a._2 + def buildOutput(pe: ParseEvent, ctx: Ctx): (ParseEvent, Ctx) = (pe, ctx) + } + } } /** * INTERNAL API */ -@InternalApi private[xml] class StreamingXmlParser(ignoreInvalidChars: Boolean, - configureFactory: AsyncXMLInputFactory => Unit) - extends GraphStage[FlowShape[ByteString, ParseEvent]] { - val in: Inlet[ByteString] = Inlet("XMLParser.in") - val out: Outlet[ParseEvent] = Outlet("XMLParser.out") - override val shape: FlowShape[ByteString, ParseEvent] = FlowShape(in, out) +@InternalApi private[xml] class StreamingXmlParser[A, B, Ctx](ignoreInvalidChars: Boolean, + configureFactory: AsyncXMLInputFactory => Unit, + transform: StreamingXmlParser.ContextHandler[A, B, Ctx]) + extends GraphStage[FlowShape[A, B]] { + val in: Inlet[A] = Inlet("XMLParser.in") + val out: Outlet[B] = Outlet("XMLParser.out") + override val shape: FlowShape[A, B] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { private var started: Boolean = false + private var context: Ctx = _ 
import javax.xml.stream.XMLStreamConstants @@ -56,7 +88,10 @@ private[xml] object StreamingXmlParser { setHandlers(in, out, this) override def onPush(): Unit = { - val array = grab(in).toArray + val a = grab(in) + val bs = transform.getByteString(a) + context = transform.getContext(a) + val array = bs.toArray parser.getInputFeeder.feedInput(array, 0, array.length) advanceParser() } @@ -78,10 +113,10 @@ private[xml] object StreamingXmlParser { case XMLStreamConstants.START_DOCUMENT => started = true - push(out, StartDocument) + push(out, transform.buildOutput(StartDocument, context)) case XMLStreamConstants.END_DOCUMENT => - push(out, EndDocument) + push(out, transform.buildOutput(EndDocument, context)) completeStage() case XMLStreamConstants.START_ELEMENT => @@ -102,26 +137,29 @@ private[xml] object StreamingXmlParser { val optNs = optPrefix.flatMap(prefix => Option(parser.getNamespaceURI(prefix))) push( out, - StartElement(parser.getLocalName, + transform.buildOutput(StartElement(parser.getLocalName, attributes, optPrefix.filterNot(_ == ""), optNs.filterNot(_ == ""), - namespaceCtx = namespaces)) + namespaceCtx = namespaces), + context)) case XMLStreamConstants.END_ELEMENT => - push(out, EndElement(parser.getLocalName)) + push(out, transform.buildOutput(EndElement(parser.getLocalName), context)) case XMLStreamConstants.CHARACTERS => - push(out, Characters(parser.getText)) + push(out, transform.buildOutput(Characters(parser.getText), context)) case XMLStreamConstants.PROCESSING_INSTRUCTION => - push(out, ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData))) + push(out, + transform.buildOutput(ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData)), + context)) case XMLStreamConstants.COMMENT => - push(out, Comment(parser.getText)) + push(out, transform.buildOutput(Comment(parser.getText), context)) case XMLStreamConstants.CDATA => - push(out, CData(parser.getText)) + push(out, transform.buildOutput(CData(parser.getText), 
context)) // Do not support DTD, SPACE, NAMESPACE, NOTATION_DECLARATION, ENTITY_DECLARATION, PROCESSING_INSTRUCTION // ATTRIBUTE is handled in START_ELEMENT implicitly diff --git a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala index 3a2fca1e5..d10edd8d5 100644 --- a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala +++ b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala @@ -33,12 +33,31 @@ object XmlParsing { def parser(): pekko.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser.asJava + /** + * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX while keeping + * a context attached. + * + * Upstream from akka/alpakka#2935 (which is now Apache licensed). + */ + def parserWithContext[Ctx](): pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + xml.scaladsl.XmlParsing.parserWithContext().asJava + /** * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX. */ def parser(ignoreInvalidChars: Boolean): pekko.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser(ignoreInvalidChars).asJava + /** + * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX while keeping + * a context attached. + * + * Upstream from akka/alpakka#2935 (which is now Apache licensed). + */ + def parserWithContext[Ctx]( + ignoreInvalidChars: Boolean): pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars).asJava + /** * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX. 
*/ @@ -54,6 +73,18 @@ object XmlParsing { configureFactory: Consumer[AsyncXMLInputFactory]): pekko.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser(ignoreInvalidChars, configureFactory.accept(_)).asJava + /** + * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX while keeping + * a context attached. + * + * Upstream from akka/alpakka#2935 (which is now Apache licensed). + */ + def parserWithContext[Ctx]( + ignoreInvalidChars: Boolean, + configureFactory: Consumer[AsyncXMLInputFactory]) + : pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars, configureFactory.accept(_)).asJava + /** * A Flow that transforms a stream of XML ParseEvents. This stage coalesces consequitive CData and Characters * events into a single Characters event or fails if the buffered string is larger than the maximum defined. diff --git a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala index 2cf101363..40d67401c 100644 --- a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala +++ b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala @@ -17,7 +17,7 @@ import org.apache.pekko import pekko.NotUsed import pekko.stream.connectors.xml.ParseEvent import pekko.stream.connectors.xml.impl -import pekko.stream.scaladsl.Flow +import pekko.stream.scaladsl.{ Flow, FlowWithContext } import pekko.util.ByteString import com.fasterxml.aalto.AsyncXMLInputFactory import org.w3c.dom.Element @@ -50,7 +50,28 @@ object XmlParsing { */ def parser(ignoreInvalidChars: Boolean = false, configureFactory: AsyncXMLInputFactory => Unit = configureDefault): Flow[ByteString, ParseEvent, NotUsed] = - Flow.fromGraph(new impl.StreamingXmlParser(ignoreInvalidChars, 
configureFactory)) + Flow[ByteString].via( + Flow.fromGraph( + new impl.StreamingXmlParser[ByteString, ParseEvent, Unit](ignoreInvalidChars, + configureFactory, + impl.StreamingXmlParser.ContextHandler.uncontextual))) + + /** + * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX while keeping + * a context attached. + * + * Upstream from akka/alpakka#2935 (which is now Apache licensed). + */ + def parserWithContext[Ctx]( + ignoreInvalidChars: Boolean = false, + configureFactory: AsyncXMLInputFactory => Unit = + configureDefault): FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + FlowWithContext.fromTuples( + Flow.fromGraph( + new impl.StreamingXmlParser[(ByteString, Ctx), (ParseEvent, Ctx), Ctx]( + ignoreInvalidChars, + configureFactory, + impl.StreamingXmlParser.ContextHandler.contextual))) /** * A Flow that transforms a stream of XML ParseEvents. This stage coalesces consecutive CData and Characters diff --git a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala index 0187ba7cf..c6f0d3c13 100644 --- a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala +++ b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala @@ -18,7 +18,7 @@ import pekko.actor.ActorSystem import pekko.stream.connectors.testkit.scaladsl.LogCapturing import pekko.stream.connectors.xml._ import pekko.stream.connectors.xml.scaladsl.XmlParsing -import pekko.stream.scaladsl.{ Flow, Keep, Sink, Source } +import pekko.stream.scaladsl.{ Flow, Framing, Keep, Sink, Source } import pekko.util.ByteString import org.scalatest.concurrent.ScalaFutures import org.scalatest.BeforeAndAfterAll @@ -340,6 +340,42 @@ class XmlProcessingSpec extends AnyWordSpec with Matchers with ScalaFutures with configWasCalled shouldBe true } + "parse XML and attach line numbers as context" in { + val doc = """|<doc> + | <elem> + | elem1 + | </elem> + | <elem> + | elem2 + | </elem> + 
|</doc>""".stripMargin + val resultFuture = Source + .single(ByteString(doc)) + .via( + Framing.delimiter(delimiter = ByteString(System.lineSeparator), + maximumFrameLength = 65536, + allowTruncation = true)) + .zipWithIndex + .runWith(XmlParsing.parserWithContext[Long]().asFlow.toMat(Sink.seq)(Keep.right)) + + resultFuture.futureValue should ===( + List( + (StartDocument, 0L), + (StartElement("doc"), 0L), + (Characters(" "), 1L), + (StartElement("elem"), 1L), + (Characters(" elem1"), 2L), + (Characters(" "), 3L), + (EndElement("elem"), 3L), + (Characters(" "), 4L), + (StartElement("elem"), 4L), + (Characters(" elem2"), 5L), + (Characters(" "), 6L), + (EndElement("elem"), 6L), + (EndElement("doc"), 7L), + (EndDocument, 7L))) + } + } override protected def afterAll(): Unit = system.terminate() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
