Re: [1/3] incubator-airflow git commit: use targetPartitionSize as the default partition spec
s/two/too .. sigh On Mon, May 23, 2016 at 8:29 PM, Chris Riccomini wrote: > Ah, yea. I get bitten by that two. It's annoying to have to ask people to > add a JIRA to their commit message. And we can't squash through GitHub > anymore. :( Wonder if the airflow-pr script allows us to edit it? I think > it might > > On Mon, May 23, 2016 at 5:50 PM, Dan Davydov < > dan.davy...@airbnb.com.invalid> wrote: > >> Yep sorry will check the versions in the future. My own commits have JIRA >> labels but I haven't validated that other users have done this for theirs >> when I merge their commits (as the LGTM is delegated to either another >> committer or the owner of a particular operator). Will be more vigilant in >> the future. >> >> On Mon, May 23, 2016 at 5:07 PM, Chris Riccomini >> wrote: >> >> > Hey Dan, >> > >> > Could you please file JIRAs, and put the JIRA name as the prefix to your >> > commits? >> > >> > Cheers, >> > Chris >> > >> > On Mon, May 23, 2016 at 5:01 PM, wrote: >> > >> >> Repository: incubator-airflow >> >> Updated Branches: >> >> refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae >> >> >> >> >> >> use targetPartitionSize as the default partition spec >> >> >> >> >> >> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo >> >> Commit: >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09 >> >> Tree: >> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09 >> >> Diff: >> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09 >> >> >> >> Branch: refs/heads/airbnb_rb1.7.1_4 >> >> Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda >> >> Parents: 1d0d868 >> >> Author: Hongbo Zeng >> >> Authored: Sat May 14 17:00:42 2016 -0700 >> >> Committer: Dan Davydov >> >> Committed: Mon May 23 16:59:52 2016 -0700 >> >> >> >> -- >> >> airflow/hooks/druid_hook.py| 23 --- >> >> airflow/operators/hive_to_druid.py | 8 +--- >> >> 2 files changed, 21 insertions(+), 10 deletions(-) >> >> -- >> >> >> >> 
>> >> >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py >> >> -- >> >> diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py >> >> index b6cb231..7c80c7c 100644 >> >> --- a/airflow/hooks/druid_hook.py >> >> +++ b/airflow/hooks/druid_hook.py >> >> @@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook >> >> from airflow.exceptions import AirflowException >> >> >> >> LOAD_CHECK_INTERVAL = 5 >> >> - >> >> +TARGET_PARTITION_SIZE = 500 >> >> >> >> class AirflowDruidLoadException(AirflowException): >> >> pass >> >> @@ -52,13 +52,22 @@ class DruidHook(BaseHook): >> >> >> >> def construct_ingest_query( >> >> self, datasource, static_path, ts_dim, columns, >> metric_spec, >> >> -intervals, num_shards, >> hadoop_dependency_coordinates=None): >> >> +intervals, num_shards, target_partition_size, >> >> hadoop_dependency_coordinates=None): >> >> """ >> >> Builds an ingest query for an HDFS TSV load. >> >> >> >> :param datasource: target datasource in druid >> >> :param columns: list of all columns in the TSV, in the right >> >> order >> >> """ >> >> + >> >> +# backward compatibilty for num_shards, but >> >> target_partition_size is the default setting >> >> +# and overwrites the num_shards >> >> +if target_partition_size == -1: >> >> +if num_shards == -1: >> >> +target_partition_size = TARGET_PARTITION_SIZE >> >> +else: >> >> +num_shards = -1 >> >> + >> >> metric_names = [ >> >> m['fieldName'] for m in metric_spec if m['type'] != >> 'count'] >> >> dimensions = [c for c in columns if c not in metric_names and >> c >> >> != ts_dim] >> >> @@ -100,7 +109,7 @@ class DruidHook(BaseHook): >> >> }, >> >> "partitionsSpec" : { >> >> "type" : "hashed", >> >> -"targetPartitionSize" : -1, >> >> +"targetPartitionSize" : target_partition_size, >> >> "numShards" : num_shards, >> >> }, >> >> }, >> >> @@ -121,10 +130,10 @@ class DruidHook(BaseHook): >> >> >> >> def send_ingest_query( >> >> self, datasource, 
static_path, ts_dim, columns, >> metric_spec, >> >> -intervals, num_shards, >> hadoop_dependency_coordinates=None): >> >> +intervals, num_shards, target_partition_size, >> >> hadoop_dependency_coordinates=None): >>
[1/3] incubator-airflow git commit: use targetPartitionSize as the default partition spec
Repository: incubator-airflow Updated Branches: refs/heads/master 72ab63e83 -> db07e04f9 use targetPartitionSize as the default partition spec Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b565ef99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b565ef99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b565ef99 Branch: refs/heads/master Commit: b565ef9952488b6a5f77becab3c430816af33e90 Parents: 07fe7d7 Author: Hongbo Zeng Authored: Sat May 14 17:00:42 2016 -0700 Committer: Hongbo Zeng Committed: Sat May 14 17:00:42 2016 -0700 -- airflow/hooks/druid_hook.py| 23 --- airflow/operators/hive_to_druid.py | 8 +--- 2 files changed, 21 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b565ef99/airflow/hooks/druid_hook.py -- diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py index b6cb231..7c80c7c 100644 --- a/airflow/hooks/druid_hook.py +++ b/airflow/hooks/druid_hook.py @@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook from airflow.exceptions import AirflowException LOAD_CHECK_INTERVAL = 5 - +TARGET_PARTITION_SIZE = 500 class AirflowDruidLoadException(AirflowException): pass @@ -52,13 +52,22 @@ class DruidHook(BaseHook): def construct_ingest_query( self, datasource, static_path, ts_dim, columns, metric_spec, -intervals, num_shards, hadoop_dependency_coordinates=None): +intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None): """ Builds an ingest query for an HDFS TSV load. 
:param datasource: target datasource in druid :param columns: list of all columns in the TSV, in the right order """ + +# backward compatibilty for num_shards, but target_partition_size is the default setting +# and overwrites the num_shards +if target_partition_size == -1: +if num_shards == -1: +target_partition_size = TARGET_PARTITION_SIZE +else: +num_shards = -1 + metric_names = [ m['fieldName'] for m in metric_spec if m['type'] != 'count'] dimensions = [c for c in columns if c not in metric_names and c != ts_dim] @@ -100,7 +109,7 @@ class DruidHook(BaseHook): }, "partitionsSpec" : { "type" : "hashed", -"targetPartitionSize" : -1, +"targetPartitionSize" : target_partition_size, "numShards" : num_shards, }, }, @@ -121,10 +130,10 @@ class DruidHook(BaseHook): def send_ingest_query( self, datasource, static_path, ts_dim, columns, metric_spec, -intervals, num_shards, hadoop_dependency_coordinates=None): +intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None): query = self.construct_ingest_query( datasource, static_path, ts_dim, columns, -metric_spec, intervals, num_shards, hadoop_dependency_coordinates) +metric_spec, intervals, num_shards, target_partition_size, hadoop_dependency_coordinates) r = requests.post( self.ingest_post_url, headers=self.header, data=query) logging.info(self.ingest_post_url) @@ -138,7 +147,7 @@ class DruidHook(BaseHook): def load_from_hdfs( self, datasource, static_path, ts_dim, columns, -intervals, num_shards, metric_spec=None, hadoop_dependency_coordinates=None): +intervals, num_shards, target_partition_size, metric_spec=None, hadoop_dependency_coordinates=None): """ load data to druid from hdfs :params ts_dim: The column name to use as a timestamp @@ -146,7 +155,7 @@ class DruidHook(BaseHook): """ task_id = self.send_ingest_query( datasource, static_path, ts_dim, columns, metric_spec, -intervals, num_shards, hadoop_dependency_coordinates) +intervals, num_shards, target_partition_size, 
hadoop_dependency_coordinates) status_url = self.get_ingest_status_url(task_id) while True: r = requests.get(status_url) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b565ef99/airflow/operators/hive_to_druid.py -- diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py index 1346841..420aeed 100644 ---