Repository: spark
Updated Branches:
  refs/heads/master 334c5bd1a -> 52ed7da12


[SPARK-6193] [EC2] Push group filter up to EC2

When looking for a cluster, spark-ec2 currently pulls down [info for all 
instances](https://github.com/apache/spark/blob/eb48fd6e9d55fb034c00e61374bb9c2a86a82fb8/ec2/spark_ec2.py#L620)
 and filters locally. When working on an AWS account with hundreds of active 
instances, this step alone can take over 10 seconds.

This PR improves how spark-ec2 searches for clusters by pushing the filter up 
to EC2.

Basically, the problem (and solution) look like this:

```python
>>> timeit.timeit('blah = conn.get_all_reservations()', setup='from __main__ import conn', number=10)
116.96390509605408
>>> timeit.timeit('blah = conn.get_all_reservations(filters={"instance.group-name": ["my-cluster-master"]})', setup='from __main__ import conn', number=10)
4.629754066467285
```

Translated to a user-visible action, this looks like (against an AWS account 
with ~200 active instances):

```shell
# master
$ python -m timeit -n 3 --setup 'import subprocess' 
'subprocess.call("./spark-ec2 get-master my-cluster --region us-west-2", 
shell=True)'
...
3 loops, best of 3: 9.83 sec per loop

# this PR
$ python -m timeit -n 3 --setup 'import subprocess' 
'subprocess.call("./spark-ec2 get-master my-cluster --region us-west-2", 
shell=True)'
...
3 loops, best of 3: 1.47 sec per loop
```

This PR also refactors `get_existing_cluster()` to make it, I hope, simpler.

Finally, this PR fixes some minor grammar issues related to printing status to 
the user. :tophat: :clap:

Author: Nicholas Chammas <nicholas.chammas@gmail.com>

Closes #4922 from nchammas/get-existing-cluster-faster and squashes the 
following commits:

18802f1 [Nicholas Chammas] ignore shutting-down
f2a5b9f [Nicholas Chammas] fix grammar
d96a489 [Nicholas Chammas] push group filter up to EC2


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52ed7da1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52ed7da1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52ed7da1

Branch: refs/heads/master
Commit: 52ed7da12e26c45734ce53a1be19ef148b2b953e
Parents: 334c5bd
Author: Nicholas Chammas <nicholas.chammas@gmail.com>
Authored: Sun Mar 8 14:01:26 2015 +0000
Committer: Sean Owen <sowen@cloudera.com>
Committed: Sun Mar 8 14:01:26 2015 +0000

----------------------------------------------------------------------
 ec2/spark_ec2.py | 78 +++++++++++++++++++++++++++------------------------
 1 file changed, 41 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/52ed7da1/ec2/spark_ec2.py
----------------------------------------------------------------------
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index b6e7c4c..5e636dd 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -22,6 +22,7 @@
 from __future__ import with_statement
 
 import hashlib
+import itertools
 import logging
 import os
 import os.path
@@ -299,13 +300,6 @@ def get_validate_spark_version(version, repo):
         return version
 
 
-# Check whether a given EC2 instance object is in a state we consider active,
-# i.e. not terminating or terminated. We count both stopping and stopped as
-# active since we can restart stopped clusters.
-def is_active(instance):
-    return (instance.state in ['pending', 'running', 'stopping', 'stopped'])
-
-
 # Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
 # Last Updated: 2014-06-20
 # For easy maintainability, please keep this manually-inputted dictionary 
sorted by key.
@@ -573,8 +567,11 @@ def launch_cluster(conn, opts, cluster_name):
                                       placement_group=opts.placement_group,
                                       user_data=user_data_content)
                 slave_nodes += slave_res.instances
-                print "Launched %d slaves in %s, regid = %s" % 
(num_slaves_this_zone,
-                                                                zone, 
slave_res.id)
+                print "Launched {s} slave{plural_s} in {z}, regid = 
{r}".format(
+                    s=num_slaves_this_zone,
+                    plural_s=('' if num_slaves_this_zone == 1 else 's'),
+                    z=zone,
+                    r=slave_res.id)
             i += 1
 
     # Launch or resume masters
@@ -621,40 +618,47 @@ def launch_cluster(conn, opts, cluster_name):
     return (master_nodes, slave_nodes)
 
 
-# Get the EC2 instances in an existing cluster if available.
-# Returns a tuple of lists of EC2 instance objects for the masters and slaves
 def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
-    print "Searching for existing cluster " + cluster_name + " in region " \
-        + opts.region + "..."
-    reservations = conn.get_all_reservations()
-    master_nodes = []
-    slave_nodes = []
-    for res in reservations:
-        active = [i for i in res.instances if is_active(i)]
-        for inst in active:
-            group_names = [g.name for g in inst.groups]
-            if (cluster_name + "-master") in group_names:
-                master_nodes.append(inst)
-            elif (cluster_name + "-slaves") in group_names:
-                slave_nodes.append(inst)
-    if any((master_nodes, slave_nodes)):
-        print "Found %d master(s), %d slaves" % (len(master_nodes), 
len(slave_nodes))
-    if master_nodes != [] or not die_on_error:
-        return (master_nodes, slave_nodes)
-    else:
-        if master_nodes == [] and slave_nodes != []:
-            print >> sys.stderr, "ERROR: Could not find master in group " + 
cluster_name \
-                + "-master" + " in region " + opts.region
-        else:
-            print >> sys.stderr, "ERROR: Could not find any existing cluster" \
-                + " in region " + opts.region
+    """
+    Get the EC2 instances in an existing cluster if available.
+    Returns a tuple of lists of EC2 instance objects for the masters and 
slaves.
+    """
+    print "Searching for existing cluster {c} in region {r}...".format(
+        c=cluster_name, r=opts.region)
+
+    def get_instances(group_names):
+        """
+        Get all non-terminated instances that belong to any of the provided 
security groups.
+
+        EC2 reservation filters and instance states are documented here:
+            
http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
+        """
+        reservations = conn.get_all_reservations(
+            filters={"instance.group-name": group_names})
+        instances = itertools.chain.from_iterable(r.instances for r in 
reservations)
+        return [i for i in instances if i.state not in ["shutting-down", 
"terminated"]]
+
+    master_instances = get_instances([cluster_name + "-master"])
+    slave_instances = get_instances([cluster_name + "-slaves"])
+
+    if any((master_instances, slave_instances)):
+        print "Found {m} master{plural_m}, {s} slave{plural_s}.".format(
+            m=len(master_instances),
+            plural_m=('' if len(master_instances) == 1 else 's'),
+            s=len(slave_instances),
+            plural_s=('' if len(slave_instances) == 1 else 's'))
+
+    if not master_instances and die_on_error:
+        print >> sys.stderr, \
+            "ERROR: Could not find a master for cluster {c} in region 
{r}.".format(
+                c=cluster_name, r=opts.region)
         sys.exit(1)
 
+    return (master_instances, slave_instances)
+
 
 # Deploy configuration files and run setup scripts on a newly launched
 # or started EC2 cluster.
-
-
 def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
     master = master_nodes[0].public_dns_name
     if deploy_ssh_key:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

Reply via email to