[GitHub] spark pull request #23026: [SPARK-25960][k8s] Support subpath mounting with ...
GitHub user NiharS opened a pull request:

    https://github.com/apache/spark/pull/23026

    [SPARK-25960][k8s] Support subpath mounting with Kubernetes

## What changes were proposed in this pull request?

This PR adds configurations to use subpaths with Spark on k8s. Subpaths (https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath) allow the user to specify a path within a volume to use instead of the volume's root.

## How was this patch tested?

Added unit tests. Ran SparkPi on a cluster with event logging pointed at a subpath mount and verified the driver host created and used the subpath.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NiharS/spark k8s_subpath

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23026.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23026

commit e820aa6feefedb2d59d346e83385605b0f4fe1b4
Author: Nihar Sheth
Date:   2018-11-13T21:25:47Z

    subpath code and tests
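As a usage sketch, mounting a volume at a subpath would be driven by conf keys following Spark's existing k8s volume scheme; the key shape, volume name, and paths below are illustrative of this PR's addition, not authoritative documentation:

```
spark-submit \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.events.mount.path=/var/log/spark \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.events.mount.subPath=eventLogs \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.events.options.claimName=events-pvc \
  ...
```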
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Yarn...
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r223512824

    --- Diff: core/src/main/scala/org/apache/spark/internal/Logging.scala ---
    @@ -192,7 +211,15 @@ private[spark] object Logging {
           defaultSparkLog4jConfig = false
           LogManager.resetConfiguration()
         } else {
    -      LogManager.getRootLogger().setLevel(defaultRootLevel)
    +      val rootLogger = LogManager.getRootLogger()
    +      rootLogger.setLevel(defaultRootLevel)
    +      rootLogger.getAllAppenders().asScala.foreach { tmp =>
    --- End diff --

    Any reason not to just iterate through `consoleAppenderToThreshold.keys()`? Not a huge deal, but it cuts down on a bit of work.
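For illustration, a minimal sketch of the suggested alternative, assuming `consoleAppenderToThreshold` is a java.util.Map from the saved console appenders to their original thresholds (as the surrounding diff implies):

```scala
import scala.collection.JavaConverters._

// Restore thresholds only for the appenders that were saved earlier,
// instead of walking every appender attached to the root logger.
consoleAppenderToThreshold.asScala.foreach { case (appender, threshold) =>
  appender.setThreshold(threshold)
}
```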
[GitHub] spark issue #22539: detect date type in csv file
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22539

    Could you edit your title to include the JIRA number and component? e.g. [SPARK-25517][Core] Detect ... It helps with bookkeeping, plus it'll add a link to the JIRA so people can see your PR from there.
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    Hi, just want to follow up on this and see if anyone has additional comments/questions/issues. Please feel free to ping me and I'll respond as soon as I can!
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    I feel like it should be unrelated as well. It's strange that I failed tests in the same suite twice in a row, and that no other recent build has failed that suite, but I tried running locally on a Mac and a Linux VM and couldn't reproduce it, so... fingers crossed.
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    Logs suddenly cut off again without any exceptions; I don't think this one is a code error either. Could someone retest this please?
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r216210046

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -136,6 +136,26 @@ private[spark] class Executor(
       // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
       env.serializerManager.setDefaultClassLoader(replClassLoader)
    +  private val executorPlugins: Seq[ExecutorPlugin] = {
    +    val pluginNames = conf.get(EXECUTOR_PLUGINS)
    +    if (pluginNames.nonEmpty) {
    +      logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", ")}")
    +
    +      // Plugins need to load using a class loader that includes the executor's user classpath
    +      val pluginList: Seq[ExecutorPlugin] =
    +        Utils.withContextClassLoader(replClassLoader) {
    +          val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
    +          plugins.foreach(_.init())
    --- End diff --

    I think that should be the right behavior: if a plugin is faulty, it might be a good idea for the user to handle that issue first. Since plugins are opt-in, a user providing one would expect it to function once it's included, and would probably want to know if it fails rather than run their entire job just to find out it didn't do what it was supposed to.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r216209462

    --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
    @@ -240,6 +240,19 @@ private[spark] object Utils extends Logging {
         // scalastyle:on classforname
       }
    +  /**
    +   * Run a segment of code using a different context class loader in the current thread
    +   */
    +  def withContextClassLoader[T](ctxClassLoader: ClassLoader)(fn: => T): T = {
    +    val oldClassLoader = getContextOrSparkClassLoader
    --- End diff --

    I originally thought it might be a better idea since the Spark executor uses that method when it runs, but now that you mention it, it could potentially change the behavior... I'll change it back.
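A minimal sketch of the fix being discussed: capture the loader actually installed on the current thread (rather than `getContextOrSparkClassLoader`, which may substitute a different one) and restore it in a finally block:

```scala
def withContextClassLoader[T](ctxClassLoader: ClassLoader)(fn: => T): T = {
  // Save the thread's real context class loader so restoring it afterwards
  // cannot change behavior, even if it was null.
  val oldClassLoader = Thread.currentThread().getContextClassLoader
  try {
    Thread.currentThread().setContextClassLoader(ctxClassLoader)
    fn
  } finally {
    Thread.currentThread().setContextClassLoader(oldClassLoader)
  }
}
```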
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r216112535

    --- Diff: core/src/test/java/org/apache/spark/ExecutorPluginSuite.java ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark;
    +
    +import org.apache.spark.api.java.JavaSparkContext;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import static org.junit.Assert.*;
    +
    +public class ExecutorPluginSuite {
    +  private static final String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins";
    +  private static final String testBadPluginName = TestBadShutdownPlugin.class.getName();
    +  private static final String testPluginName = TestExecutorPlugin.class.getName();
    +
    +  // Static value modified by testing plugin to ensure plugin loaded correctly.
    +  public static int numSuccessfulPlugins = 0;
    +
    +  // Static value modified by testing plugin to verify plugins shut down properly.
    +  public static int numSuccessfulTerminations = 0;
    +
    +  private JavaSparkContext sc;
    +
    +  @Before
    +  public void setUp() {
    +    sc = null;
    +    numSuccessfulPlugins = 0;
    +    numSuccessfulTerminations = 0;
    +  }
    +
    +  @After
    +  public void tearDown() {
    +    if (sc != null) {
    +      sc.stop();
    +      sc = null;
    +    }
    +  }
    +
    +  private SparkConf initializeSparkConf(String pluginNames) {
    +    return new SparkConf()
    +      .setMaster("local")
    +      .setAppName("test")
    +      .set(EXECUTOR_PLUGIN_CONF_NAME, pluginNames);
    +  }
    +
    +  @Test
    +  public void testPluginClassDoesNotExist() {
    +    SparkConf conf = initializeSparkConf("nonexistant.plugin");
    +    try {
    +      sc = new JavaSparkContext(conf);
    +    } catch (Exception e) {
    +      // We cannot catch ClassNotFoundException directly because Java doesn't think it'll be thrown
    +      assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException"));
    +    }
    +  }
    +
    +  @Test
    +  public void testAddPlugin() throws InterruptedException {
    +    // Load the sample TestExecutorPlugin, which will change the value of numSuccessfulPlugins
    +    SparkConf conf = initializeSparkConf(testPluginName);
    +    sc = new JavaSparkContext(conf);
    +    assertEquals(1, numSuccessfulPlugins);
    +    sc.stop();
    +    sc = null;
    +    assertEquals(1, numSuccessfulTerminations);
    +  }
    +
    +  @Test
    +  public void testAddMultiplePlugins() throws InterruptedException {
    +    // Load the sample TestExecutorPlugin twice
    +    SparkConf conf = initializeSparkConf(testPluginName + "," + testPluginName);
    +    sc = new JavaSparkContext(conf);
    +    assertEquals(2, numSuccessfulPlugins);
    +    sc.stop();
    +    sc = null;
    +    assertEquals(2, numSuccessfulTerminations);
    +  }
    +
    +  @Test
    +  public void testPluginShutdownWithException() {
    +    // Verify an exception in one plugin shutdown does not affect the others
    +    String pluginNames = testPluginName + "," + testBadPluginName + "," + testPluginName;
    +    SparkConf conf = initializeSparkConf(pluginNames);
    +    sc = new JavaSparkContext(conf);
    +    assertEquals(2, numSuccessfulPlugins);
    --- End diff --

    The bad plugin doesn't increment numSuccessfulPlugins, although in retrospect it's probably a good idea that it does...
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    If I want to add more testing plugins (e.g. to test that a plugin failing to shut down doesn't affect other plugins), is it stylistically OK for me to keep adding them at the bottom of ExecutorPluginSuite? Or should I make a separate file (or several) for them?
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r216053464

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -219,6 +236,13 @@ private[spark] class Executor(
           heartbeater.shutdown()
           heartbeater.awaitTermination(10, TimeUnit.SECONDS)
           threadPool.shutdown()
    +
    +    // Notify plugins that executor is shutting down so they can terminate cleanly
    +    // oldContextClassLoader is redefined in case the class loader changed during execution
    +    val oldContextClassLoader = Utils.getContextOrSparkClassLoader
    --- End diff --

    Do you mean make a function that does something like `Utils.withClassLoader(replClassLoader) { load plugins }`?
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    ^This commit is to show what that looks like. I'm still looking into how to properly barrier the plugins after calling `init` from a new thread (since `join`ing the thread requires the thread to die first, at which point it can no longer listen for the executor `stop`ping).
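One way to get that barrier without `join`ing, sketched here with made-up names (none of this is the PR's code): the plugin thread counts down a latch once `init` finishes, then stays alive waiting for a shutdown signal.

```scala
import java.util.concurrent.CountDownLatch

val initDone = new CountDownLatch(1)      // released after plugins init
val stopRequested = new CountDownLatch(1) // released by the executor's stop()

val pluginThread = new Thread(() => {
  plugins.foreach(_.init())  // assumes plugins: Seq[ExecutorPlugin]
  initDone.countDown()       // barrier released, but the thread lives on
  stopRequested.await()      // park until the executor shuts down
  plugins.foreach(_.stop())
}, "executor-plugins")
pluginThread.start()

initDone.await() // executor startup blocks only until init completes

// later, from the executor's stop():
//   stopRequested.countDown()
//   pluginThread.join()
```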
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    I tested @vanzin's suggestion of storing the class loader, changing it to replClassLoader, and changing back after init (and then changing again for shutdown). It's working well (and this time I made sure it doesn't just shut down immediately: I ran on a cluster and confirmed it runs the init code and creates a thread, the thread stays alive for the duration of the executor, and the shutdown code runs fully before exiting). While it looks a little weird, the code is synchronous, shouldn't run into an issue with the class loader being set to something unexpected while executing the existing Spark code, and ensures the plugins initialize fully during executor initialization.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r215760787

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -218,6 +244,8 @@ private[spark] class Executor(
           env.metricsSystem.report()
           heartbeater.shutdown()
           heartbeater.awaitTermination(10, TimeUnit.SECONDS)
    +    executorPluginThread.interrupt()
    +    executorPluginThread.join()
    --- End diff --

    Oops, yeah, my bad. I believe the current implementation also terminates the plugins immediately... For your recommendation, would the thread that initializes the plugins also be undergoing the conditional wait? If so, would that mean I cannot join on this thread right after initialization of plugins (from your earlier comment), since the thread would stay alive to wait for the shutdown condition?
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    @mridulm I'm inclined to agree. I was doing some extra testing on the separate-thread approach and it seems to be performing well. I'll push this shortly. I'm also going to test setting the class loader and then changing it back, since I'll always be partial towards synchronous operations, but the separate thread certainly seems like a good policy given the pyspark failures.
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    I'll change the config documentation to specify YARN only; hopefully that's not a huge issue. It seems like the line `Thread.currentThread().setContextClassLoader(replClassLoader)` is causing the pyspark failures; they pass when I remove it. I'm looking at the test cases but I really don't see how this is affecting them... it seems that in both test cases, the DStreams monitor a directory but don't pick up the changes they're supposed to, and just time out. I checked that I can bypass this issue by changing back to having the plugins loaded on a separate thread (and setting that thread's contextClassLoader instead of the current thread's), and it passes tests and continues to work. That said, this issue does seem to be indicative of some problem in pyspark streaming.
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    You're right, I ran in local-cluster and it exited very quickly, citing executors shutting down after not being able to find my test plugin. The logs do say that it uses a CoarseGrainedExecutorBackend, though: `18/09/05 12:03:20 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@nihar-xxx:45767`, unless you mean YARN only uses that one command line option. I'm looking into how it reacts in regular standalone mode with different thread counts. Thanks for pointing this out! I'm looking up other spark-submit options that can be used to provide the jar to other nodes, but I'm not super hopeful it's going to work out. If it indeed doesn't, I'll start exploring other options once I figure out why the pyspark tests are failing.
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue: https://github.com/apache/spark/pull/22192 I believe this is another glitch, not from my changes. Could someone retest this please?
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue: https://github.com/apache/spark/pull/22192 Tried on a different machine and those tests pass; here's hoping.
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue: https://github.com/apache/spark/pull/22192 These tests are failing locally for me, both with my code and without, on a clean pull of master.
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    Can't be a coincidence... it's failing two pyspark streaming tests, test_binary_records_stream and test_text_file_stream (the same two as earlier; I misanalyzed it when I said it was ML stuff). I'm running them locally to try to reproduce, though I'm not sure how they relate to my changes. Nonetheless, other PRs are building fine right now, so it must be something in my changes.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r214418189

    --- Diff: core/src/main/java/org/apache/spark/ExecutorPlugin.java ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark;
    +
    +import org.apache.spark.annotation.DeveloperApi;
    +
    +/**
    + * A plugin which can be automatically instantiated within each Spark executor. Users can specify
    + * plugins which should be created with the "spark.executor.plugins" configuration. An instance
    + * of each plugin will be created for every executor, including those created by dynamic allocation,
    + * before the executor starts running any tasks.
    + *
    + * The specific API exposed to the end users is still considered to be very unstable. We will
    + * *hopefully* be able to keep compatibility by providing default implementations for any methods
    + * added, but make no guarantees this will always be possible across all Spark releases.
    + *
    + * Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources
    + * it uses. A plugin acquires the same privileges as the user running the task. A bad plugin
    + * could also interfere with task execution and make the executor fail in unexpected ways.
    + */
    +@DeveloperApi
    +public interface ExecutorPlugin {
    --- End diff --

    Both your arguments make sense; I can see a clear purpose for having a `stop`, given that we mostly care about it for successful runs without any weird early failures/terminations. The only case I can make for `start` (if we already have `init`) is if Spark itself calls the `start` methods from a separate thread instead of having plugin writers do it, but it might actually give them more flexibility to do it themselves. Having `start`/`stop` might be better than `init`/`stop`, though.
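For context on what plugin writers would implement, here is an illustrative sketch against the interface under discussion. The method names follow the init/stop pair this thread converges on and are not final API; the plugin itself is made up:

```scala
import org.apache.spark.ExecutorPlugin

// Hypothetical plugin: start a background poller when the executor comes up,
// and tear it down when the executor stops.
class MemoryPollerPlugin extends ExecutorPlugin {
  private var poller: Thread = _

  override def init(): Unit = {
    poller = new Thread(() => {
      try {
        while (true) {
          // poll JVM memory beans, emit metrics, etc.
          Thread.sleep(1000)
        }
      } catch {
        case _: InterruptedException => // exit cleanly when stopped
      }
    }, "memory-poller")
    poller.setDaemon(true)
    poller.start()
  }

  override def stop(): Unit = {
    if (poller != null) poller.interrupt()
  }
}
```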
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    I'm going to commit the fixes recommended by @vanzin soon. Then, I think the biggest remaining concern is the threads. I'm okay with adding a `start` and `stop`, but I'm curious as to whether those should be in a separate JIRA. This could potentially go into 2.4, and more methods could be added as we learn the use cases for plugins. My concern with adding them is that if they're later judged obsolete or redundant (in the case of `init` vs `start`) after people have already started using plugins, it'd be annoying to take away methods they expect to work. My best argument for adding the methods is that we could get people to conform to a similar structure: have, say, `init` run in the same thread as the executor, and once the executor has initialized, create a thread that `start`s the plugins, so plugin writers wouldn't even have to worry about making their plugins async. Does anyone have strong opinions?
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r213837582

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -130,6 +130,14 @@ private[spark] class Executor(
       private val urlClassLoader = createClassLoader()
       private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
    +  // Load plugins in the current thread, they are expected to not block.
    +  // Heavy computation in plugin initialization should be done async.
    +  Thread.currentThread().setContextClassLoader(replClassLoader)
    +  conf.get(EXECUTOR_PLUGINS).foreach { classes =>
    --- End diff --

    Ah, whoops, misunderstood the value of `conf.get`. That works just fine.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r213831980

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -130,6 +130,14 @@ private[spark] class Executor(
       private val urlClassLoader = createClassLoader()
       private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
    +  // Load plugins in the current thread, they are expected to not block.
    +  // Heavy computation in plugin initialization should be done async.
    +  Thread.currentThread().setContextClassLoader(replClassLoader)
    +  conf.get(EXECUTOR_PLUGINS).foreach { classes =>
    --- End diff --

    @mridulm We've had some back and forth on that; I'm open to changing it if people agree it should be one way over another. Just checking, have you looked at the earlier conversation at https://github.com/apache/spark/pull/22192#discussion_r212746543? There's also one on the old PR on the same topic. As for moving plugin loading to after the executors have initialized, I see no problem with that. I don't think it should make a huge difference, provided we either change to a separate thread, or keep it in the same thread and plugin writers respect not making initialization blocking and computation-heavy. I'll see if there's a more natural place to move it.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r213828422

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -130,6 +130,14 @@ private[spark] class Executor(
       private val urlClassLoader = createClassLoader()
       private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
    +  // Load plugins in the current thread, they are expected to not block.
    +  // Heavy computation in plugin initialization should be done async.
    +  Thread.currentThread().setContextClassLoader(replClassLoader)
    +  conf.get(EXECUTOR_PLUGINS).foreach { classes =>
    --- End diff --

    @vanzin If I'm understanding correctly, in the ConfigBuilder I am to change `.createOptional` to `.createWithDefault(Nil)`. This gives the error:

    [error] /Users/nsheth/personal_fork/spark/core/src/main/scala/org/apache/spark/executor/Executor.scala:136: type mismatch;
    [error]  found   : String
    [error]  required: Seq[String]
    [error]     Utils.loadExtensions(classOf[ExecutorPlugin], classes, conf)

    Granted, I have no idea why it's able to cast Nil to a String for this config.
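For reference, the mismatch suggests the entry was declared as a plain string conf; declaring it as a sequence makes `conf.get` return `Seq[String]`, so `Nil` is a valid default and `loadExtensions` typechecks. A sketch assuming Spark's internal ConfigBuilder API:

```scala
private[spark] val EXECUTOR_PLUGINS = ConfigBuilder("spark.executor.plugins")
  .doc("Comma-separated list of class names implementing ExecutorPlugin.")
  .stringConf
  .toSequence            // conf.get now yields Seq[String], not String
  .createWithDefault(Nil)
```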
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r213820297

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -130,6 +130,14 @@ private[spark] class Executor(
       private val urlClassLoader = createClassLoader()
       private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
    +  // Load plugins in the current thread, they are expected to not block.
    +  // Heavy computation in plugin initialization should be done async.
    +  Thread.currentThread().setContextClassLoader(replClassLoader)
    +  conf.get(EXECUTOR_PLUGINS).foreach { classes =>
    --- End diff --

    `Utils.loadExtensions` expects `classes` to be a sequence of strings; it gets mad at me if I add `.createWithDefault(Nil)` to the config.
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    I don't think this test failure was caused by my changes; it failed an ML test in Python due to a worker crashing. I couldn't find anything in the logs indicating that my changes led to this issue (also, I don't think I changed any functionality from my previous commit, which passed all tests). Could someone retest this please?
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r213150133

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -130,6 +130,16 @@ private[spark] class Executor(
       private val urlClassLoader = createClassLoader()
       private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
    +  // One thread will handle loading all of the plugins on this executor
    --- End diff --

    That does make sense. While I did say "aside from semantics", semantics is a good reason to include it, especially since it'll be harder to get plugin writers to adopt an `init` function later. I'll make the other changes and make sure the tests still pass; if anyone does feel strongly (or even weakly) about one way over another, I don't think there's much harm in either approach.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r213140764

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -130,6 +130,16 @@ private[spark] class Executor(
       private val urlClassLoader = createClassLoader()
       private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
    +  // One thread will handle loading all of the plugins on this executor
    --- End diff --

    Aside from semantics, would an `init` method be necessary instead of having the initialization logic in the plugin's constructor? Since the class loader is going to call the constructor immediately, I figure having an `init` function would only really make a difference if we want to load the plugins right here and then call `init` at a later point in the executor's creation. I can't think of any particular reason why we'd want to do that, unless there are specific executor structures that we want created prior to plugin initialization (although in that case we could also just move the plugin initialization further down).
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r212781171

    --- Diff: core/src/test/java/org/apache/spark/ExecutorPluginSuite.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark;
    +
    +import org.apache.spark.api.java.JavaSparkContext;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +// Tests loading plugins into executors
    +public class ExecutorPluginSuite {
    +  // Static value modified by testing plugin to ensure plugin loaded correctly.
    +  public static int numSuccessfulPlugins = 0;
    +
    +  private SparkConf initializeSparkConf(String pluginNames) {
    +    return new SparkConf()
    +      .setMaster("local")
    +      .setAppName("test")
    +      .set("spark.executor.plugins", pluginNames);
    --- End diff --

    I had the same first response. The original executor plugin code was a class in Java, so I figured I should keep my test in Java as well (I checked beforehand and couldn't find any instances of Java code being primarily tested in Scala, so I figured it best not to do that). Perhaps it's so plugin writers get to choose their plugin language, since it's easier to extend a Java interface in Scala than to extend a Scala trait in Java? I'll leave it as-is for the time being.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r212780252

    --- Diff: core/src/test/java/org/apache/spark/ExecutorPluginSuite.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark;
    +
    +import org.apache.spark.api.java.JavaSparkContext;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +// Tests loading plugins into executors
    +public class ExecutorPluginSuite {
    +  // Static value modified by testing plugin to ensure plugin loaded correctly.
    +  public static int numSuccessfulPlugins = 0;
    +
    +  private SparkConf initializeSparkConf(String pluginNames) {
    +    return new SparkConf()
    +      .setMaster("local")
    +      .setAppName("test")
    +      .set("spark.executor.plugins", pluginNames);
    --- End diff --

    Is it alright to use Seq in a Java file? I couldn't find it in any other Java files in Spark that aren't in a target/ directory. The compiler doesn't seem to mind when I import it, but I want to make sure I'm sticking with the usual style rules.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r212778867

    --- Diff: core/src/test/java/org/apache/spark/ExecutorPluginSuite.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark;
    +
    +import org.apache.spark.api.java.JavaSparkContext;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +// Tests loading plugins into executors
    +public class ExecutorPluginSuite {
    +  // Static value modified by testing plugin to ensure plugin loaded correctly.
    +  public static int numSuccessfulPlugins = 0;
    +
    +  private SparkConf initializeSparkConf(String pluginNames) {
    +    return new SparkConf()
    +      .setMaster("local")
    +      .setAppName("test")
    +      .set("spark.executor.plugins", pluginNames);
    --- End diff --

    Sorry, could you explain this for me? Do you mean that instead of `.set("spark.executor.plugins", pluginNames)` I should a) have a variable storing the config name and b) pass the plugin names to initializeSparkConf as a list, and have it as `.set(EXECUTOR_PLUGIN_CONF_NAME, String.join(",", pluginNames))`?
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r212777617

    --- Diff: core/src/main/java/org/apache/spark/ExecutorPlugin.java ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark;
    +
    +import org.apache.spark.annotation.DeveloperApi;
    +
    +/**
    + * A plugin which can be automatically instantiated within each Spark executor. Users can specify
    + * plugins which should be created with the "spark.executor.plugins" configuration. An instance
    + * of each plugin will be created for every executor, including those created by dynamic allocation,
    + * before the executor starts running any tasks.
    + *
    + * The specific API exposed to the end users is still considered to be very unstable. We will
    + * *hopefully* be able to keep compatibility by providing default implementations for any methods
    + * added, but make no guarantees this will always be possible across all Spark releases.
    + *
    + * Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources
    + * it uses. A plugin acquires the same privileges as the user running the task. A bad plugin
    + * could also interfere with task execution and make the executor fail in unexpected ways.
    + */
    +@DeveloperApi
    +public interface ExecutorPlugin {
    --- End diff --

    Pinging @squito for this one; I'm not sure if he specifically wanted an empty API to start (and have plugins run to their own completion or until the JVM cleans them up) or was considering adding methods later.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r212777267

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -130,6 +130,16 @@ private[spark] class Executor(
       private val urlClassLoader = createClassLoader()
       private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
    +  // One thread will handle loading all of the plugins on this executor
    --- End diff --

    It was originally all in one thread; I changed it to a separate thread based on this discussion: https://github.com/apache/spark/pull/21923#discussion_r206905166 I see the merits of both arguments and can change it based on the general consensus.
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22192

    (To my understanding this issue isn't caused by my change; I checked other PRs that hit this error around the same time, and they also had similar output and no specific errors.)
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user NiharS commented on the issue: https://github.com/apache/spark/pull/22192 retest this please
[GitHub] spark pull request #22192: [SPARK-24918] Executor Plugin API
GitHub user NiharS opened a pull request:

    https://github.com/apache/spark/pull/22192

    [SPARK-24918] Executor Plugin API

A continuation of @squito's executor plugin task. At his request I took his code, added testing, and moved the plugin initialization to a separate thread.

## What changes were proposed in this pull request?

Executor plugins now run on a separate thread, so the executor does not wait on them. Added testing.

## How was this patch tested?

Added test cases that test using a sample plugin.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NiharS/spark executorPlugin

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22192.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22192

commit 44454dd586e35bdf16492c4a8969494bd3b7f8f5
Author: Nihar Sheth
Date:   2018-08-20T17:53:37Z

    [SPARK-24918] Executor Plugin API

    This commit adds testing and moves the plugin initialization to a separate thread.
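For readers skimming the thread, enabling a plugin with the conf this PR proposes would look roughly like the following; the application, plugin class, and jar names are made up for illustration:

```
$ spark-submit \
    --jars my-plugins.jar \
    --conf spark.executor.plugins=com.example.MyExecutorPlugin \
    --class com.example.MyApp my-app.jar
```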
[GitHub] spark issue #22114: [SPARK-24938][Core] Prevent Netty from using onheap memo...
Github user NiharS commented on the issue: https://github.com/apache/spark/pull/22114 They pass on my machine :(
[GitHub] spark issue #22114: [SPARK-24938][Core] Prevent Netty from using onheap memo...
Github user NiharS commented on the issue: https://github.com/apache/spark/pull/22114 Tried with a significantly larger input, both with and without the change. They ran in just about the same time.
[GitHub] spark issue #22114: [SPARK-24938][Core] Prevent Netty from using onheap memo...
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/22114

    Working on a better test now! With my small program there was definitely no noticeable change in runtime. The memory monitor was running too, though, so I did have some external factors. I'll try with a significantly larger input and see if there's anything that stands out.
[GitHub] spark pull request #22114: [SPARK-24938][Core] Prevent Netty from using onhe...
GitHub user NiharS opened a pull request:

    https://github.com/apache/spark/pull/22114

    [SPARK-24938][Core] Prevent Netty from using onheap memory for headers without regard for configuration

    …ffer type instead of immediately opening a pool of onheap memory for headers

## What changes were proposed in this pull request?

In MessageEncoder.java, the header would always be allocated in onheap memory regardless of whether netty was configured to use/prefer onheap or offheap. By default this made netty allocate 16 MB of onheap memory for a tiny header message. It would be more practical to use preallocated buffers.

Using a memory monitor tool on a simple spark application, the following services currently allocate 16 MB of onheap memory:

- netty-rpc-client
- netty-blockTransfer-client
- netty-external-shuffle-client

With this change, the memory monitor tool reports all three of these services as using 0 B of onheap memory. The offheap memory allocation does not increase, but more of the already-allocated space is used.

## How was this patch tested?

Manually tested change using spark-memory-tool https://github.com/squito/spark-memory

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NiharS/spark nettybuffer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22114.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22114

commit c2f9ed10776842ffe0746fcc89b157675fa6c455
Author: Nihar Sheth
Date:   2018-08-14T22:49:41Z

    netty defaults to using current buffers specified by the preferred buffer type instead of immediately opening a pool of onheap memory for headers
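The gist of the fix the description walks through, as a sketch rather than the verbatim patch: request the header buffer from the channel's configured allocator instead of forcing a heap buffer, so Netty's preferred buffer type is respected. The method name below is made up; only the allocator calls are real Netty API:

```scala
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext

// Sketch: let the allocator decide between onheap and offheap based on its
// configuration, rather than hard-coding a heap buffer for the header.
def allocateHeader(ctx: ChannelHandlerContext, headerLength: Int): ByteBuf = {
  // before (always onheap): ctx.alloc().heapBuffer(headerLength)
  ctx.alloc().buffer(headerLength) // follows the configured preference
}
```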
[GitHub] spark issue #21885: [SPARK-24926][CORE] Ensure numCores is used consistently...
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/21885

    Chatted with @squito about this. From what I understood from that discussion, ExternalShuffleService shouldn't be controlled by configurations passed into a Spark application, as it is its own independent service separate from the application. The only other usage that immediately stands out is IndexShuffleBlockResolver, but that only uses the TransportConf for other conf values, never asking for the core count. Past that, it's used in a Mesos backend class, but that seems outside the scope, and in some testing files, which are safe to leave as-is since they don't do anything with the core count.
[GitHub] spark issue #21885: [SPARK-24926][CORE] Ensure numCores is used consistently...
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/21885

    Thanks for the review and feedback! I made the changes, except for moving the if clause to the same line as "yarn"; unfortunately, that does make the line 104 characters long.
[GitHub] spark pull request #21885: [SPARK-24926] Ensure numCores is used consistentl...
GitHub user NiharS opened a pull request:

    https://github.com/apache/spark/pull/21885

    [SPARK-24926] Ensure numCores is used consistently in all netty configurations

## What changes were proposed in this pull request?

Netty could just ignore user-provided configurations. In particular, spark.driver.cores would be ignored when considering the number of cores available to netty (which would usually just default to Runtime.availableProcessors()). In transport configurations, the number of threads is based directly on how many cores the system believes it has available, and in YARN cluster mode this would generally overshoot the user-preferred value.

## How was this patch tested?

As this is mostly a configuration change, tests were done manually by adding spark-submit confs and verifying the number of threads started by netty was what was expected. Passes scalastyle checks from dev/run-tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NiharS/spark usableCores

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21885.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21885

commit 6967dc6cbf064cb3ee046706ef09605b64ddb584
Author: Nihar Sheth
Date:   2018-07-26T17:20:52Z

    Properly plumb numUsableCores from spark.driver.cores
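A sketch of the plumbing the description refers to, assuming Spark's SparkTransportConf.fromSparkConf helper (whose numUsableCores parameter falls back to Runtime.availableProcessors() when left at 0). The call site below is hypothetical:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.network.netty.SparkTransportConf

val conf = new SparkConf()
// Size netty thread pools from the user-configured driver core count when
// it is set, instead of the machine's full processor count.
val numUsableCores = conf.getInt("spark.driver.cores", 0)
val transportConf = SparkTransportConf.fromSparkConf(conf, "rpc", numUsableCores)
```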
[GitHub] spark issue #21807: [SPARK-24536] Validate that limit clause cannot have a n...
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/21807

    Hey @mauropalsgraaf, just wanted to check in on this. Have you run into any additional issues, or do you have any questions about this fix?
[GitHub] spark issue #21807: [SPARK-24536] Validate that limit clause cannot have a n...
Github user NiharS commented on the issue:

    https://github.com/apache/spark/pull/21807

    New to SQL, but it seems like the query `SELECT 1 LIMIT CAST('1' AS INT)` should work, right? I tried it both on Spark without your change and on the W3Schools SQL tester, and it's accepted in both, but on your PR it hits the new AnalysisException. If this is indeed an issue, it can be avoided by having the case be `e.eval() == null` instead of `e.nullable`, although there's some duplicated work since the previous case also calls `e.eval()`.
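To make the distinction concrete: `CAST('1' AS INT)` is a nullable expression (a malformed string would cast to null) even though this particular value evaluates to a non-null 1, so a nullability check rejects it while an evaluation check would not. A hypothetical sketch of the two checks, not the PR's exact code:

```scala
import org.apache.spark.sql.AnalysisException

// Validating a foldable LIMIT expression `e`:
// `if (e.nullable) reject` would also reject LIMIT CAST('1' AS INT) -> too strict
e.eval() match {
  case null => // e.g. LIMIT CAST(NULL AS INT)
    throw new AnalysisException("The limit expression must not be null")
  case v: Int if v >= 0 => // e.g. LIMIT CAST('1' AS INT), accepted
  case _ =>
    throw new AnalysisException("The limit expression must be a non-negative integer")
}
```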
[GitHub] spark issue #21765: [MINOR][CORE] Add test cases for RDD.cartesian
Github user NiharS commented on the issue: https://github.com/apache/spark/pull/21765 Rebased onto SPARK-24813, so hopefully tests will work now.
[GitHub] spark pull request #21765: [MINOR][CORE] Add test cases for RDD.cartesian
GitHub user NiharS opened a pull request:

    https://github.com/apache/spark/pull/21765

    [MINOR][CORE] Add test cases for RDD.cartesian

## What changes were proposed in this pull request?

While looking through the codebase, it appeared that the Scala code for RDD.cartesian does not have any tests for correctness. This adds a couple basic tests to verify cartesian yields correct values. While the implementation of RDD.cartesian is pretty simple, it always helps to have a few tests!

## How was this patch tested?

The new test cases pass, and the Scala style tests from running dev/run-tests all pass.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NiharS/spark cartesianTests

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21765.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21765

commit 9df4c3b4a71082181aa979c3bddf2c3d99db256e
Author: Nihar Sheth
Date:   2018-07-13T20:37:59Z

    [MINOR][CORE] Add test cases for RDD.cartesian

    The scala code for RDD.cartesian does not have any tests for correctness. This adds a couple basic tests to verify cartesian yields correct values.

    Passes the added test cases, and passes the scala style tests.

    Author: Nihar Sheth
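For flavor, a minimal sketch of what such a correctness test might look like (hypothetical, not the PR's exact test code; assumes the shared `sc` SparkContext fixture used in RDDSuite):

```scala
test("cartesian yields every pair exactly once") {
  val nums = sc.parallelize(Seq(1, 2), 2)
  val chars = sc.parallelize(Seq("a", "b", "c"), 3)
  val product = nums.cartesian(chars).collect()

  // 2 x 3 elements, every combination present
  assert(product.length === 6)
  assert(product.toSet === Set(
    (1, "a"), (1, "b"), (1, "c"),
    (2, "a"), (2, "b"), (2, "c")))
}
```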