haiyangsun-db commented on code in PR #55657:
URL: https://github.com/apache/spark/pull/55657#discussion_r3259912819


##########
udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/EchoProtocolSuite.scala:
##########
@@ -0,0 +1,933 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.udf.worker.core
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.google.protobuf.ByteString
+
+// Requires grpc-stub and grpc-inprocess dependencies, plus grpc-java codegen
+// in udf/worker/proto/pom.xml to generate UdfWorkerGrpc.
+import io.grpc.stub.StreamObserver
+import io.grpc.{ManagedChannel, Server, Status}
+import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import org.apache.spark.udf.worker.UdfWorkerGrpc
+
+import org.apache.spark.udf.worker.{
+  Cancel, CancelResponse, DataRequest, DataResponse,
+  ErrorResponse, ExecutionError, UserError, WorkerError, ProtocolError,
+  Finish, FinishResponse, Heartbeat, HeartbeatResponse,
+  Init, InitResponse, PayloadChunk, ShutdownRequest, ShutdownResponse,
+  UDFWorkerDataFormat, UdfControlRequest, UdfControlResponse,
+  UdfPayload, UdfRequest, UdfResponse, WorkerRequest, WorkerResponse
+}
+
+// scalastyle:off funsuite
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.BeforeAndAfterEach
+
+/**
+ * Protocol validation test for the UDF gRPC execution protocol.
+ *
+ * Implements a minimal echo worker (gRPC server) and engine client to verify
+ * the full Execute stream lifecycle: init, data streaming, finish, cancel,
+ * error handling, and the Manage RPC. The worker echoes each DataRequest
+ * batch back as a DataResponse; error paths are triggered by a sentinel
+ * payload value.
+ */
+class EchoProtocolSuite extends AnyFunSuite with BeforeAndAfterEach {
+// scalastyle:on funsuite
+
+  private val SUPPORTED_VERSION: Int = 1
+  // A DataRequest whose payload equals this value triggers an ErrorResponse.
+  private val ERROR_TRIGGER: ByteString = ByteString.copyFromUtf8("ERROR")
+  // An init payload whose value equals this triggers an init failure
+  // (InitResponse with error set).
+  private val INIT_ERROR_TRIGGER: ByteString = 
ByteString.copyFromUtf8("INIT_ERROR")
+
+  private var server: Server = _
+  private var channel: ManagedChannel = _
+  private var stub: UdfWorkerGrpc.UdfWorkerStub = _
+
+  override def beforeEach(): Unit = {
+    val serverName = InProcessServerBuilder.generateName()
+    server = InProcessServerBuilder.forName(serverName)
+      .directExecutor()
+      .addService(new EchoWorkerService)
+      .build()
+      .start()
+    channel = 
InProcessChannelBuilder.forName(serverName).directExecutor().build()
+    stub = UdfWorkerGrpc.newStub(channel)
+  }
+
+  override def afterEach(): Unit = {
+    channel.shutdownNow()
+    server.shutdownNow()
+  }
+
+  // 
===========================================================================
+  // WORKER SIDE (gRPC server)
+  // 
===========================================================================
+
+  /**
+   * Worker state machine for one Execute stream.

Review Comment:
   @hvanhovell This sample UDF worker implementation aims to define the state
transitions implied by the protocol. Do you think it's worth moving it to a
separate PR, so that this PR can focus on the proto definitions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to