Ah, yeah. I get bitten by that too. It's annoying to have to ask people to add a JIRA ID to their commit messages. And we can't squash through GitHub anymore. :( I wonder if the airflow-pr script lets us edit the message during the merge? I think it might....
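
Until someone checks, a merge-time sanity check wouldn't be hard to bolt on. A minimal sketch (assuming the convention is an "AIRFLOW-<number>" subject prefix; commit_has_jira is a hypothetical helper, not something airflow-pr has today):

    import re
    import subprocess

    JIRA_KEY = re.compile(r'^AIRFLOW-\d+')

    def commit_has_jira(sha):
        # Read the commit subject line and test it for a JIRA key prefix.
        subject = subprocess.check_output(
            ['git', 'log', '-1', '--format=%s', sha]).decode('utf-8').strip()
        return bool(JIRA_KEY.match(subject))

Running that over a PR's commits before merging would at least flag the ones missing a key.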
On Mon, May 23, 2016 at 5:50 PM, Dan Davydov <dan.davy...@airbnb.com.invalid>
wrote:

> Yep, sorry, will check the versions in the future. My own commits have JIRA
> labels, but I haven't validated that other users have done this for theirs
> when I merge their commits (as the LGTM is delegated to either another
> committer or the owner of a particular operator). Will be more vigilant in
> the future.
>
> On Mon, May 23, 2016 at 5:07 PM, Chris Riccomini <criccom...@apache.org>
> wrote:
>
> > Hey Dan,
> >
> > Could you please file JIRAs, and put the JIRA name as the prefix to your
> > commits?
> >
> > Cheers,
> > Chris
> >
> > On Mon, May 23, 2016 at 5:01 PM, <davy...@apache.org> wrote:
> >
> >> Repository: incubator-airflow
> >> Updated Branches:
> >>   refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae
> >>
> >>
> >> use targetPartitionSize as the default partition spec
> >>
> >>
> >> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
> >> Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09
> >> Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09
> >> Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09
> >>
> >> Branch: refs/heads/airbnb_rb1.7.1_4
> >> Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda
> >> Parents: 1d0d868
> >> Author: Hongbo Zeng <hongbo.z...@airbnb.com>
> >> Authored: Sat May 14 17:00:42 2016 -0700
> >> Committer: Dan Davydov <dan.davy...@airbnb.com>
> >> Committed: Mon May 23 16:59:52 2016 -0700
> >>
> >> ----------------------------------------------------------------------
> >>  airflow/hooks/druid_hook.py        | 23 ++++++++++++++++-------
> >>  airflow/operators/hive_to_druid.py |  8 +++++---
> >>  2 files changed, 21 insertions(+), 10 deletions(-)
> >> ----------------------------------------------------------------------
> >>
> >>
> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py
> >> ----------------------------------------------------------------------
> >> diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
> >> index b6cb231..7c80c7c 100644
> >> --- a/airflow/hooks/druid_hook.py
> >> +++ b/airflow/hooks/druid_hook.py
> >> @@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook
> >>  from airflow.exceptions import AirflowException
> >>
> >>  LOAD_CHECK_INTERVAL = 5
> >> -
> >> +TARGET_PARTITION_SIZE = 5000000
> >>
> >>  class AirflowDruidLoadException(AirflowException):
> >>      pass
> >> @@ -52,13 +52,22 @@ class DruidHook(BaseHook):
> >>
> >>      def construct_ingest_query(
> >>              self, datasource, static_path, ts_dim, columns, metric_spec,
> >> -            intervals, num_shards, hadoop_dependency_coordinates=None):
> >> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
> >>          """
> >>          Builds an ingest query for an HDFS TSV load.
> >>
> >>          :param datasource: target datasource in druid
> >>          :param columns: list of all columns in the TSV, in the right
> >>              order
> >>          """
> >> +
> >> +        # backward compatibility for num_shards, but target_partition_size is the default setting
> >> +        # and overwrites the num_shards
> >> +        if target_partition_size == -1:
> >> +            if num_shards == -1:
> >> +                target_partition_size = TARGET_PARTITION_SIZE
> >> +        else:
> >> +            num_shards = -1
> >> +
> >>          metric_names = [
> >>              m['fieldName'] for m in metric_spec if m['type'] != 'count']
> >>          dimensions = [c for c in columns if c not in metric_names and c != ts_dim]
> >> @@ -100,7 +109,7 @@ class DruidHook(BaseHook):
> >>                  },
> >>                  "partitionsSpec" : {
> >>                      "type" : "hashed",
> >> -                    "targetPartitionSize" : -1,
> >> +                    "targetPartitionSize" : target_partition_size,
> >>                      "numShards" : num_shards,
> >>                  },
> >>              },
> >> @@ -121,10 +130,10 @@ class DruidHook(BaseHook):
> >>
> >>      def send_ingest_query(
> >>              self, datasource, static_path, ts_dim, columns, metric_spec,
> >> -            intervals, num_shards, hadoop_dependency_coordinates=None):
> >> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
> >>          query = self.construct_ingest_query(
> >>              datasource, static_path, ts_dim, columns,
> >> -            metric_spec, intervals, num_shards, hadoop_dependency_coordinates)
> >> +            metric_spec, intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
> >>          r = requests.post(
> >>              self.ingest_post_url, headers=self.header, data=query)
> >>          logging.info(self.ingest_post_url)
> >> @@ -138,7 +147,7 @@ class DruidHook(BaseHook):
> >>
> >>      def load_from_hdfs(
> >>              self, datasource, static_path, ts_dim, columns,
> >> -            intervals, num_shards, metric_spec=None, hadoop_dependency_coordinates=None):
> >> +            intervals, num_shards, target_partition_size, metric_spec=None, hadoop_dependency_coordinates=None):
> >>          """
> >>          load data to druid from hdfs
> >>          :params ts_dim: The column name to use as a timestamp
> >> @@ -146,7 +155,7 @@ class DruidHook(BaseHook):
> >>          """
> >>          task_id = self.send_ingest_query(
> >>              datasource, static_path, ts_dim, columns, metric_spec,
> >> -            intervals, num_shards, hadoop_dependency_coordinates)
> >> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
> >>          status_url = self.get_ingest_status_url(task_id)
> >>          while True:
> >>              r = requests.get(status_url)
> >>
> >>
> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/operators/hive_to_druid.py
> >> ----------------------------------------------------------------------
> >> diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
> >> index 1346841..420aeed 100644
> >> --- a/airflow/operators/hive_to_druid.py
> >> +++ b/airflow/operators/hive_to_druid.py
> >> @@ -49,7 +49,8 @@ class HiveToDruidTransfer(BaseOperator):
> >>              metastore_conn_id='metastore_default',
> >>              hadoop_dependency_coordinates=None,
> >>              intervals=None,
> >> -            num_shards=1,
> >> +            num_shards=-1,
> >> +            target_partition_size=-1,
> >>              *args, **kwargs):
> >>          super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
> >>          self.sql = sql
> >> @@ -57,6 +58,7 @@ class HiveToDruidTransfer(BaseOperator):
> >>          self.ts_dim = ts_dim
> >>          self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
> >>          self.num_shards = num_shards
> >> +        self.target_partition_size = target_partition_size
> >>          self.metric_spec = metric_spec or [{
> >>              "name": "count",
> >>              "type": "count"}]
> >> @@ -103,8 +105,8 @@ class HiveToDruidTransfer(BaseOperator):
> >>              datasource=self.druid_datasource,
> >>              intervals=self.intervals,
> >>              static_path=static_path, ts_dim=self.ts_dim,
> >> -            columns=columns, num_shards=self.num_shards, metric_spec=self.metric_spec,
> >> -            hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
> >> +            columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
> >> +            metric_spec=self.metric_spec, hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
> >>          logging.info("Load seems to have succeeded!")
> >>
> >>          logging.info(
> >>
> >
>
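
For anyone skimming Hongbo's change above: the precedence between num_shards and target_partition_size boils down to the following. This is just my standalone restatement of the committed logic, with -1 meaning "unset" (as I understand it, Druid's hashed partitionsSpec only honors targetPartitionSize when numShards is -1, so exactly one of the two ends up active):

    TARGET_PARTITION_SIZE = 5000000  # default introduced in druid_hook.py

    def resolve_partition_spec(num_shards=-1, target_partition_size=-1):
        # Decide which of the two partitioning knobs the ingest query uses.
        if target_partition_size == -1:
            if num_shards == -1:
                # Neither was set: fall back to the default target size.
                target_partition_size = TARGET_PARTITION_SIZE
            # else: caller set num_shards explicitly, keep it (backward compat).
        else:
            # An explicit target_partition_size wins and disables num_shards.
            num_shards = -1
        return {
            "type": "hashed",
            "targetPartitionSize": target_partition_size,
            "numShards": num_shards,
        }

So DAGs that already pass num_shards behave as before, while DAGs that pass neither knob now get a 5M-row target partition size instead of the old single-shard default.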