[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] Initial Yandex.Cloud Dataproc support

2020-02-14 Thread GitBox
nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] 
Initial Yandex.Cloud Dataproc support
URL: https://github.com/apache/airflow/pull/7252#discussion_r379456748
 
 

 ##
 File path: airflow/providers/yandex/operators/yandexcloud_dataproc_operator.py
 ##
 @@ -0,0 +1,334 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.providers.yandex.hooks.yandexcloud_dataproc_hook import DataprocHook
+from airflow.providers.yandex.operators import YandexCloudBaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataprocCreateClusterOperator(YandexCloudBaseOperator):
+    """Creates Yandex.Cloud Data Proc cluster."""
+
+    # pylint: disable=too-many-instance-attributes
+    # pylint: disable=too-many-arguments
+    # pylint: disable=too-many-locals
+
+    @apply_defaults
+    def __init__(self,
+                 folder_id=None,
+                 connection_id=None,
+                 cluster_name=None,
+                 cluster_description='',
+                 cluster_image_version='1.1',
+                 ssh_public_keys=None,
+                 subnet_id=None,
+                 services=('HDFS', 'YARN', 'MAPREDUCE', 'HIVE', 'SPARK'),
+                 s3_bucket=None,
+                 zone='ru-central1-b',
+                 service_account_id=None,
+                 masternode_resource_preset='s2.small',
+                 masternode_disk_size=15,
+                 masternode_disk_type='network-ssd',
+                 datanode_resource_preset='s2.small',
+                 datanode_disk_size=15,
+                 datanode_disk_type='network-ssd',
+                 datanode_count=2,
+                 computenode_resource_preset='s2.small',
+                 computenode_disk_size=15,
+                 computenode_disk_type='network-ssd',
+                 computenode_count=0,
+                 *arguments,
+                 **kwargs):
+        super().__init__(*arguments, **kwargs)
+        self.folder_id = folder_id
+        self.connection_id = connection_id
+        self.cluster_name = cluster_name
+        self.cluster_description = cluster_description
+        self.cluster_image_version = cluster_image_version
+        self.ssh_public_keys = ssh_public_keys
+        self.subnet_id = subnet_id
+        self.services = services
+        self.s3_bucket = s3_bucket
+        self.zone = zone
+        self.service_account_id = service_account_id
+        self.masternode_resource_preset = masternode_resource_preset
+        self.masternode_disk_size = masternode_disk_size
+        self.masternode_disk_type = masternode_disk_type
+        self.datanode_resource_preset = datanode_resource_preset
+        self.datanode_disk_size = datanode_disk_size
+        self.datanode_disk_type = datanode_disk_type
+        self.datanode_count = datanode_count
+        self.computenode_resource_preset = computenode_resource_preset
+        self.computenode_disk_size = computenode_disk_size
+        self.computenode_disk_type = computenode_disk_type
+        self.computenode_count = computenode_count
+
+    def execute(self, context):
+        self.hook = DataprocHook(
+            connection_id=self.connection_id,
+        )
+        operation_result = self.hook.client.create_cluster(
+            folder_id=self.folder_id,
+            cluster_name=self.cluster_name,
+            cluster_description=self.cluster_description,
+            cluster_image_version=self.cluster_image_version,
+            ssh_public_keys=self.ssh_public_keys,
+            subnet_id=self.subnet_id,
+            services=self.services,
+            s3_bucket=self.s3_bucket,
+            zone=self.zone,
+            service_account_id=self.service_account_id,
+            masternode_resource_preset=self.masternode_resource_preset,
+            masternode_disk_size=self.masternode_disk_size,
+            masternode_disk_type=self.masternode_disk_type,
+            datanode_resource_preset=self.datanode_resource_preset,
+            datanode_disk_size=self.datanode_disk_size,
+            datanode_disk_type=self.datanode_disk_type,
+            datanode_count=self.datanode_count,
+
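The excerpt above is cut off by the archive mid-call. For orientation, a minimal sketch of how this operator might be wired into a DAG; all resource identifiers (`folder_id`, `subnet_id`, the bucket name, and the SSH key) are placeholders, not values from the PR:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.yandex.operators.yandexcloud_dataproc_operator import (
    DataprocCreateClusterOperator,
)

# All IDs below are placeholders -- substitute real Yandex.Cloud resources.
with DAG(dag_id='example_yandexcloud_dataproc',
         start_date=datetime(2020, 1, 1),
         schedule_interval=None) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id='create_dataproc_cluster',
        folder_id='my-folder-id',
        connection_id='yandexcloud_default',
        cluster_name='airflow-created-cluster',
        subnet_id='my-subnet-id',
        s3_bucket='my-logs-bucket',
        ssh_public_keys=['ssh-rsa AAAA... user@host'],
    )
```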

[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] Initial Yandex.Cloud Dataproc support

2020-02-14 Thread GitBox
nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] 
Initial Yandex.Cloud Dataproc support
URL: https://github.com/apache/airflow/pull/7252#discussion_r379455746
 
 

 ##
 File path: airflow/providers/yandex/operators/yandexcloud_dataproc_operator.py
 ##
 @@ -0,0 +1,334 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.providers.yandex.hooks.yandexcloud_dataproc_hook import DataprocHook
+from airflow.providers.yandex.operators import YandexCloudBaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataprocCreateClusterOperator(YandexCloudBaseOperator):
+    """Creates Yandex.Cloud Data Proc cluster."""
+
+    # pylint: disable=too-many-instance-attributes
+    # pylint: disable=too-many-arguments
+    # pylint: disable=too-many-locals
+
+    @apply_defaults
+    def __init__(self,
+                 folder_id=None,
+                 connection_id=None,
+                 cluster_name=None,
+                 cluster_description='',
+                 cluster_image_version='1.1',
+                 ssh_public_keys=None,
+                 subnet_id=None,
+                 services=('HDFS', 'YARN', 'MAPREDUCE', 'HIVE', 'SPARK'),
+                 s3_bucket=None,
+                 zone='ru-central1-b',
+                 service_account_id=None,
+                 masternode_resource_preset='s2.small',
+                 masternode_disk_size=15,
+                 masternode_disk_type='network-ssd',
+                 datanode_resource_preset='s2.small',
+                 datanode_disk_size=15,
+                 datanode_disk_type='network-ssd',
+                 datanode_count=2,
+                 computenode_resource_preset='s2.small',
+                 computenode_disk_size=15,
+                 computenode_disk_type='network-ssd',
+                 computenode_count=0,
+                 *arguments,
+                 **kwargs):
+        super().__init__(*arguments, **kwargs)
+        self.folder_id = folder_id
+        self.connection_id = connection_id
+        self.cluster_name = cluster_name
+        self.cluster_description = cluster_description
+        self.cluster_image_version = cluster_image_version
+        self.ssh_public_keys = ssh_public_keys
+        self.subnet_id = subnet_id
+        self.services = services
+        self.s3_bucket = s3_bucket
+        self.zone = zone
+        self.service_account_id = service_account_id
+        self.masternode_resource_preset = masternode_resource_preset
+        self.masternode_disk_size = masternode_disk_size
+        self.masternode_disk_type = masternode_disk_type
+        self.datanode_resource_preset = datanode_resource_preset
+        self.datanode_disk_size = datanode_disk_size
+        self.datanode_disk_type = datanode_disk_type
+        self.datanode_count = datanode_count
+        self.computenode_resource_preset = computenode_resource_preset
+        self.computenode_disk_size = computenode_disk_size
+        self.computenode_disk_type = computenode_disk_type
+        self.computenode_count = computenode_count
+
+    def execute(self, context):
+        self.hook = DataprocHook(
+            connection_id=self.connection_id,
+        )
+        operation_result = self.hook.client.create_cluster(
+            folder_id=self.folder_id,
+            cluster_name=self.cluster_name,
+            cluster_description=self.cluster_description,
+            cluster_image_version=self.cluster_image_version,
+            ssh_public_keys=self.ssh_public_keys,
+            subnet_id=self.subnet_id,
+            services=self.services,
+            s3_bucket=self.s3_bucket,
+            zone=self.zone,
+            service_account_id=self.service_account_id,
+            masternode_resource_preset=self.masternode_resource_preset,
+            masternode_disk_size=self.masternode_disk_size,
+            masternode_disk_type=self.masternode_disk_type,
+            datanode_resource_preset=self.datanode_resource_preset,
+            datanode_disk_size=self.datanode_disk_size,
+            datanode_disk_type=self.datanode_disk_type,
+            datanode_count=self.datanode_count,
+

[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] Initial Yandex.Cloud Dataproc support

2020-02-14 Thread GitBox
nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] 
Initial Yandex.Cloud Dataproc support
URL: https://github.com/apache/airflow/pull/7252#discussion_r379454451
 
 

 ##
 File path: airflow/providers/yandex/operators/yandexcloud_dataproc_operator.py
 ##
 @@ -0,0 +1,334 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.providers.yandex.hooks.yandexcloud_dataproc_hook import DataprocHook
+from airflow.providers.yandex.operators import YandexCloudBaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataprocCreateClusterOperator(YandexCloudBaseOperator):
 
 Review comment:
   Is there any reason for this inheritance? `YandexCloudBaseOperator` does not 
implement any common methods. In my opinion, adding 3 lines in all operators is 
a small cost for better encapsulation :)
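
Concretely, the alternative the reviewer describes would look roughly like the sketch below: each operator extends `BaseOperator` directly and carries the few common lines itself. This is an illustration of the suggestion, not code from the PR:

```python
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class DataprocCreateClusterOperator(BaseOperator):
    """Creates Yandex.Cloud Data Proc cluster."""

    @apply_defaults
    def __init__(self, folder_id=None, connection_id='yandexcloud_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The handful of lines each operator would now carry itself,
        # instead of inheriting them from YandexCloudBaseOperator:
        self.folder_id = folder_id
        self.connection_id = connection_id
        self.hook = None
```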




[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] Initial Yandex.Cloud Dataproc support

2020-01-28 Thread GitBox
nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] 
Initial Yandex.Cloud Dataproc support
URL: https://github.com/apache/airflow/pull/7252#discussion_r371366880
 
 

 ##
 File path: airflow/contrib/hooks/yandexcloud_dataproc_hook.py
 ##
 @@ -0,0 +1,571 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import random
+from datetime import datetime
+
+import yandex.cloud.dataproc.v1.cluster_pb2 as cluster_pb
+import yandex.cloud.dataproc.v1.cluster_service_pb2 as cluster_service_pb
+import yandex.cloud.dataproc.v1.cluster_service_pb2_grpc as cluster_service_grpc_pb
+import yandex.cloud.dataproc.v1.common_pb2 as common_pb
+import yandex.cloud.dataproc.v1.job_pb2 as job_pb
+import yandex.cloud.dataproc.v1.job_service_pb2 as job_service_pb
+import yandex.cloud.dataproc.v1.job_service_pb2_grpc as job_service_grpc_pb
+import yandex.cloud.dataproc.v1.subcluster_pb2 as subcluster_pb
+import yandex.cloud.dataproc.v1.subcluster_service_pb2 as subcluster_service_pb
+import yandex.cloud.dataproc.v1.subcluster_service_pb2_grpc as subcluster_service_grpc_pb
+from google.protobuf.field_mask_pb2 import FieldMask
+from six import string_types
+
+from airflow.contrib.hooks.yandexcloud_base_hook import YandexCloudBaseHook
+from airflow.exceptions import AirflowException
+
+
+class DataprocHook(YandexCloudBaseHook):
+    """
+    A base hook for Yandex.Cloud Data Proc.
+
+    :param connection_id: The connection ID to use when fetching connection info.
+    :type connection_id: str
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(DataprocHook, self).__init__(*args, **kwargs)
+        self.cluster_id = None
+
+    def _get_operation_result(self, operation, response_type=None, meta_type=None):
+        message = 'Running Yandex.Cloud operation. ID: {}. Description: {}. Created at: {}. Created by: {}.'
+        message = message.format(
+            operation.id,
+            operation.description,
+            datetime.fromtimestamp(operation.created_at.seconds),
+            operation.created_by,
+        )
+        if meta_type:
+            unpacked_meta = meta_type()
+            operation.metadata.Unpack(unpacked_meta)
+            message += ' Meta: {}.'.format(unpacked_meta)
+        self.log.info(message)
+        result = self.wait_for_operation(operation)
+        if result.error and result.error.code:
+            error_message = 'Error Yandex.Cloud operation. ID: {}. Error code: {}. Details: {}. Message: {}.'
+            error_message = error_message.format(
+                result.id, result.error.code, result.error.details, result.error.message
+            )
+            self.log.error(error_message)
+            raise AirflowException(error_message)
+        else:
+            log_message = 'Done Yandex.Cloud operation. ID: {}.'.format(operation.id)
+            unpacked_response = None
+            if response_type:
+                unpacked_response = response_type()
+                result.response.Unpack(unpacked_response)
+                log_message += ' Response: {}.'.format(unpacked_response)
+            self.log.info(log_message)
+            if unpacked_response:
+                return unpacked_response
+            return None
+
+    def add_subcluster(
+        self,
+        cluster_id,
+        subcluster_type,
+        name,
+        subnet_id,
+        resource_preset='s2.small',
+        disk_size=15,
+        disk_type='network-ssd',
+        hosts_count=5,
+    ):
+        """
+        Add subcluster to Yandex.Cloud Data Proc cluster.
+
+        :param cluster_id: ID of the cluster.
+        :type cluster_id: str
+        :param name: Name of the subcluster. Must be unique in the cluster
+        :type name: str
+        :param subcluster_type: Type of the subcluster. Either "data" or "compute".
+        :type subcluster_type: str
+        :param subnet_id: Subnet ID of the cluster.
+        :type subnet_id: str
+        :param resource_preset: Resources preset (CPU+RAM configuration) for the nodes of the cluster.
+        :type resource_preset: str
+        :param disk_size: Storage size in GiB.
+        :type disk_size: int
+        :param disk_type:
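
The docstring is cut off here by the archive. For context, a hedged sketch of how a caller might invoke `add_subcluster`; the connection ID and resource IDs below are placeholders, not values from the PR:

```python
from airflow.contrib.hooks.yandexcloud_dataproc_hook import DataprocHook

hook = DataprocHook(connection_id='yandexcloud_default')  # placeholder connection
hook.add_subcluster(
    cluster_id='my-existing-cluster-id',   # placeholder
    subcluster_type='compute',             # "data" or "compute", per the docstring
    name='compute-nodes',
    subnet_id='my-subnet-id',              # placeholder
    resource_preset='s2.small',            # CPU+RAM preset
    disk_size=15,                          # GiB
    disk_type='network-ssd',
    hosts_count=3,
)
```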

[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] Initial Yandex.Cloud Dataproc support

2020-01-25 Thread GitBox
nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] 
Initial Yandex.Cloud Dataproc support
URL: https://github.com/apache/airflow/pull/7252#discussion_r370923827
 
 

 ##
 File path: airflow/contrib/operators/yandexcloud_base_operator.py
 ##
 @@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.contrib.hooks.yandexcloud_base_hook import YandexCloudBaseHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class YandexCloudBaseOperator(BaseOperator):
+    """The base class for operators that poll on a Dataproc Operation."""
+    @apply_defaults
+    def __init__(self,
+                 folder_id=None,
+                 connection_id='yandexcloud_default',
+                 *args,
+                 **kwargs):
+        super(YandexCloudBaseOperator, self).__init__(*args, **kwargs)
+        self.connection_id = connection_id
+        self.hook = YandexCloudBaseHook(
 
 Review comment:
   The hook should not be initialized in the constructor. This results in many 
unnecessary queries during DAG parsing.
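
The conventional remedy is to create the hook inside `execute()`, so that parsing the DAG file never triggers a connection lookup; a minimal sketch under that assumption (the later revision of the PR does exactly this in `DataprocCreateClusterOperator.execute()`):

```python
from airflow.contrib.hooks.yandexcloud_base_hook import YandexCloudBaseHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class YandexCloudBaseOperator(BaseOperator):
    """The base class for operators that poll on a Dataproc Operation."""

    @apply_defaults
    def __init__(self,
                 folder_id=None,
                 connection_id='yandexcloud_default',
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.folder_id = folder_id
        self.connection_id = connection_id
        self.hook = None  # created lazily; the constructor stays side-effect free

    def execute(self, context):
        # The connection lookup happens only when the task actually runs,
        # not every time the scheduler parses the DAG file.
        self.hook = YandexCloudBaseHook(connection_id=self.connection_id)
```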




[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] Initial Yandex.Cloud Dataproc support

2020-01-25 Thread GitBox
nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] 
Initial Yandex.Cloud Dataproc support
URL: https://github.com/apache/airflow/pull/7252#discussion_r370924002
 
 

 ##
 File path: airflow/contrib/hooks/yandexcloud_dataproc_hook.py
 ##
 @@ -0,0 +1,571 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import random
+from datetime import datetime
+
+import yandex.cloud.dataproc.v1.cluster_pb2 as cluster_pb
+import yandex.cloud.dataproc.v1.cluster_service_pb2 as cluster_service_pb
+import yandex.cloud.dataproc.v1.cluster_service_pb2_grpc as cluster_service_grpc_pb
+import yandex.cloud.dataproc.v1.common_pb2 as common_pb
+import yandex.cloud.dataproc.v1.job_pb2 as job_pb
+import yandex.cloud.dataproc.v1.job_service_pb2 as job_service_pb
+import yandex.cloud.dataproc.v1.job_service_pb2_grpc as job_service_grpc_pb
+import yandex.cloud.dataproc.v1.subcluster_pb2 as subcluster_pb
+import yandex.cloud.dataproc.v1.subcluster_service_pb2 as subcluster_service_pb
+import yandex.cloud.dataproc.v1.subcluster_service_pb2_grpc as subcluster_service_grpc_pb
+from google.protobuf.field_mask_pb2 import FieldMask
+from six import string_types
+
+from airflow.contrib.hooks.yandexcloud_base_hook import YandexCloudBaseHook
+from airflow.exceptions import AirflowException
+
+
+class DataprocHook(YandexCloudBaseHook):
+    """
+    A base hook for Yandex.Cloud Data Proc.
+
+    :param connection_id: The connection ID to use when fetching connection info.
+    :type connection_id: str
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(DataprocHook, self).__init__(*args, **kwargs)
+        self.cluster_id = None
+
+    def _get_operation_result(self, operation, response_type=None, meta_type=None):
+        message = 'Running Yandex.Cloud operation. ID: {}. Description: {}. Created at: {}. Created by: {}.'
+        message = message.format(
+            operation.id,
+            operation.description,
+            datetime.fromtimestamp(operation.created_at.seconds),
+            operation.created_by,
+        )
+        if meta_type:
+            unpacked_meta = meta_type()
+            operation.metadata.Unpack(unpacked_meta)
+            message += ' Meta: {}.'.format(unpacked_meta)
+        self.log.info(message)
+        result = self.wait_for_operation(operation)
+        if result.error and result.error.code:
+            error_message = 'Error Yandex.Cloud operation. ID: {}. Error code: {}. Details: {}. Message: {}.'
+            error_message = error_message.format(
+                result.id, result.error.code, result.error.details, result.error.message
+            )
+            self.log.error(error_message)
+            raise AirflowException(error_message)
+        else:
+            log_message = 'Done Yandex.Cloud operation. ID: {}.'.format(operation.id)
+            unpacked_response = None
+            if response_type:
+                unpacked_response = response_type()
+                result.response.Unpack(unpacked_response)
+                log_message += ' Response: {}.'.format(unpacked_response)
+            self.log.info(log_message)
+            if unpacked_response:
+                return unpacked_response
+            return None
+
+    def add_subcluster(
+        self,
+        cluster_id,
+        subcluster_type,
+        name,
+        subnet_id,
+        resource_preset='s2.small',
+        disk_size=15,
+        disk_type='network-ssd',
+        hosts_count=5,
+    ):
+        """
+        Add subcluster to Yandex.Cloud Data Proc cluster.
+
+        :param cluster_id: ID of the cluster.
+        :type cluster_id: str
+        :param name: Name of the subcluster. Must be unique in the cluster
+        :type name: str
+        :param subcluster_type: Type of the subcluster. Either "data" or "compute".
+        :type subcluster_type: str
+        :param subnet_id: Subnet ID of the cluster.
+        :type subnet_id: str
+        :param resource_preset: Resources preset (CPU+RAM configuration) for the nodes of the cluster.
+        :type resource_preset: str
+        :param disk_size: Storage size in GiB.
+        :type disk_size: int
+        :param disk_type:

[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] Initial Yandex.Cloud Dataproc support

2020-01-25 Thread GitBox
nuclearpinguin commented on a change in pull request #7252: [AIRFLOW-6531] 
Initial Yandex.Cloud Dataproc support
URL: https://github.com/apache/airflow/pull/7252#discussion_r370923823
 
 

 ##
 File path: airflow/contrib/operators/yandexcloud_base_operator.py
 ##
 @@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.contrib.hooks.yandexcloud_base_hook import YandexCloudBaseHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class YandexCloudBaseOperator(BaseOperator):
+    """The base class for operators that poll on a Dataproc Operation."""
+    @apply_defaults
+    def __init__(self,
+                 folder_id=None,
+                 connection_id='yandexcloud_default',
+                 *args,
+                 **kwargs):
+        super(YandexCloudBaseOperator, self).__init__(*args, **kwargs)
 
 Review comment:
   ```suggestion
   super().__init__(*args, **kwargs)
   ```

