Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r229218791 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + import StreamingCompatibilitySuite._ + + test("Run spark streaming in client mode.", k8sTestTag) { + val (host, port, serverSocket) = startSocketServer() + val driverService = driverServiceSetup + try { + setupSparkStreamingPod(driverService.getMetadata.getName) + .addToArgs("streaming.NetworkWordCount") + .addToArgs(host, port.toString) + .endContainer() + .endSpec() + .done() + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(getRunLog.contains("spark-streaming-kube"), "The application did not complete.") + } + } finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents + .kubernetesClient + .services() + .inNamespace(kubernetesTestComponents.namespace) + .delete(driverService) + serverSocket.close() + } + } + + test("Run spark streaming in cluster mode.", k8sTestTag) { + val (host, port, serverSocket) = startSocketServer() + try { + runSparkJVMCheckAndVerifyCompletion( + mainClass = "org.apache.spark.examples.streaming.NetworkWordCount", + appArgs = Array[String](host, port.toString), + expectedJVMValue = Seq("spark-streaming-kube")) + } finally { + serverSocket.close() + } + } + + test("Run spark structured streaming in cluster mode.", k8sTestTag) { + val (host, port, serverSocket) = startSocketServer() + try { + runSparkJVMCheckAndVerifyCompletion( + mainClass = 
"org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount", + appArgs = Array[String](host, port.toString), + expectedJVMValue = Seq("spark-streaming-kube")) + } finally { + serverSocket.close() + } + } + + test("Run spark structured streaming in client mode.", k8sTestTag) { + val (host, port, serverSocket) = startSocketServer() + val driverService = driverServiceSetup + try { + setupSparkStreamingPod(driverService.getMetadata.getName) + .addToArgs("sql.streaming.StructuredNetworkWordCount") + .addToArgs(host, port.toString) + .endContainer() + .endSpec() + .done() + + val TIMEOUT = PatienceConfiguration.Timeout(Span(3, Minutes)) + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(getRunLog.contains("spark-streaming-kube"), + "The application did not complete.") + } + } + finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents + .kubernetesClient + .services() + .inNamespace(kubernetesTestComponents.namespace) + .delete(driverService) + serverSocket.close() + } + } + + private def getRunLog: String = kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPodName) + .getLog + + private def setupSparkStreamingPod(driverServiceName: String) = { + val labels = Map("spark-app-selector" -> driverPodName) + testBackend + .getKubernetesClient + .pods() + .inNamespace(kubernetesTestComponents.namespace) + .createNew() + .withNewMetadata() + .withName(driverPodName) + .withLabels(labels.asJava) + .endMetadata() + .withNewSpec() + .withServiceAccountName(kubernetesTestComponents.serviceAccountName) --- End diff -- I have run these tests on my own setup of minikube, and I am unable to reproduce the failure that occurred on Jenkins. It is possible that it is related to how minikube is set up on Jenkins.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org