Repository: spark
Updated Branches:
  refs/heads/master cb4844749 -> d12c0711f


[SPARK-3405] add subnet-id and vpc-id options to spark_ec2.py

Based on this gist:
https://gist.github.com/amar-analytx/0b62543621e1f246c0a2

We use security group IDs instead of security group names to work around this boto issue:
https://github.com/boto/boto/issues/350
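
For context, here is a minimal boto sketch of the workaround (the AMI ID,
group ID, and region below are hypothetical placeholders, not values from
this patch):

```python
from boto.ec2 import connect_to_region

conn = connect_to_region("us-west-1")

# EC2-Classic style: groups passed by *name*. For a VPC launch this runs
# into boto issue #350, since VPC instances must reference groups by ID.
# conn.run_instances("ami-12345678", security_groups=["my-cluster-slaves"])

# VPC-safe style, as this patch does: pass group *IDs* plus a subnet ID.
conn.run_instances(
    "ami-12345678",                      # hypothetical AMI
    key_name="awskey",
    security_group_ids=["sg-0a1b2c3d"],  # hypothetical group ID
    subnet_id="subnet-4eb27b39",
    instance_type="m1.large",
)
```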

Author: Mike Jennings <mvj...@gmail.com>
Author: Mike Jennings <m...@google.com>

Closes #2872 from mvj101/SPARK-3405 and squashes the following commits:

be9cb43 [Mike Jennings] `pep8 spark_ec2.py` runs cleanly.
4dc6756 [Mike Jennings] Remove duplicate comment
731d94c [Mike Jennings] Update for code review.
ad90a36 [Mike Jennings] Merge branch 'master' of https://github.com/apache/spark into SPARK-3405
1ebffa1 [Mike Jennings] Merge branch 'master' into SPARK-3405
52aaeec [Mike Jennings] [SPARK-3405] add subnet-id and vpc-id options to spark_ec2.py


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d12c0711
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d12c0711
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d12c0711

Branch: refs/heads/master
Commit: d12c0711faa3d4333513fcbbbee4868bcb784a26
Parents: cb48447
Author: Mike Jennings <mvj...@gmail.com>
Authored: Tue Dec 16 12:13:21 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Dec 16 12:13:21 2014 -0800

----------------------------------------------------------------------
 docs/ec2-scripts.md | 19 ++++++++++++++
 ec2/spark_ec2.py    | 66 +++++++++++++++++++++++++++++++++++++-----------
 2 files changed, 70 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d12c0711/docs/ec2-scripts.md
----------------------------------------------------------------------
diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
index ed51d0a..d50f445 100644
--- a/docs/ec2-scripts.md
+++ b/docs/ec2-scripts.md
@@ -94,6 +94,25 @@ another.
 permissions on your private key file, you can run `launch` with the
 `--resume` option to restart the setup process on an existing cluster.
 
+# Launching a Cluster in a VPC
+
+-   Run
+    `./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> --vpc-id=<vpc-id> --subnet-id=<subnet-id> launch <cluster-name>`,
+    where `<keypair>` is the name of your EC2 key pair (the name you gave
+    it when you created it), `<key-file>` is the private key file for your
+    key pair, `<num-slaves>` is the number of slave nodes to launch (try
+    1 at first), `<vpc-id>` is the ID of your VPC, `<subnet-id>` is the
+    ID of your subnet, and `<cluster-name>` is the name to give to your
+    cluster.
+
+    For example:
+
+    ```bash
+    export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU
+    export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123
+    ./spark-ec2 --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 --zone=us-west-1a --vpc-id=vpc-a28d24c7 --subnet-id=subnet-4eb27b39 --spark-version=1.1.0 launch my-spark-cluster
+    ```
+
 # Running Applications
 
 -   Go into the `ec2` directory in the release of Spark you downloaded.

http://git-wip-us.apache.org/repos/asf/spark/blob/d12c0711/ec2/spark_ec2.py
----------------------------------------------------------------------
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 5f9e484..92adfd2 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -162,6 +162,10 @@ def parse_args():
     parser.add_option(
         "--copy-aws-credentials", action="store_true", default=False,
         help="Add AWS credentials to hadoop configuration to allow Spark to access S3")
+    parser.add_option(
+        "--subnet-id", default=None, help="VPC subnet to launch instances in")
+    parser.add_option(
+        "--vpc-id", default=None, help="VPC to launch instances in")
 
     (opts, args) = parser.parse_args()
     if len(args) != 2:
@@ -186,14 +190,14 @@ def parse_args():
 
 
 # Get the EC2 security group of the given name, creating it if it doesn't exist
-def get_or_make_group(conn, name):
+def get_or_make_group(conn, name, vpc_id):
     groups = conn.get_all_security_groups()
     group = [g for g in groups if g.name == name]
     if len(group) > 0:
         return group[0]
     else:
         print "Creating security group " + name
-        return conn.create_security_group(name, "Spark EC2 group")
+        return conn.create_security_group(name, "Spark EC2 group", vpc_id)
 
 
 # Check whether a given EC2 instance object is in a state we consider active,
@@ -303,12 +307,26 @@ def launch_cluster(conn, opts, cluster_name):
             user_data_content = user_data_file.read()
 
     print "Setting up security groups..."
-    master_group = get_or_make_group(conn, cluster_name + "-master")
-    slave_group = get_or_make_group(conn, cluster_name + "-slaves")
+    master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
+    slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
     authorized_address = opts.authorized_address
     if master_group.rules == []:  # Group was just now created
-        master_group.authorize(src_group=master_group)
-        master_group.authorize(src_group=slave_group)
+        if opts.vpc_id is None:
+            master_group.authorize(src_group=master_group)
+            master_group.authorize(src_group=slave_group)
+        else:
+            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                   src_group=master_group)
+            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                   src_group=master_group)
+            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                   src_group=master_group)
+            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                   src_group=slave_group)
+            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                   src_group=slave_group)
+            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                   src_group=slave_group)
         master_group.authorize('tcp', 22, 22, authorized_address)
         master_group.authorize('tcp', 8080, 8081, authorized_address)
         master_group.authorize('tcp', 18080, 18080, authorized_address)
@@ -320,8 +338,22 @@ def launch_cluster(conn, opts, cluster_name):
         if opts.ganglia:
             master_group.authorize('tcp', 5080, 5080, authorized_address)
     if slave_group.rules == []:  # Group was just now created
-        slave_group.authorize(src_group=master_group)
-        slave_group.authorize(src_group=slave_group)
+        if opts.vpc_id is None:
+            slave_group.authorize(src_group=master_group)
+            slave_group.authorize(src_group=slave_group)
+        else:
+            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                  src_group=master_group)
+            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                  src_group=master_group)
+            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                  src_group=master_group)
+            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                  src_group=slave_group)
+            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                  src_group=slave_group)
+            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                  src_group=slave_group)
         slave_group.authorize('tcp', 22, 22, authorized_address)
         slave_group.authorize('tcp', 8080, 8081, authorized_address)
         slave_group.authorize('tcp', 50060, 50060, authorized_address)
@@ -341,11 +373,12 @@ def launch_cluster(conn, opts, cluster_name):
     if opts.ami is None:
         opts.ami = get_spark_ami(opts)
 
-    additional_groups = []
+    # we use group ids to work around https://github.com/boto/boto/issues/350
+    additional_group_ids = []
     if opts.additional_security_group:
-        additional_groups = [sg
-                             for sg in conn.get_all_security_groups()
-                             if opts.additional_security_group in (sg.name, sg.id)]
+        additional_group_ids = [sg.id
+                                for sg in conn.get_all_security_groups()
+                                if opts.additional_security_group in (sg.name, sg.id)]
     print "Launching instances..."
 
     try:
@@ -392,9 +425,10 @@ def launch_cluster(conn, opts, cluster_name):
                 placement=zone,
                 count=num_slaves_this_zone,
                 key_name=opts.key_pair,
-                security_groups=[slave_group] + additional_groups,
+                security_group_ids=[slave_group.id] + additional_group_ids,
                 instance_type=opts.instance_type,
                 block_device_map=block_map,
+                subnet_id=opts.subnet_id,
                 user_data=user_data_content)
             my_req_ids += [req.id for req in slave_reqs]
             i += 1
@@ -441,12 +475,13 @@ def launch_cluster(conn, opts, cluster_name):
             num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
             if num_slaves_this_zone > 0:
                 slave_res = image.run(key_name=opts.key_pair,
-                                      security_groups=[slave_group] + additional_groups,
+                                      security_group_ids=[slave_group.id] + additional_group_ids,
                                       instance_type=opts.instance_type,
                                       placement=zone,
                                       min_count=num_slaves_this_zone,
                                       max_count=num_slaves_this_zone,
                                       block_device_map=block_map,
+                                      subnet_id=opts.subnet_id,
                                       user_data=user_data_content)
                 slave_nodes += slave_res.instances
                 print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
@@ -467,12 +502,13 @@ def launch_cluster(conn, opts, cluster_name):
         if opts.zone == 'all':
             opts.zone = random.choice(conn.get_all_zones()).name
         master_res = image.run(key_name=opts.key_pair,
-                               security_groups=[master_group] + additional_groups,
+                               security_group_ids=[master_group.id] + additional_group_ids,
                                instance_type=master_type,
                                placement=opts.zone,
                                min_count=1,
                                max_count=1,
                                block_device_map=block_map,
+                               subnet_id=opts.subnet_id,
                                user_data=user_data_content)
         master_nodes = master_res.instances
         print "Launched master in %s, regid = %s" % (zone, master_res.id)

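A note on the security-group logic in this patch: in EC2-Classic,
`group.authorize(src_group=...)` without a protocol grants all traffic from
the source group, but VPC security groups require an explicit protocol and
port range, which is why the VPC branch issues separate icmp/tcp/udp calls.
Those calls could be folded into a helper along these lines (a sketch only;
`authorize_all_from` is a hypothetical name, not part of the patch):

```python
def authorize_all_from(group, src_group):
    """Open all ICMP, TCP, and UDP traffic into `group` from `src_group`,
    using the explicit port ranges that VPC security groups require."""
    group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
                    src_group=src_group)
    group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
                    src_group=src_group)
    group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
                    src_group=src_group)

# The VPC branches above would then reduce to:
#   authorize_all_from(master_group, master_group)
#   authorize_all_from(master_group, slave_group)
#   authorize_all_from(slave_group, master_group)
#   authorize_all_from(slave_group, slave_group)
```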
