Repository: incubator-airflow
Updated Branches:
  refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae
use targetPartitionSize as the default partition spec


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09

Branch: refs/heads/airbnb_rb1.7.1_4
Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda
Parents: 1d0d868
Author: Hongbo Zeng <hongbo.z...@airbnb.com>
Authored: Sat May 14 17:00:42 2016 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon May 23 16:59:52 2016 -0700

----------------------------------------------------------------------
 airflow/hooks/druid_hook.py        | 23 ++++++++++++++++-------
 airflow/operators/hive_to_druid.py |  8 +++++---
 2 files changed, 21 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index b6cb231..7c80c7c 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
 
 LOAD_CHECK_INTERVAL = 5
-
+TARGET_PARTITION_SIZE = 5000000
 
 class AirflowDruidLoadException(AirflowException):
     pass
@@ -52,13 +52,22 @@ class DruidHook(BaseHook):
 
     def construct_ingest_query(
             self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
         """
         Builds an ingest query for an HDFS TSV load.
 
         :param datasource: target datasource in druid
         :param columns: list of all columns in the TSV, in the right order
         """
+
+        # backward compatibility for num_shards, but target_partition_size is
+        # the default setting and overrides num_shards
+        if target_partition_size == -1:
+            if num_shards == -1:
+                target_partition_size = TARGET_PARTITION_SIZE
+        else:
+            num_shards = -1
+
         metric_names = [
             m['fieldName'] for m in metric_spec if m['type'] != 'count']
         dimensions = [c for c in columns if c not in metric_names and c != ts_dim]
@@ -100,7 +109,7 @@ class DruidHook(BaseHook):
                 },
                 "partitionsSpec" : {
                     "type" : "hashed",
-                    "targetPartitionSize" : -1,
+                    "targetPartitionSize" : target_partition_size,
                     "numShards" : num_shards,
                 },
             },
@@ -121,10 +130,10 @@ class DruidHook(BaseHook):
 
     def send_ingest_query(
             self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
         query = self.construct_ingest_query(
             datasource, static_path, ts_dim, columns,
-            metric_spec, intervals, num_shards, hadoop_dependency_coordinates)
+            metric_spec, intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
         r = requests.post(
             self.ingest_post_url, headers=self.header, data=query)
         logging.info(self.ingest_post_url)
@@ -138,7 +147,7 @@ class DruidHook(BaseHook):
 
     def load_from_hdfs(
             self, datasource, static_path, ts_dim, columns,
-            intervals, num_shards, metric_spec=None, hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size, metric_spec=None, hadoop_dependency_coordinates=None):
         """
         load data to druid from hdfs
         :params ts_dim: The column name to use as a timestamp
         """
         task_id = self.send_ingest_query(
             datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, hadoop_dependency_coordinates)
+            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
         status_url = self.get_ingest_status_url(task_id)
         while True:
             r = requests.get(status_url)


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index 1346841..420aeed 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -49,7 +49,8 @@ class HiveToDruidTransfer(BaseOperator):
             metastore_conn_id='metastore_default',
             hadoop_dependency_coordinates=None,
             intervals=None,
-            num_shards=1,
+            num_shards=-1,
+            target_partition_size=-1,
             *args, **kwargs):
         super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -57,6 +58,7 @@ class HiveToDruidTransfer(BaseOperator):
         self.ts_dim = ts_dim
         self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
         self.num_shards = num_shards
+        self.target_partition_size = target_partition_size
         self.metric_spec = metric_spec or [{
             "name": "count",
             "type": "count"}]
@@ -103,8 +105,8 @@ class HiveToDruidTransfer(BaseOperator):
             datasource=self.druid_datasource, intervals=self.intervals,
             static_path=static_path, ts_dim=self.ts_dim,
-            columns=columns, num_shards=self.num_shards, metric_spec=self.metric_spec,
-            hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
+            columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
+            metric_spec=self.metric_spec,
+            hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
         logging.info("Load seems to have succeeded!")
         logging.info(
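
----------------------------------------------------------------------

The precedence rule added to construct_ingest_query is the core of the
change, so a minimal standalone sketch of it may help. The function name
resolve_partition_spec is hypothetical; the constant, the -1 "unset"
sentinel, and the branching are taken from the patch above:

    # Sketch of the resolution rule in construct_ingest_query;
    # -1 marks an unset parameter, as in the patch.
    TARGET_PARTITION_SIZE = 5000000  # default from druid_hook.py

    def resolve_partition_spec(num_shards, target_partition_size):
        """Return the (num_shards, target_partition_size) pair sent to Druid."""
        if target_partition_size == -1:
            if num_shards == -1:
                # Neither knob was set: fall back to the default size.
                target_partition_size = TARGET_PARTITION_SIZE
        else:
            # target_partition_size was set explicitly: it overrides num_shards.
            num_shards = -1
        return num_shards, target_partition_size

    # Neither set -> the new default kicks in.
    assert resolve_partition_spec(-1, -1) == (-1, 5000000)
    # Only num_shards set -> honored, for backward compatibility.
    assert resolve_partition_spec(10, -1) == (10, -1)
    # Both set -> target_partition_size wins.
    assert resolve_partition_spec(10, 1000000) == (-1, 1000000)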
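The resolved pair then lands in the hashed partitionsSpec of the ingest
query. As an illustration only (the surrounding tuningConfig and the rest
of the spec built by construct_ingest_query are elided), the default case
now produces a fragment like:

    # Default case: numShards stays -1 ("unset"), so Druid sizes segments
    # by targetPartitionSize (rows per segment) rather than by a fixed
    # shard count, as it did before this commit.
    partitions_spec = {
        "type": "hashed",
        "targetPartitionSize": 5000000,
        "numShards": -1,
    }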
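At the operator level the change is backward compatible: DAGs that pass
num_shards behave as before, while new DAGs can size segments by row count
instead. A hypothetical task definition follows; the task_id, SQL, column
names, and the dag object are placeholders, and druid_datasource is
inferred from the execute() call in the diff rather than shown there:

    from airflow.operators.hive_to_druid import HiveToDruidTransfer

    load_events = HiveToDruidTransfer(
        task_id='hive_to_druid_events',
        sql="SELECT * FROM events WHERE ds = '{{ ds }}'",
        druid_datasource='events',
        ts_dim='event_time',
        target_partition_size=5000000,  # rows per segment; omit for the default
        # num_shards is left at its new default of -1 (unset)
        dag=dag,  # an existing DAG object
    )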