[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-30 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r229218791
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util
+
+private[spark] trait StreamingCompatibilitySuite {
+
+  k8sSuite: KubernetesSuite =>
+
+  import StreamingCompatibilitySuite._
+
+  test("Run spark streaming in client mode.", k8sTestTag) {
+    val (host, port, serverSocket) = startSocketServer()
+    val driverService = driverServiceSetup
+    try {
+      setupSparkStreamingPod(driverService.getMetadata.getName)
+        .addToArgs("streaming.NetworkWordCount")
+        .addToArgs(host, port.toString)
+        .endContainer()
+        .endSpec()
+        .done()
+      Eventually.eventually(TIMEOUT, INTERVAL) {
+        assert(getRunLog.contains("spark-streaming-kube"), "The application did not complete.")
+      }
+    } finally {
+      // Have to delete the service manually since it doesn't have an owner reference
+      kubernetesTestComponents
+        .kubernetesClient
+        .services()
+        .inNamespace(kubernetesTestComponents.namespace)
+        .delete(driverService)
+      serverSocket.close()
+    }
+  }
+
+  test("Run spark streaming in cluster mode.", k8sTestTag) {
+    val (host, port, serverSocket) = startSocketServer()
+    try {
+      runSparkJVMCheckAndVerifyCompletion(
+        mainClass = "org.apache.spark.examples.streaming.NetworkWordCount",
+        appArgs = Array[String](host, port.toString),
+        expectedJVMValue = Seq("spark-streaming-kube"))
+    } finally {
+      serverSocket.close()
+    }
+  }
+
+  test("Run spark structured streaming in cluster mode.", k8sTestTag) {
+    val (host, port, serverSocket) = startSocketServer()
+    try {
+      runSparkJVMCheckAndVerifyCompletion(
+        mainClass = "org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount",
+        appArgs = Array[String](host, port.toString),
+        expectedJVMValue = Seq("spark-streaming-kube"))
+    } finally {
+      serverSocket.close()
+    }
+  }
+
+  test("Run spark structured streaming in client mode.", k8sTestTag) {
+    val (host, port, serverSocket) = startSocketServer()
+    val driverService = driverServiceSetup
+    try {
+      setupSparkStreamingPod(driverService.getMetadata.getName)
+        .addToArgs("sql.streaming.StructuredNetworkWordCount")
+        .addToArgs(host, port.toString)
+        .endContainer()
+        .endSpec()
+        .done()
+
+      val TIMEOUT = PatienceConfiguration.Timeout(Span(3, Minutes))
+      Eventually.eventually(TIMEOUT, INTERVAL) {
+        assert(getRunLog.contains("spark-streaming-kube"),
+          "The application did not complete.")
+      }
+    }
+    finally {
+      // Have to delete the service manually since it doesn't have an owner reference
+      kubernetesTestComponents
+        .kubernetesClient
+        .services()
+        .inNamespace(kubernetesTestComponents.namespace)
+        .delete(driverService)
+      serverSocket.close()
+    }
+  }
+
+  private def 
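
The tests in the diff above poll the driver log until the marker string "spark-streaming-kube" shows up or a timeout expires. As a rough illustration of that retry-until-deadline idea, here is a standard-library-only sketch. It is not ScalaTest's `Eventually` (which retries a block until it stops throwing and fails the test on timeout); `PollSketch` and `pollUntil` are hypothetical names made up for this note.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object PollSketch {
  // Hypothetical helper: re-evaluates `condition` every `interval` until it
  // holds or `timeout` has elapsed. Returns true if the condition held in time.
  def pollUntil(timeout: FiniteDuration, interval: FiniteDuration)(
      condition: => Boolean): Boolean = {
    val deadline = timeout.fromNow

    @tailrec
    def loop(): Boolean =
      if (condition) true
      else if (deadline.isOverdue()) false
      else {
        Thread.sleep(interval.toMillis)
        loop()
      }

    loop()
  }
}
```

A caller would pass the log check as the condition, for example `PollSketch.pollUntil(3.minutes, 1.second)(readLog().contains("spark-streaming-kube"))`, where `readLog` stands in for whatever fetches the driver log.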

[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-22 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r227060819
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,214 @@

[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-16 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r225686232
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,214 @@

[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-15 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r225105051
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,214 @@
+
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util
+
+private[spark] trait StreamingCompatibilitySuite {
+
+  k8sSuite: KubernetesSuite =>
+
+  import StreamingCompatibilitySuite._
+
+  test("Run spark streaming in client mode.", k8sTestTag) {
+    val (host, port, serverSocket) = startSocketServer()
--- End diff --

Please correct me if I have misunderstood: a custom source has to either
live in examples, or a separate image has to be published with the custom
source on its class path.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-11 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r224451681
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,214 @@
+
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util
+
+private[spark] trait StreamingCompatibilitySuite {
+
+  k8sSuite: KubernetesSuite =>
+
+  import StreamingCompatibilitySuite._
+
+  test("Run spark streaming in client mode.", k8sTestTag) {
+    val (host, port, serverSocket) = startSocketServer()
--- End diff --

As an alternative to the socket server, we could use a custom source to feed the stream.
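
One way to read the custom-source suggestion is a Spark Streaming receiver that emits the marker line itself, so the test would no longer need a socket server reachable from the pods. The following is a minimal sketch under that assumption, using the DStream `Receiver` API; the class name `FixedLineReceiver` and its parameters are hypothetical and not part of this pull request.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver: stores `line` up to `count` times, then goes idle.
class FixedLineReceiver(line: String, count: Int)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // onStart must not block, so the emitting loop runs on its own daemon thread.
  override def onStart(): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        var sent = 0
        while (!isStopped() && sent < count) {
          store(line)
          sent += 1
          Thread.sleep(100)
        }
      }
    }, "fixed-line-receiver")
    worker.setDaemon(true)
    worker.start()
  }

  // Nothing to release: the worker thread exits once isStopped() returns true.
  override def onStop(): Unit = {}
}
```

It would be attached with `ssc.receiverStream(new FixedLineReceiver("spark-streaming-kube test.", 10))`. The class then has to be on the class path of the image that runs the job, which is the packaging question raised elsewhere in this thread.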





[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r223519608
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,220 @@
+
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import io.fabric8.kubernetes.api.model.Service
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import org.apache.spark.util
+
+private[spark] trait StreamingCompatibilitySuite {
+
+  k8sSuite: KubernetesSuite =>
+
+  private def startSocketServer(): (String, Int, ServerSocket) = {
+    val hostname = util.Utils.localHostName()
+    val hostAddress: String = InetAddress.getByName(hostname).getHostAddress
+    val serverSocket = new ServerSocket()
+    serverSocket.bind(new InetSocketAddress(hostAddress, 0))
+    val host = serverSocket.getInetAddress.getHostAddress
+    val port = serverSocket.getLocalPort
+    logInfo(s"Started test server socket at $host:$port")
+    Future {
+      while (!serverSocket.isClosed) {
+        val socket: Socket = serverSocket.accept()
+        logInfo(s"Received connection on $socket")
+        for (i <- 1 to 10 ) {
+          if (socket.isConnected && !serverSocket.isClosed) {
+            socket.getOutputStream.write("spark-streaming-kube test.\n".getBytes())
+            socket.getOutputStream.flush()
+            Thread.sleep(100)
+          }
+        }
+        socket.close()
+      }
+    }
+    (host, port, serverSocket)
+  }
+
+  test("Run spark streaming in client mode.", k8sTestTag) {
+    val (host, port, serverSocket) = startSocketServer()
+    val (driverPort: Int, blockManagerPort: Int, driverService: Service) =
+      driverServiceSetup(driverPodName)
+    try {
+      val driverPod = setupSparkStreamingPod("client", driverPodName)
+        .addToArgs("--conf", s"spark.driver.host="
+          + s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc")
+        .addToArgs("--conf", s"spark.driver.port=$driverPort")
+        .addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort")
+        .addToArgs("streaming.NetworkWordCount")
+        .addToArgs(host, port.toString)
+        .endContainer()
+        .endSpec()
+        .done()
+
+      Eventually.eventually(TIMEOUT, INTERVAL) {
+        assert(getRunLog(driverPodName)
+          .contains("spark-streaming-kube"), "The application did not complete.")
+      }
+    } finally {
+      // Have to delete the service manually since it doesn't have an owner reference
+      kubernetesTestComponents
+        .kubernetesClient
+        .services()
+        .inNamespace(kubernetesTestComponents.namespace)
+        .delete(driverService)
+      serverSocket.close()
+    }
+  }
+
+  test("Run spark streaming in cluster mode.", k8sTestTag) {
+    val (host, port, serverSocket) = startSocketServer()
+    try {
+      runSparkJVMCheckAndVerifyCompletion(
+        mainClass = "org.apache.spark.examples.streaming.NetworkWordCount",
+        appArgs = Array[String](host, port.toString),
+        expectedJVMValue = Seq("spark-streaming-kube"))
+    } finally {
+      serverSocket.close()
+    }
+  }
+
+  test("Run spark structured streaming in cluster mode.", k8sTestTag) {
+    val (host, port, serverSocket) = startSocketServer()
+    try {
+      runSparkJVMCheckAndVerifyCompletion(
+
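
The `startSocketServer` helper in the diff above can be tried outside the Kubernetes suite. The sketch below is a standalone variant under a few assumptions that differ from the diff: it binds to the loopback address rather than the address resolved from `Utils.localHostName()` (so it is not reachable from pods), it drops `logInfo`, and it adds exception handling for clients that disconnect early and for the `accept()` failure that follows `close()`. `SocketFeedSketch` and `readFirstLine` are hypothetical names.

```scala
import java.io.{BufferedReader, IOException, InputStreamReader}
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketException}
import java.nio.charset.StandardCharsets

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object SocketFeedSketch {
  val Marker = "spark-streaming-kube test."

  // Binds to an ephemeral loopback port and serves `lines` copies of Marker
  // to every client that connects, until the returned ServerSocket is closed.
  def startSocketServer(lines: Int = 10): (String, Int, ServerSocket) = {
    val serverSocket = new ServerSocket()
    serverSocket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
    val host = serverSocket.getInetAddress.getHostAddress
    val port = serverSocket.getLocalPort
    Future {
      try {
        while (!serverSocket.isClosed) {
          val socket = serverSocket.accept()
          try {
            val out = socket.getOutputStream
            for (_ <- 1 to lines) {
              out.write((Marker + "\n").getBytes(StandardCharsets.UTF_8))
              out.flush()
              Thread.sleep(100)
            }
          } catch {
            case _: IOException => // client disconnected early; keep serving
          } finally socket.close()
        }
      } catch {
        // accept() throws once the caller closes the server socket: normal shutdown.
        case _: SocketException if serverSocket.isClosed => ()
      }
    }
    (host, port, serverSocket)
  }

  // Connects the way a socket text source would and returns the first line.
  def readFirstLine(host: String, port: Int): String = {
    val client = new Socket(host, port)
    try {
      val in = new InputStreamReader(client.getInputStream, StandardCharsets.UTF_8)
      new BufferedReader(in).readLine()
    } finally client.close()
  }
}
```

As in the tests, the caller owns the returned `ServerSocket` and should close it in a `finally` block; closing it is also what ends the accept loop.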

[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r223519103
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,220 @@

[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r223519430
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,220 @@

[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r223519277
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import io.fabric8.kubernetes.api.model.Service
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import org.apache.spark.util
+
+private[spark] trait StreamingCompatibilitySuite {
+
+  k8sSuite: KubernetesSuite =>
+
+  private def startSocketServer(): (String, Int, ServerSocket) = {
+val hostname = util.Utils.localHostName()
+val hostAddress: String = InetAddress.getByName(hostname).getHostAddress
+val serverSocket = new ServerSocket()
+serverSocket.bind(new InetSocketAddress(hostAddress, 0))
+val host = serverSocket.getInetAddress.getHostAddress
+val port = serverSocket.getLocalPort
+logInfo(s"Started test server socket at $host:$port")
+Future {
+  while (!serverSocket.isClosed) {
+val socket: Socket = serverSocket.accept()
+logInfo(s"Received connection on $socket")
+for (i <- 1 to 10 ) {
+  if (socket.isConnected && !serverSocket.isClosed) {
+socket.getOutputStream.write("spark-streaming-kube test.\n".getBytes())
+socket.getOutputStream.flush()
+Thread.sleep(100)
+  }
+}
+socket.close()
+  }
+}
+(host, port, serverSocket)
+  }
+
+  test("Run spark streaming in client mode.", k8sTestTag) {
+val (host, port, serverSocket) = startSocketServer()
+val (driverPort: Int, blockManagerPort: Int, driverService: Service) =
+  driverServiceSetup(driverPodName)
+try {
+  val driverPod = setupSparkStreamingPod("client", driverPodName)
+.addToArgs("--conf", s"spark.driver.host="
+  + s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc")
+.addToArgs("--conf", s"spark.driver.port=$driverPort")
+.addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort")
+.addToArgs("streaming.NetworkWordCount")
+.addToArgs(host, port.toString)
+.endContainer()
+.endSpec()
+.done()
+
+  Eventually.eventually(TIMEOUT, INTERVAL) {
+assert(getRunLog(driverPodName)
+  .contains("spark-streaming-kube"), "The application did not complete.")
+  }
+} finally {
+  // Have to delete the service manually since it doesn't have an owner reference
+  kubernetesTestComponents
+.kubernetesClient
+.services()
+.inNamespace(kubernetesTestComponents.namespace)
+.delete(driverService)
+  serverSocket.close()
+}
+  }
+
+  test("Run spark streaming in cluster mode.", k8sTestTag) {
+val (host, port, serverSocket) = startSocketServer()
+try {
+  runSparkJVMCheckAndVerifyCompletion(
+mainClass = "org.apache.spark.examples.streaming.NetworkWordCount",
+appArgs = Array[String](host, port.toString),
+expectedJVMValue = Seq("spark-streaming-kube"))
+} finally {
+  serverSocket.close()
+}
+  }
+
+  test("Run spark structured streaming in cluster mode.", k8sTestTag) {
+val (host, port, serverSocket) = startSocketServer()
+try {
+  runSparkJVMCheckAndVerifyCompletion(
+

[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r223518841
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import io.fabric8.kubernetes.api.model.Service
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import org.apache.spark.util
+
+private[spark] trait StreamingCompatibilitySuite {
+
+  k8sSuite: KubernetesSuite =>
+
+  private def startSocketServer(): (String, Int, ServerSocket) = {
--- End diff --

If we do it that way, it's a lot easier to clean up the resources and avoid 
trouble with sockets hanging open on the Jenkins bare-metal host. For example, 
we can just delete the whole server pod.
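
For comparison, if the server stays on the submission host as in the diff, 
the cleanup has to be done by hand. The following is only a minimal sketch of 
that discipline, not code from the pull request: `SocketCleanupSketch`, `serve` 
and `roundTrip` are hypothetical names, and it binds to the loopback address 
purely so that it can run anywhere.

```scala
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketException}
import java.nio.charset.StandardCharsets

import scala.io.Source

// Hypothetical sketch: a host-side test server whose sockets are always released.
object SocketCleanupSketch {

  // Sends `line` once to every client until the server socket is closed.
  def serve(server: ServerSocket, line: String): Thread = {
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          while (!server.isClosed) {
            val client = server.accept()
            try {
              client.getOutputStream.write(line.getBytes(StandardCharsets.UTF_8))
              client.getOutputStream.flush()
            } finally {
              client.close() // never leave a per-connection socket open
            }
          }
        } catch {
          // Thrown by accept() when server.close() is called from another thread
          // (a failed write would also end up here); either way the loop stops.
          case _: SocketException =>
        }
      }
    })
    worker.setDaemon(true)
    worker.start()
    worker
  }

  // Starts the server, reads one message as a client, then releases everything.
  def roundTrip(): String = {
    val server = new ServerSocket()
    server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
    val worker = serve(server, "spark-streaming-kube test.\n")
    try {
      val client = new Socket(InetAddress.getLoopbackAddress, server.getLocalPort)
      try Source.fromInputStream(client.getInputStream, "UTF-8").mkString
      finally client.close()
    } finally {
      server.close()    // unblocks accept() in the worker thread
      worker.join(5000) // wait briefly for the worker to exit
    }
  }

  def main(args: Array[String]): Unit = print(roundTrip())
}
```

With a separate server pod, all of this bookkeeping would presumably reduce to 
deleting the pod.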


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r223518123
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import io.fabric8.kubernetes.api.model.Service
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import org.apache.spark.util
+
+private[spark] trait StreamingCompatibilitySuite {
+
+  k8sSuite: KubernetesSuite =>
+
+  private def startSocketServer(): (String, Int, ServerSocket) = {
--- End diff --

Is this the submission client starting a server, and then the pod needs to 
be able to connect to the submission client host and port? An alternative is to 
deploy a separate pod that does this so that network communications are 
pod-to-pod instead of pod-to-host.


---




[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r223518338
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala
 ---
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import io.fabric8.kubernetes.api.model.Service
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import org.apache.spark.util
+
+private[spark] trait StreamingCompatibilitySuite {
+
+  k8sSuite: KubernetesSuite =>
+
+  private def startSocketServer(): (String, Int, ServerSocket) = {
+val hostname = util.Utils.localHostName()
+val hostAddress: String = InetAddress.getByName(hostname).getHostAddress
+val serverSocket = new ServerSocket()
+serverSocket.bind(new InetSocketAddress(hostAddress, 0))
+val host = serverSocket.getInetAddress.getHostAddress
+val port = serverSocket.getLocalPort
+logInfo(s"Started test server socket at $host:$port")
+Future {
+  while (!serverSocket.isClosed) {
+val socket: Socket = serverSocket.accept()
+logInfo(s"Received connection on $socket")
+for (i <- 1 to 10 ) {
+  if (socket.isConnected && !serverSocket.isClosed) {
+socket.getOutputStream.write("spark-streaming-kube test.\n".getBytes())
--- End diff --

Specify encoding as UTF-8.
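
A minimal sketch of what that could look like, assuming the same payload 
string as in the diff (`Utf8BytesExample` and `payload` are hypothetical 
names). Calling `getBytes()` with no argument uses the JVM's default charset, 
which can differ between hosts, whereas passing `StandardCharsets.UTF_8` 
pins the encoding:

```scala
import java.nio.charset.StandardCharsets

object Utf8BytesExample {
  // The payload from the diff, encoded with an explicit charset.
  val payload: Array[Byte] =
    "spark-streaming-kube test.\n".getBytes(StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    // Decoding with the same charset round-trips the text,
    // independent of the JVM's default charset.
    val decoded = new String(payload, StandardCharsets.UTF_8)
    println(decoded == "spark-streaming-kube test.\n")
  }
}
```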


---




[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-05 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/22639#discussion_r222942209
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
 ---
@@ -120,7 +120,7 @@ private[spark] object SparkAppLauncher extends Logging {
 appConf.toStringArray :+ appArguments.mainAppResource
 
 if (appArguments.appArgs.nonEmpty) {
-  commandLine += appArguments.appArgs.mkString(" ")
+  commandLine ++= appArguments.appArgs
--- End diff --

The difference is between a single space-separated argument and multiple 
separate arguments. With `.mkString(" ")`, multiple arguments are passed as 
one space-separated argument.
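
A small sketch of the difference, using a plain `ArrayBuffer` as a stand-in 
for the launcher's command line (`AppArgsExample`, the sample arguments, and 
the `spark-submit`/`app.jar` entries are hypothetical, chosen only for 
illustration):

```scala
import scala.collection.mutable.ArrayBuffer

object AppArgsExample {
  // Hypothetical application arguments, e.g. a host and a port.
  val appArgs: Array[String] = Array("10.0.0.5", "9999")

  // Old behaviour: all arguments are collapsed into one command-line entry.
  def joined: ArrayBuffer[String] = {
    val commandLine = ArrayBuffer("spark-submit", "app.jar")
    commandLine += appArgs.mkString(" ")
    commandLine
  }

  // New behaviour: each argument becomes its own command-line entry.
  def separate: ArrayBuffer[String] = {
    val commandLine = ArrayBuffer("spark-submit", "app.jar")
    commandLine ++= appArgs
    commandLine
  }

  def main(args: Array[String]): Unit = {
    println(joined.length)   // 3 entries: the last one is "10.0.0.5 9999"
    println(separate.length) // 4 entries: "10.0.0.5" and "9999" stay separate
  }
}
```

An application such as `NetworkWordCount`, which expects the host and port as 
two arguments, would see only one argument in the first case.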


---




[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...

2018-10-05 Thread ScrapCodes
GitHub user ScrapCodes opened a pull request:

https://github.com/apache/spark/pull/22639

[SPARK-25647][k8s] Add spark streaming compatibility suite for kubernetes.

## What changes were proposed in this pull request?

Adds integration tests for Spark Streaming compatibility in K8s mode.

## How was this patch tested?

By running the test suites.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ScrapCodes/spark stream-test-k8s

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22639


commit 0a4e222b29368b4ff1924c0635ed8274e9e69933
Author: Prashant Sharma 
Date:   2018-10-05T09:12:38Z

[SPARK-25647][k8s] Add spark streaming compatibility suite for kubernetes.




---
