Repository: incubator-airflow
Updated Branches:
  refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae


use targetPartitionSize as the default partition spec


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09

Branch: refs/heads/airbnb_rb1.7.1_4
Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda
Parents: 1d0d868
Author: Hongbo Zeng <hongbo.z...@airbnb.com>
Authored: Sat May 14 17:00:42 2016 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon May 23 16:59:52 2016 -0700

----------------------------------------------------------------------
 airflow/hooks/druid_hook.py        | 23 ++++++++++++++++-------
 airflow/operators/hive_to_druid.py |  8 +++++---
 2 files changed, 21 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index b6cb231..7c80c7c 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
 
 LOAD_CHECK_INTERVAL = 5
-
+TARGET_PARTITION_SIZE = 5000000
 
 class AirflowDruidLoadException(AirflowException):
     pass
@@ -52,13 +52,22 @@ class DruidHook(BaseHook):
 
     def construct_ingest_query(
             self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size, 
hadoop_dependency_coordinates=None):
         """
         Builds an ingest query for an HDFS TSV load.
 
         :param datasource: target datasource in druid
         :param columns: list of all columns in the TSV, in the right order
         """
+
+        # backward compatibilty for num_shards, but target_partition_size is 
the default setting
+        # and overwrites the num_shards
+        if target_partition_size == -1:
+            if num_shards == -1:
+                target_partition_size = TARGET_PARTITION_SIZE
+        else:
+            num_shards = -1
+
         metric_names = [
             m['fieldName'] for m in metric_spec if m['type'] != 'count']
         dimensions = [c for c in columns if c not in metric_names and c != 
ts_dim]
@@ -100,7 +109,7 @@ class DruidHook(BaseHook):
                     },
                     "partitionsSpec" : {
                         "type" : "hashed",
-                        "targetPartitionSize" : -1,
+                        "targetPartitionSize" : target_partition_size,
                         "numShards" : num_shards,
                     },
                 },
@@ -121,10 +130,10 @@ class DruidHook(BaseHook):
 
     def send_ingest_query(
             self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size, 
hadoop_dependency_coordinates=None):
         query = self.construct_ingest_query(
             datasource, static_path, ts_dim, columns,
-            metric_spec, intervals, num_shards, hadoop_dependency_coordinates)
+            metric_spec, intervals, num_shards, target_partition_size, 
hadoop_dependency_coordinates)
         r = requests.post(
             self.ingest_post_url, headers=self.header, data=query)
         logging.info(self.ingest_post_url)
@@ -138,7 +147,7 @@ class DruidHook(BaseHook):
 
     def load_from_hdfs(
             self, datasource, static_path,  ts_dim, columns,
-            intervals, num_shards, metric_spec=None, 
hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size, metric_spec=None, 
hadoop_dependency_coordinates=None):
         """
         load data to druid from hdfs
         :params ts_dim: The column name to use as a timestamp
@@ -146,7 +155,7 @@ class DruidHook(BaseHook):
         """
         task_id = self.send_ingest_query(
             datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, hadoop_dependency_coordinates)
+            intervals, num_shards, target_partition_size, 
hadoop_dependency_coordinates)
         status_url = self.get_ingest_status_url(task_id)
         while True:
             r = requests.get(status_url)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py 
b/airflow/operators/hive_to_druid.py
index 1346841..420aeed 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -49,7 +49,8 @@ class HiveToDruidTransfer(BaseOperator):
             metastore_conn_id='metastore_default',
             hadoop_dependency_coordinates=None,
             intervals=None,
-            num_shards=1,
+            num_shards=-1,
+            target_partition_size=-1,
             *args, **kwargs):
         super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -57,6 +58,7 @@ class HiveToDruidTransfer(BaseOperator):
         self.ts_dim = ts_dim
         self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
         self.num_shards = num_shards
+        self.target_partition_size = target_partition_size
         self.metric_spec = metric_spec or [{
             "name": "count",
             "type": "count"}]
@@ -103,8 +105,8 @@ class HiveToDruidTransfer(BaseOperator):
             datasource=self.druid_datasource,
             intervals=self.intervals,
             static_path=static_path, ts_dim=self.ts_dim,
-            columns=columns, num_shards=self.num_shards, 
metric_spec=self.metric_spec,
-            hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
+            columns=columns, num_shards=self.num_shards, 
target_partition_size=self.target_partition_size,
+            metric_spec=self.metric_spec, 
hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
         logging.info("Load seems to have succeeded!")
 
         logging.info(

Reply via email to