This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new fa5c83f36f Add handling for tcp register timeout leaving connection
dead
fa5c83f36f is described below
commit fa5c83f36fcd15fad44b0842ed7eed11629a6250
Author: Nicolas Vollmar <[email protected]>
AuthorDate: Thu Mar 7 10:48:29 2024 +0100
Add handling for tcp register timeout leaving connection dead
---
.../pekko/io/dns/internal/TcpDnsClientSpec.scala | 23 ++++++++++++++++++++--
.../pekko/io/dns/internal/TcpDnsClient.scala | 7 +++++--
2 files changed, 26 insertions(+), 4 deletions(-)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/TcpDnsClientSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/TcpDnsClientSpec.scala
index 9164c14121..e756d2b24a 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/TcpDnsClientSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/io/dns/internal/TcpDnsClientSpec.scala
@@ -18,12 +18,12 @@ import java.net.InetSocketAddress
import scala.collection.immutable.Seq
import org.apache.pekko
-import pekko.actor.Props
+import pekko.actor.{ ActorKilledException, Kill, Props }
import pekko.io.Tcp
import pekko.io.Tcp.{ Connected, PeerClosed, Register }
import pekko.io.dns.{ RecordClass, RecordType }
import pekko.io.dns.internal.DnsClient.Answer
-import pekko.testkit.{ ImplicitSender, PekkoSpec, TestProbe }
+import pekko.testkit.{ EventFilter, ImplicitSender, PekkoSpec, TestProbe }
class TcpDnsClientSpec extends PekkoSpec with ImplicitSender {
import TcpDnsClient._
@@ -107,5 +107,24 @@ class TcpDnsClientSpec extends PekkoSpec with
ImplicitSender {
answerProbe.expectMsg(Answer(42, Nil))
answerProbe.expectMsg(Answer(43, Nil))
}
+
+ "fail when the connection just terminates" in {
+ val tcpExtensionProbe = TestProbe()
+ val answerProbe = TestProbe()
+ val connectionProbe = TestProbe()
+
+ val client = system.actorOf(Props(new
TcpDnsClient(tcpExtensionProbe.ref, dnsServerAddress, answerProbe.ref)))
+
+ client ! exampleRequestMessage
+
+ tcpExtensionProbe.expectMsg(Tcp.Connect(dnsServerAddress))
+ connectionProbe.send(tcpExtensionProbe.lastSender,
Connected(dnsServerAddress, localAddress))
+ connectionProbe.expectMsgType[Register]
+
+ EventFilter[ActorKilledException](occurrences = 1).intercept {
+ // simulate connection stopping due to register timeout => client must
fail
+ connectionProbe.ref ! Kill
+ }
+ }
}
}
diff --git
a/actor/src/main/scala/org/apache/pekko/io/dns/internal/TcpDnsClient.scala
b/actor/src/main/scala/org/apache/pekko/io/dns/internal/TcpDnsClient.scala
index b8022cca10..07fa1ad948 100644
--- a/actor/src/main/scala/org/apache/pekko/io/dns/internal/TcpDnsClient.scala
+++ b/actor/src/main/scala/org/apache/pekko/io/dns/internal/TcpDnsClient.scala
@@ -14,10 +14,9 @@
package org.apache.pekko.io.dns.internal
import java.net.InetSocketAddress
-
import org.apache.pekko
import pekko.PekkoException
-import pekko.actor.{ Actor, ActorLogging, ActorRef, Stash }
+import pekko.actor.{ Actor, ActorLogging, ActorRef, Stash, Terminated }
import pekko.annotation.InternalApi
import pekko.io.Tcp
import pekko.io.dns.internal.DnsClient.Answer
@@ -49,6 +48,7 @@ import pekko.util.ByteString
log.debug("Connected to TCP address [{}]", ns)
val connection = sender()
context.become(ready(connection))
+ context.watch(connection)
connection ! Tcp.Register(self)
unstashAll()
case _: Message =>
@@ -80,7 +80,10 @@ import pekko.util.ByteString
}
}
case Tcp.PeerClosed =>
+ context.unwatch(connection)
context.become(idle)
+ case Terminated(`connection`) =>
+ throwFailure("TCP connection terminated without closing (register
timeout?)", None)
}
private def parseResponse(data: ByteString) = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]