This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-toree.git
The following commit(s) were added to refs/heads/master by this push:
new 47970f52 [TOREE-540] Fix deadlock on closing ZMQ by upgrading jeromq
to 0.5.3 (#205)
47970f52 is described below
commit 47970f52081ca5bdd82e0fb987a7f99e5cfdcbfc
Author: Cheng Pan <[email protected]>
AuthorDate: Sun Aug 6 14:05:29 2023 +0800
[TOREE-540] Fix deadlock on closing ZMQ by upgrading jeromq to 0.5.3 (#205)
---
.../apache/toree/communication/SocketManager.scala | 10 ++---
.../communication/socket/PubSocketRunnable.scala | 5 ++-
.../communication/socket/ReqSocketRunnable.scala | 5 ++-
.../toree/communication/socket/SocketType.scala | 44 ----------------------
.../socket/ZeroMQSocketRunnable.scala | 10 ++---
.../socket/ZeroMQSocketRunnableSpec.scala | 23 +++++------
project/Dependencies.scala | 2 +-
7 files changed, 29 insertions(+), 70 deletions(-)
diff --git
a/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
b/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
index 0422334a..7041a58e 100644
---
a/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
+++
b/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
@@ -19,7 +19,7 @@ package org.apache.toree.communication
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import org.apache.toree.communication.socket._
-import org.zeromq.ZMQ
+import org.zeromq.{SocketType, ZMQ}
import scala.collection.JavaConverters._
@@ -99,7 +99,7 @@ class SocketManager {
): SocketLike = withNewContext{ ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
ctx,
- RepSocket,
+ SocketType.REP,
Some(inboundMessageCallback),
Bind(address),
Linger(0)
@@ -137,7 +137,7 @@ class SocketManager {
): SocketLike = withNewContext { ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
ctx,
- SubSocket,
+ SocketType.SUB,
Some(inboundMessageCallback),
Connect(address),
Linger(0),
@@ -159,7 +159,7 @@ class SocketManager {
): SocketLike = withNewContext { ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
ctx,
- RouterSocket,
+ SocketType.ROUTER,
Some(inboundMessageCallback),
Bind(address),
Linger(0)
@@ -181,7 +181,7 @@ class SocketManager {
): SocketLike = withNewContext{ ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
ctx,
- DealerSocket,
+ SocketType.DEALER,
Some(inboundMessageCallback),
Connect(address),
Linger(0),
diff --git
a/communication/src/main/scala/org/apache/toree/communication/socket/PubSocketRunnable.scala
b/communication/src/main/scala/org/apache/toree/communication/socket/PubSocketRunnable.scala
index 40d680d7..c3a587ec 100644
---
a/communication/src/main/scala/org/apache/toree/communication/socket/PubSocketRunnable.scala
+++
b/communication/src/main/scala/org/apache/toree/communication/socket/PubSocketRunnable.scala
@@ -16,7 +16,8 @@
*/
package org.apache.toree.communication.socket
-import org.zeromq.ZMQ.{Socket, Context}
+import org.zeromq.SocketType
+import org.zeromq.ZMQ.{Context, Socket}
/**
* Represents the runnable component of a socket specifically targeted towards
@@ -30,7 +31,7 @@ class PubSocketRunnable(
private val socketOptions: SocketOption*
) extends ZeroMQSocketRunnable(
context,
- PubSocket,
+ SocketType.PUB,
None,
socketOptions: _*
) {
diff --git
a/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
b/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
index cd3d85e7..dea651cf 100644
---
a/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
+++
b/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
@@ -16,7 +16,8 @@
*/
package org.apache.toree.communication.socket
-import org.zeromq.ZMQ.{Socket, Context}
+import org.zeromq.SocketType
+import org.zeromq.ZMQ.{Context, Socket}
/**
* Represents the runnable component of a socket that processes messages and
@@ -35,7 +36,7 @@ class ReqSocketRunnable(
private val socketOptions: SocketOption*
) extends ZeroMQSocketRunnable(
context,
- ReqSocket,
+ SocketType.REQ,
inboundMessageCallback,
socketOptions: _*
) {
diff --git
a/communication/src/main/scala/org/apache/toree/communication/socket/SocketType.scala
b/communication/src/main/scala/org/apache/toree/communication/socket/SocketType.scala
deleted file mode 100644
index a93a6bc2..00000000
---
a/communication/src/main/scala/org/apache/toree/communication/socket/SocketType.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-package org.apache.toree.communication.socket
-
-import org.zeromq.ZMQ
-
-/**
- * Represents the type option used to indicate the type of socket to create.
- *
- * @param `type` The type as an integer
- */
-sealed class SocketType(val `type`: Int)
-
-/** Represents a publish socket. */
-case object PubSocket extends SocketType(ZMQ.PUB)
-
-/** Represents a subscribe socket. */
-case object SubSocket extends SocketType(ZMQ.SUB)
-
-/** Represents a reply socket. */
-case object RepSocket extends SocketType(ZMQ.REP)
-
-/** Represents a request socket. */
-case object ReqSocket extends SocketType(ZMQ.REQ)
-
-/** Represents a router socket. */
-case object RouterSocket extends SocketType(ZMQ.ROUTER)
-
-/** Represents a dealer socket. */
-case object DealerSocket extends SocketType(ZMQ.DEALER)
diff --git
a/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
b/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
index 0464d574..1ac7f08e 100644
---
a/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
+++
b/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
@@ -17,7 +17,7 @@
package org.apache.toree.communication.socket
import org.apache.toree.utils.LogLike
-import org.zeromq.{ZMsg, ZMQ}
+import org.zeromq.{SocketType, ZMsg, ZMQ}
import org.zeromq.ZMQ.Context
import scala.collection.JavaConverters._
@@ -64,7 +64,7 @@ class ZeroMQSocketRunnable(
*/
protected def processOptions(socket: ZMQ.Socket): Unit = {
val socketOptionsString = socketOptions.map("\n- " +
_.toString).mkString("")
- logger.trace(
+ logger.info(
s"Processing options for socket $socketType: $socketOptionsString"
)
@@ -103,7 +103,7 @@ class ZeroMQSocketRunnable(
*/
protected def processNextOutboundMessage(socket: ZMQ.Socket): Boolean = {
val message = Option(outboundMessages.poll())
-
+ message.foreach(msg => println(s"send: \n$msg"))
message.foreach(_.send(socket))
message.nonEmpty
@@ -134,11 +134,11 @@ class ZeroMQSocketRunnable(
*
* @return The new ZMQ.Socket instance
*/
- protected def newZmqSocket(zmqContext: ZMQ.Context, socketType: Int) =
+ protected def newZmqSocket(zmqContext: ZMQ.Context, socketType: SocketType) =
zmqContext.socket(socketType)
override def run(): Unit = {
- val socket = newZmqSocket(context,
socketType.`type`)//context.socket(socketType.`type`)
+ val socket = newZmqSocket(context,
socketType)//context.socket(socketType.`type`)
try {
processOptions(socket)
diff --git
a/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
b/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
index 605fa296..b77a6efd 100644
---
a/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
+++
b/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
@@ -20,8 +20,8 @@ import org.scalatest.concurrent.Eventually
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.time.{Milliseconds, Seconds, Span}
import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
-import org.zeromq.ZMQ
-import org.zeromq.ZMQ.{Socket, Context}
+import org.zeromq.{SocketType, ZMQ}
+import org.zeromq.ZMQ.{Context, Socket}
import scala.util.Try
@@ -50,13 +50,14 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
inboundMessageCallback,
socketOptions: _*
) {
- override protected def newZmqSocket(zmqContext: Context, socketType: Int):
Socket = socket
+ override protected def newZmqSocket(zmqContext: Context, socketType:
SocketType): Socket = socket
}
before {
- mockSocketType = mock[SocketType]
+ // TODO mockito 1.x does not support mock/spy enum, upgrade 2.x to achieve
it
+ mockSocketType = SocketType.RAW // mock[SocketType]
zmqContext = ZMQ.context(1)
- pubSocket = zmqContext.socket(PubSocket.`type`)
+ pubSocket = zmqContext.socket(SocketType.PUB)
}
after {
@@ -119,7 +120,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
val runnable: TestRunnable = new TestRunnable(
pubSocket,
zmqContext,
- PubSocket,
+ SocketType.PUB,
None,
Connect(TestAddress),
Linger(expected)
@@ -142,7 +143,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
val runnable: TestRunnable = new TestRunnable(
pubSocket,
zmqContext,
- PubSocket,
+ SocketType.PUB,
None,
Connect(TestAddress),
Identity(expected)
@@ -163,7 +164,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
val runnable = new TestRunnable(
pubSocket,
zmqContext,
- PubSocket,
+ SocketType.PUB,
None,
Connect(TestAddress)
)
@@ -189,7 +190,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
val runnable = new TestRunnable(
pubSocket,
zmqContext,
- PubSocket,
+ SocketType.PUB,
None,
Connect(TestAddress)
)
@@ -213,7 +214,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
val runnable = new TestRunnable(
pubSocket,
zmqContext,
- PubSocket,
+ SocketType.PUB,
None,
Connect(TestAddress)
)
@@ -235,7 +236,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
val runnable = new TestRunnable(
pubSocket,
zmqContext,
- PubSocket,
+ SocketType.PUB,
None,
Connect(TestAddress)
)
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index f0a8520c..9c90ee6e 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -41,7 +41,7 @@ object Dependencies {
// use the same jackson version in test than the one provided at runtime by
Spark 3.0.0
val jacksonDatabind = "com.fasterxml.jackson.core" % "jackson-databind" %
"2.10.0" // Apache v2
- val jeroMq = "org.zeromq" % "jeromq" % "0.4.3" // MPL v2
+ val jeroMq = "org.zeromq" % "jeromq" % "0.5.3" // MPL v2
val joptSimple = "net.sf.jopt-simple" % "jopt-simple" % "4.9" // MIT