[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r229218791 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + import StreamingCompatibilitySuite._ + + test("Run spark streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val driverService = driverServiceSetup +try { + setupSparkStreamingPod(driverService.getMetadata.getName) +.addToArgs("streaming.NetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog.contains("spark-streaming-kube"), "The application did not complete.") + } +} finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + test("Run spark streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.streaming.NetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val driverService = driverServiceSetup +try { + setupSparkStreamingPod(driverService.getMetadata.getName) +.addToArgs("sql.streaming.StructuredNetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + + val TIMEOUT = PatienceConfiguration.Timeout(Span(3, Minutes)) + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog.contains("spark-streaming-kube"), + "The application did not complete.") + } +} +finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + private def
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r227060819 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + import StreamingCompatibilitySuite._ + + test("Run spark streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val driverService = driverServiceSetup +try { + setupSparkStreamingPod(driverService.getMetadata.getName) +.addToArgs("streaming.NetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog.contains("spark-streaming-kube"), "The application did not complete.") + } +} finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + test("Run spark streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.streaming.NetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val driverService = driverServiceSetup +try { + setupSparkStreamingPod(driverService.getMetadata.getName) +.addToArgs("sql.streaming.StructuredNetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + + val TIMEOUT = PatienceConfiguration.Timeout(Span(3, Minutes)) + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog.contains("spark-streaming-kube"), + "The application did not complete.") + } +} +finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + private def
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r225686232 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + import StreamingCompatibilitySuite._ + + test("Run spark streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val driverService = driverServiceSetup +try { + setupSparkStreamingPod(driverService.getMetadata.getName) +.addToArgs("streaming.NetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog.contains("spark-streaming-kube"), "The application did not complete.") + } +} finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + test("Run spark streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.streaming.NetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val driverService = driverServiceSetup +try { + setupSparkStreamingPod(driverService.getMetadata.getName) +.addToArgs("sql.streaming.StructuredNetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + + val TIMEOUT = PatienceConfiguration.Timeout(Span(3, Minutes)) + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog.contains("spark-streaming-kube"), + "The application did not complete.") + } +} +finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + private def
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r225105051 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + import StreamingCompatibilitySuite._ + + test("Run spark streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() --- End diff -- Please correct my understanding, a custom source has to either live in examples, or a separate image has to be published with the class path of the custom source. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r224451681 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + import StreamingCompatibilitySuite._ + + test("Run spark streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() --- End diff -- We could use a custom source as an alternative for feeding the source. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r223519608 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import io.fabric8.kubernetes.api.model.Service +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + private def startSocketServer(): (String, Int, ServerSocket) = { +val hostname = util.Utils.localHostName() +val hostAddress: String = InetAddress.getByName(hostname).getHostAddress +val serverSocket = new ServerSocket() +serverSocket.bind(new InetSocketAddress(hostAddress, 0)) +val host = serverSocket.getInetAddress.getHostAddress +val port = serverSocket.getLocalPort +logInfo(s"Started test server socket at $host:$port") +Future { + while (!serverSocket.isClosed) { +val socket: Socket = serverSocket.accept() +logInfo(s"Received connection on $socket") +for (i <- 1 to 10 ) { + if (socket.isConnected && !serverSocket.isClosed) { +socket.getOutputStream.write("spark-streaming-kube test.\n".getBytes()) +socket.getOutputStream.flush() +Thread.sleep(100) + } +} +socket.close() + } +} +(host, port, serverSocket) + } + + test("Run spark streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val (driverPort: Int, blockManagerPort: Int, driverService: Service) = + driverServiceSetup(driverPodName) +try { + val driverPod = setupSparkStreamingPod("client", driverPodName) +.addToArgs("--conf", s"spark.driver.host=" + + s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc") +.addToArgs("--conf", s"spark.driver.port=$driverPort") +.addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort") +.addToArgs("streaming.NetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog(driverPodName) + .contains("spark-streaming-kube"), "The application did not complete.") + } +} finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + test("Run spark streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.streaming.NetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r223519103 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import io.fabric8.kubernetes.api.model.Service +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + private def startSocketServer(): (String, Int, ServerSocket) = { +val hostname = util.Utils.localHostName() +val hostAddress: String = InetAddress.getByName(hostname).getHostAddress +val serverSocket = new ServerSocket() +serverSocket.bind(new InetSocketAddress(hostAddress, 0)) +val host = serverSocket.getInetAddress.getHostAddress +val port = serverSocket.getLocalPort +logInfo(s"Started test server socket at $host:$port") +Future { + while (!serverSocket.isClosed) { +val socket: Socket = serverSocket.accept() +logInfo(s"Received connection on $socket") +for (i <- 1 to 10 ) { + if (socket.isConnected && !serverSocket.isClosed) { +socket.getOutputStream.write("spark-streaming-kube test.\n".getBytes()) +socket.getOutputStream.flush() +Thread.sleep(100) + } +} +socket.close() + } +} +(host, port, serverSocket) + } + + test("Run spark streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val (driverPort: Int, blockManagerPort: Int, driverService: Service) = + driverServiceSetup(driverPodName) +try { + val driverPod = setupSparkStreamingPod("client", driverPodName) +.addToArgs("--conf", s"spark.driver.host=" + + s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc") +.addToArgs("--conf", s"spark.driver.port=$driverPort") +.addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort") +.addToArgs("streaming.NetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog(driverPodName) + .contains("spark-streaming-kube"), "The application did not complete.") + } +} finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + test("Run spark streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.streaming.NetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r223519430 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import io.fabric8.kubernetes.api.model.Service +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + private def startSocketServer(): (String, Int, ServerSocket) = { +val hostname = util.Utils.localHostName() +val hostAddress: String = InetAddress.getByName(hostname).getHostAddress +val serverSocket = new ServerSocket() +serverSocket.bind(new InetSocketAddress(hostAddress, 0)) +val host = serverSocket.getInetAddress.getHostAddress +val port = serverSocket.getLocalPort +logInfo(s"Started test server socket at $host:$port") +Future { + while (!serverSocket.isClosed) { +val socket: Socket = serverSocket.accept() +logInfo(s"Received connection on $socket") +for (i <- 1 to 10 ) { + if (socket.isConnected && !serverSocket.isClosed) { +socket.getOutputStream.write("spark-streaming-kube test.\n".getBytes()) +socket.getOutputStream.flush() +Thread.sleep(100) + } +} +socket.close() + } +} +(host, port, serverSocket) + } + + test("Run spark streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val (driverPort: Int, blockManagerPort: Int, driverService: Service) = + driverServiceSetup(driverPodName) +try { + val driverPod = setupSparkStreamingPod("client", driverPodName) +.addToArgs("--conf", s"spark.driver.host=" + + s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc") +.addToArgs("--conf", s"spark.driver.port=$driverPort") +.addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort") +.addToArgs("streaming.NetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog(driverPodName) + .contains("spark-streaming-kube"), "The application did not complete.") + } +} finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + test("Run spark streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.streaming.NetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r223519277 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import io.fabric8.kubernetes.api.model.Service +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + private def startSocketServer(): (String, Int, ServerSocket) = { +val hostname = util.Utils.localHostName() +val hostAddress: String = InetAddress.getByName(hostname).getHostAddress +val serverSocket = new ServerSocket() +serverSocket.bind(new InetSocketAddress(hostAddress, 0)) +val host = serverSocket.getInetAddress.getHostAddress +val port = serverSocket.getLocalPort +logInfo(s"Started test server socket at $host:$port") +Future { + while (!serverSocket.isClosed) { +val socket: Socket = serverSocket.accept() +logInfo(s"Received connection on $socket") +for (i <- 1 to 10 ) { + if (socket.isConnected && !serverSocket.isClosed) { +socket.getOutputStream.write("spark-streaming-kube test.\n".getBytes()) +socket.getOutputStream.flush() +Thread.sleep(100) + } +} +socket.close() + } +} +(host, port, serverSocket) + } + + test("Run spark streaming in client mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +val (driverPort: Int, blockManagerPort: Int, driverService: Service) = + driverServiceSetup(driverPodName) +try { + val driverPod = setupSparkStreamingPod("client", driverPodName) +.addToArgs("--conf", s"spark.driver.host=" + + s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc") +.addToArgs("--conf", s"spark.driver.port=$driverPort") +.addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort") +.addToArgs("streaming.NetworkWordCount") +.addToArgs(host, port.toString) +.endContainer() +.endSpec() +.done() + + Eventually.eventually(TIMEOUT, INTERVAL) { +assert(getRunLog(driverPodName) + .contains("spark-streaming-kube"), "The application did not complete.") + } +} finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents +.kubernetesClient +.services() +.inNamespace(kubernetesTestComponents.namespace) +.delete(driverService) + serverSocket.close() +} + } + + test("Run spark streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +mainClass = "org.apache.spark.examples.streaming.NetworkWordCount", +appArgs = Array[String](host, port.toString), +expectedJVMValue = Seq("spark-streaming-kube")) +} finally { + serverSocket.close() +} + } + + test("Run spark structured streaming in cluster mode.", k8sTestTag) { +val (host, port, serverSocket) = startSocketServer() +try { + runSparkJVMCheckAndVerifyCompletion( +
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r223518841 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import io.fabric8.kubernetes.api.model.Service +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + private def startSocketServer(): (String, Int, ServerSocket) = { --- End diff -- If we do it that way it's a lot easier to clear the resources and avoid trouble with sockets hanging open on the Jenkins bare metal host, for example we can just delete the whole server pod. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r223518123 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import io.fabric8.kubernetes.api.model.Service +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + private def startSocketServer(): (String, Int, ServerSocket) = { --- End diff -- Is this the submission client starting a server, and then the pod needs to be able to connect to the submission client host and port? An alternative is to deploy a separate pod that does this so that network communications are pod-to-pod instead of pod-to-host. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r223518338 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala --- @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest + +import java.net._ + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +import io.fabric8.kubernetes.api.model.Service +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ +import org.apache.spark.util + +private[spark] trait StreamingCompatibilitySuite { + + k8sSuite: KubernetesSuite => + + private def startSocketServer(): (String, Int, ServerSocket) = { +val hostname = util.Utils.localHostName() +val hostAddress: String = InetAddress.getByName(hostname).getHostAddress +val serverSocket = new ServerSocket() +serverSocket.bind(new InetSocketAddress(hostAddress, 0)) +val host = serverSocket.getInetAddress.getHostAddress +val port = serverSocket.getLocalPort +logInfo(s"Started test server socket at $host:$port") +Future { + while (!serverSocket.isClosed) { +val socket: Socket = serverSocket.accept() +logInfo(s"Received connection on $socket") +for (i <- 1 to 10 ) { + if (socket.isConnected && !serverSocket.isClosed) { +socket.getOutputStream.write("spark-streaming-kube test.\n".getBytes()) --- End diff -- Specify encoding as UTF-8. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r222942209 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala --- @@ -120,7 +120,7 @@ private[spark] object SparkAppLauncher extends Logging { appConf.toStringArray :+ appArguments.mainAppResource if (appArguments.appArgs.nonEmpty) { - commandLine += appArguments.appArgs.mkString(" ") + commandLine ++= appArguments.appArgs --- End diff -- Space separated single argument or, multiple different argument. If we do `.mkString(" ")` then, it takes multi arguments as space separated single argument. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
GitHub user ScrapCodes opened a pull request: https://github.com/apache/spark/pull/22639 [SPARK-25647][k8s] Add spark streaming compatibility suite for kubernetes. ## What changes were proposed in this pull request? Adds integration tests for spark streaming compatibility with K8s mode. ## How was this patch tested? By running the test suites. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ScrapCodes/spark stream-test-k8s Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22639.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22639 commit 0a4e222b29368b4ff1924c0635ed8274e9e69933 Author: Prashant Sharma Date: 2018-10-05T09:12:38Z [SPARK-25647][k8s] Add spark streaming compatibility suite for kubernetes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org