Re: [1/3] incubator-airflow git commit: use targetPartitionSize as the default partition spec
s/two/too .. sigh On Mon, May 23, 2016 at 8:29 PM, Chris Riccomini wrote: > Ah, yea. I get bitten by that two. It's annoying to have to ask people to > add a JIRA to their commit message. And we can't squash through GitHub > anymore. :( Wonder if the airflow-pr script allows us to edit it? I think > it might > > On Mon, May 23, 2016 at 5:50 PM, Dan Davydov < > dan.davy...@airbnb.com.invalid> wrote: > >> Yep sorry will check the versions in the future. My own commits have JIRA >> labels but I haven't validated that other users have done this for theirs >> when I merge their commits (as the LGTM is delegated to either another >> committer or the owner of a particular operator). Will be more vigilant in >> the future. >> >> On Mon, May 23, 2016 at 5:07 PM, Chris Riccomini >> wrote: >> >> > Hey Dan, >> > >> > Could you please file JIRAs, and put the JIRA name as the prefix to your >> > commits? >> > >> > Cheers, >> > Chris >> > >> > On Mon, May 23, 2016 at 5:01 PM, wrote: >> > >> >> Repository: incubator-airflow >> >> Updated Branches: >> >> refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae >> >> >> >> >> >> use targetPartitionSize as the default partition spec >> >> >> >> >> >> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo >> >> Commit: >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09 >> >> Tree: >> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09 >> >> Diff: >> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09 >> >> >> >> Branch: refs/heads/airbnb_rb1.7.1_4 >> >> Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda >> >> Parents: 1d0d868 >> >> Author: Hongbo Zeng >> >> Authored: Sat May 14 17:00:42 2016 -0700 >> >> Committer: Dan Davydov >> >> Committed: Mon May 23 16:59:52 2016 -0700 >> >> >> >> -- >> >> airflow/hooks/druid_hook.py| 23 --- >> >> airflow/operators/hive_to_druid.py | 8 +--- >> >> 2 files changed, 21 insertions(+), 10 deletions(-) >> >> -- >> >> >> 
>> >> >> >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py >> >> -- >> >> diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py >> >> index b6cb231..7c80c7c 100644 >> >> --- a/airflow/hooks/druid_hook.py >> >> +++ b/airflow/hooks/druid_hook.py >> >> @@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook >> >> from airflow.exceptions import AirflowException >> >> >> >> LOAD_CHECK_INTERVAL = 5 >> >> - >> >> +TARGET_PARTITION_SIZE = 500 >> >> >> >> class AirflowDruidLoadException(AirflowException): >> >> pass >> >> @@ -52,13 +52,22 @@ class DruidHook(BaseHook): >> >> >> >> def construct_ingest_query( >> >> self, datasource, static_path, ts_dim, columns, >> metric_spec, >> >> -intervals, num_shards, >> hadoop_dependency_coordinates=None): >> >> +intervals, num_shards, target_partition_size, >> >> hadoop_dependency_coordinates=None): >> >> """ >> >> Builds an ingest query for an HDFS TSV load. >> >> >> >> :param datasource: target datasource in druid >> >> :param columns: list of all columns in the TSV, in the right >> >> order >> >> """ >> >> + >> >> +# backward compatibilty for num_shards, but >> >> target_partition_size is the default setting >> >> +# and overwrites the num_shards >> >> +if target_partition_size == -1: >> >> +if num_shards == -1: >> >> +target_partition_size = TARGET_PARTITION_SIZE >> >> +else: >> >> +num_shards = -1 >> >> + >> >> metric_names = [ >> >> m['fieldName'] for m in metric_spec if m['type'] != >> 'count'] >> >> dimensions = [c for c in columns if c not in metric_names and >> c >> >> != ts_dim] >> >> @@ -100,7 +109,7 @@ class DruidHook(BaseHook): >> >> }, >> >> "partitionsSpec" : { >> >> "type" : "hashed", >> >> -"targetPartitionSize" : -1, >> >> +"targetPartitionSize" : target_partition_size, >> >> "numShards" : num_shards, >> >> }, >> >> }, >> >> @@ -121,10 +130,10 @@ class DruidHook(BaseHook): >> >> >> >> def send_ingest_query( >> >> self, datasource, 
static_path, ts_dim, columns, >> metric_spec, >> >> -intervals, num_shards, >> hadoop_dependency_coordinates=None): >> >> +intervals, num_shards, target_partition_size, >> >> hadoop_dependency_coordinates=None): >> >> query = self.construct_ingest_query( >> >> datasource, static_path, ts_dim, columns, >> >>
Re: [1/3] incubator-airflow git commit: use targetPartitionSize as the default partition spec
Ah, yea. I get bitten by that too. It's annoying to have to ask people to add a JIRA to their commit message. And we can't squash through GitHub anymore. :( Wonder if the airflow-pr script allows us to edit it? I think it might. On Mon, May 23, 2016 at 5:50 PM, Dan Davydov wrote: > Yep sorry will check the versions in the future. My own commits have JIRA > labels but I haven't validated that other users have done this for theirs > when I merge their commits (as the LGTM is delegated to either another > committer or the owner of a particular operator). Will be more vigilant in > the future. > > On Mon, May 23, 2016 at 5:07 PM, Chris Riccomini > wrote: > > > Hey Dan, > > > > Could you please file JIRAs, and put the JIRA name as the prefix to your > > commits? > > > > Cheers, > > Chris > > > > On Mon, May 23, 2016 at 5:01 PM, wrote: > > > >> Repository: incubator-airflow > >> Updated Branches: > >> refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae > >> > >> > >> use targetPartitionSize as the default partition spec > >> > >> > >> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo > >> Commit: > >> > http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09 > >> Tree: > >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09 > >> Diff: > >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09 > >> > >> Branch: refs/heads/airbnb_rb1.7.1_4 > >> Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda > >> Parents: 1d0d868 > >> Author: Hongbo Zeng > >> Authored: Sat May 14 17:00:42 2016 -0700 > >> Committer: Dan Davydov > >> Committed: Mon May 23 16:59:52 2016 -0700 > >> > >> -- > >> airflow/hooks/druid_hook.py| 23 --- > >> airflow/operators/hive_to_druid.py | 8 +--- > >> 2 files changed, 21 insertions(+), 10 deletions(-) > >> -- > >> > >> > >> > >> > http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py > >> -- > >> diff --git a/airflow/hooks/druid_hook.py 
b/airflow/hooks/druid_hook.py > >> index b6cb231..7c80c7c 100644 > >> --- a/airflow/hooks/druid_hook.py > >> +++ b/airflow/hooks/druid_hook.py > >> @@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook > >> from airflow.exceptions import AirflowException > >> > >> LOAD_CHECK_INTERVAL = 5 > >> - > >> +TARGET_PARTITION_SIZE = 500 > >> > >> class AirflowDruidLoadException(AirflowException): > >> pass > >> @@ -52,13 +52,22 @@ class DruidHook(BaseHook): > >> > >> def construct_ingest_query( > >> self, datasource, static_path, ts_dim, columns, > metric_spec, > >> -intervals, num_shards, hadoop_dependency_coordinates=None): > >> +intervals, num_shards, target_partition_size, > >> hadoop_dependency_coordinates=None): > >> """ > >> Builds an ingest query for an HDFS TSV load. > >> > >> :param datasource: target datasource in druid > >> :param columns: list of all columns in the TSV, in the right > >> order > >> """ > >> + > >> +# backward compatibilty for num_shards, but > >> target_partition_size is the default setting > >> +# and overwrites the num_shards > >> +if target_partition_size == -1: > >> +if num_shards == -1: > >> +target_partition_size = TARGET_PARTITION_SIZE > >> +else: > >> +num_shards = -1 > >> + > >> metric_names = [ > >> m['fieldName'] for m in metric_spec if m['type'] != > 'count'] > >> dimensions = [c for c in columns if c not in metric_names and c > >> != ts_dim] > >> @@ -100,7 +109,7 @@ class DruidHook(BaseHook): > >> }, > >> "partitionsSpec" : { > >> "type" : "hashed", > >> -"targetPartitionSize" : -1, > >> +"targetPartitionSize" : target_partition_size, > >> "numShards" : num_shards, > >> }, > >> }, > >> @@ -121,10 +130,10 @@ class DruidHook(BaseHook): > >> > >> def send_ingest_query( > >> self, datasource, static_path, ts_dim, columns, > metric_spec, > >> -intervals, num_shards, hadoop_dependency_coordinates=None): > >> +intervals, num_shards, target_partition_size, > >> hadoop_dependency_coordinates=None): > >> query = 
self.construct_ingest_query( > >> datasource, static_path, ts_dim, columns, > >> -metric_spec, intervals, num_shards, > >> hadoop_dependency_coordinates) > >> +metric_spec, intervals, num_shards, target_partition_size, > >> hadoop_dependency_coordinates) > >> r = requests.post( > >> se
Re: [1/3] incubator-airflow git commit: use targetPartitionSize as the default partition spec
Hey Dan, Could you please file JIRAs, and put the JIRA name as the prefix to your commits? Cheers, Chris On Mon, May 23, 2016 at 5:01 PM, wrote: > Repository: incubator-airflow > Updated Branches: > refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae > > > use targetPartitionSize as the default partition spec > > > Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo > Commit: > http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09 > Tree: > http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09 > Diff: > http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09 > > Branch: refs/heads/airbnb_rb1.7.1_4 > Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda > Parents: 1d0d868 > Author: Hongbo Zeng > Authored: Sat May 14 17:00:42 2016 -0700 > Committer: Dan Davydov > Committed: Mon May 23 16:59:52 2016 -0700 > > -- > airflow/hooks/druid_hook.py| 23 --- > airflow/operators/hive_to_druid.py | 8 +--- > 2 files changed, 21 insertions(+), 10 deletions(-) > -- > > > > http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py > -- > diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py > index b6cb231..7c80c7c 100644 > --- a/airflow/hooks/druid_hook.py > +++ b/airflow/hooks/druid_hook.py > @@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook > from airflow.exceptions import AirflowException > > LOAD_CHECK_INTERVAL = 5 > - > +TARGET_PARTITION_SIZE = 500 > > class AirflowDruidLoadException(AirflowException): > pass > @@ -52,13 +52,22 @@ class DruidHook(BaseHook): > > def construct_ingest_query( > self, datasource, static_path, ts_dim, columns, metric_spec, > -intervals, num_shards, hadoop_dependency_coordinates=None): > +intervals, num_shards, target_partition_size, > hadoop_dependency_coordinates=None): > """ > Builds an ingest query for an HDFS TSV load. 
> > :param datasource: target datasource in druid > :param columns: list of all columns in the TSV, in the right order > """ > + > +# backward compatibilty for num_shards, but target_partition_size > is the default setting > +# and overwrites the num_shards > +if target_partition_size == -1: > +if num_shards == -1: > +target_partition_size = TARGET_PARTITION_SIZE > +else: > +num_shards = -1 > + > metric_names = [ > m['fieldName'] for m in metric_spec if m['type'] != 'count'] > dimensions = [c for c in columns if c not in metric_names and c > != ts_dim] > @@ -100,7 +109,7 @@ class DruidHook(BaseHook): > }, > "partitionsSpec" : { > "type" : "hashed", > -"targetPartitionSize" : -1, > +"targetPartitionSize" : target_partition_size, > "numShards" : num_shards, > }, > }, > @@ -121,10 +130,10 @@ class DruidHook(BaseHook): > > def send_ingest_query( > self, datasource, static_path, ts_dim, columns, metric_spec, > -intervals, num_shards, hadoop_dependency_coordinates=None): > +intervals, num_shards, target_partition_size, > hadoop_dependency_coordinates=None): > query = self.construct_ingest_query( > datasource, static_path, ts_dim, columns, > -metric_spec, intervals, num_shards, > hadoop_dependency_coordinates) > +metric_spec, intervals, num_shards, target_partition_size, > hadoop_dependency_coordinates) > r = requests.post( > self.ingest_post_url, headers=self.header, data=query) > logging.info(self.ingest_post_url) > @@ -138,7 +147,7 @@ class DruidHook(BaseHook): > > def load_from_hdfs( > self, datasource, static_path, ts_dim, columns, > -intervals, num_shards, metric_spec=None, > hadoop_dependency_coordinates=None): > +intervals, num_shards, target_partition_size, > metric_spec=None, hadoop_dependency_coordinates=None): > """ > load data to druid from hdfs > :params ts_dim: The column name to use as a timestamp > @@ -146,7 +155,7 @@ class DruidHook(BaseHook): > """ > task_id = self.send_ingest_query( > datasource, static_path, ts_dim, columns, metric_spec, > -intervals, 
num_shards, hadoop_dependency_coordinates) > +intervals, num_shards, target_partition_size, > hadoop_dependency_coordinates) > status_url = self.get_ingest_status_url(task_id) > while True: >