Re: [1/3] incubator-airflow git commit: use targetPartitionSize as the default partition spec

2016-05-23 Thread Chris Riccomini
s/two/too .. sigh

On Mon, May 23, 2016 at 8:29 PM, Chris Riccomini wrote:

> Ah, yea. I get bitten by that two. It's annoying to have to ask people to
> add a JIRA to their commit message. And we can't squash through GitHub
> anymore. :( Wonder if the airflow-pr script allows us to edit it? I think
> it might.
>
> On Mon, May 23, 2016 at 5:50 PM, Dan Davydov <dan.davy...@airbnb.com.invalid> wrote:
>
>> Yep, sorry, will check the versions in the future. My own commits have JIRA
>> labels, but I haven't validated that other users have done this for theirs
>> when I merge their commits (as the LGTM is delegated to either another
>> committer or the owner of a particular operator). Will be more vigilant in
>> the future.
>>
>> On Mon, May 23, 2016 at 5:07 PM, Chris Riccomini wrote:
>>
>> > Hey Dan,
>> >
>> > Could you please file JIRAs, and put the JIRA name as the prefix to your
>> > commits?
>> >
>> > Cheers,
>> > Chris
>> >

Re: [1/3] incubator-airflow git commit: use targetPartitionSize as the default partition spec

2016-05-23 Thread Chris Riccomini
Ah, yea. I get bitten by that two. It's annoying to have to ask people to
add a JIRA to their commit message. And we can't squash through GitHub
anymore. :( Wonder if the airflow-pr script allows us to edit it? I think
it might.

On Mon, May 23, 2016 at 5:50 PM, Dan Davydov  wrote:

> Yep, sorry, will check the versions in the future. My own commits have JIRA
> labels, but I haven't validated that other users have done this for theirs
> when I merge their commits (as the LGTM is delegated to either another
> committer or the owner of a particular operator). Will be more vigilant in
> the future.
>
> On Mon, May 23, 2016 at 5:07 PM, Chris Riccomini wrote:
>
> > Hey Dan,
> >
> > Could you please file JIRAs, and put the JIRA name as the prefix to your
> > commits?
> >
> > Cheers,
> > Chris
> >

Re: [1/3] incubator-airflow git commit: use targetPartitionSize as the default partition spec

2016-05-23 Thread Chris Riccomini
Hey Dan,

Could you please file JIRAs, and put the JIRA name as the prefix to your
commits?

Cheers,
Chris

On Mon, May 23, 2016 at 5:01 PM,  wrote:

> Repository: incubator-airflow
> Updated Branches:
>   refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae
>
>
> use targetPartitionSize as the default partition spec
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09
>
> Branch: refs/heads/airbnb_rb1.7.1_4
> Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda
> Parents: 1d0d868
> Author: Hongbo Zeng 
> Authored: Sat May 14 17:00:42 2016 -0700
> Committer: Dan Davydov 
> Committed: Mon May 23 16:59:52 2016 -0700
>
> ----------------------------------------------------------------------
>  airflow/hooks/druid_hook.py        | 23 ++++++++++++++++-------
>  airflow/operators/hive_to_druid.py |  8 +++++---
>  2 files changed, 21 insertions(+), 10 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py
> ----------------------------------------------------------------------
> diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
> index b6cb231..7c80c7c 100644
> --- a/airflow/hooks/druid_hook.py
> +++ b/airflow/hooks/druid_hook.py
> @@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook
>  from airflow.exceptions import AirflowException
>
>  LOAD_CHECK_INTERVAL = 5
> -
> +TARGET_PARTITION_SIZE = 500
>
>  class AirflowDruidLoadException(AirflowException):
>      pass
> @@ -52,13 +52,22 @@ class DruidHook(BaseHook):
>
>      def construct_ingest_query(
>              self, datasource, static_path, ts_dim, columns, metric_spec,
> -            intervals, num_shards, hadoop_dependency_coordinates=None):
> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
>          """
>          Builds an ingest query for an HDFS TSV load.
>
>          :param datasource: target datasource in druid
>          :param columns: list of all columns in the TSV, in the right order
>          """
> +
> +        # backward compatibilty for num_shards, but target_partition_size is the default setting
> +        # and overwrites the num_shards
> +        if target_partition_size == -1:
> +            if num_shards == -1:
> +                target_partition_size = TARGET_PARTITION_SIZE
> +        else:
> +            num_shards = -1
> +
>          metric_names = [
>              m['fieldName'] for m in metric_spec if m['type'] != 'count']
>          dimensions = [c for c in columns if c not in metric_names and c != ts_dim]
> @@ -100,7 +109,7 @@ class DruidHook(BaseHook):
>                  },
>                  "partitionsSpec" : {
>                      "type" : "hashed",
> -                    "targetPartitionSize" : -1,
> +                    "targetPartitionSize" : target_partition_size,
>                      "numShards" : num_shards,
>                  },
>              },
> @@ -121,10 +130,10 @@ class DruidHook(BaseHook):
>
>      def send_ingest_query(
>              self, datasource, static_path, ts_dim, columns, metric_spec,
> -            intervals, num_shards, hadoop_dependency_coordinates=None):
> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
>          query = self.construct_ingest_query(
>              datasource, static_path, ts_dim, columns,
> -            metric_spec, intervals, num_shards, hadoop_dependency_coordinates)
> +            metric_spec, intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
>          r = requests.post(
>              self.ingest_post_url, headers=self.header, data=query)
>          logging.info(self.ingest_post_url)
> @@ -138,7 +147,7 @@ class DruidHook(BaseHook):
>
>      def load_from_hdfs(
>              self, datasource, static_path,  ts_dim, columns,
> -            intervals, num_shards, metric_spec=None, hadoop_dependency_coordinates=None):
> +            intervals, num_shards, target_partition_size, metric_spec=None, hadoop_dependency_coordinates=None):
>          """
>          load data to druid from hdfs
>          :params ts_dim: The column name to use as a timestamp
> @@ -146,7 +155,7 @@ class DruidHook(BaseHook):
>          """
>          task_id = self.send_ingest_query(
>              datasource, static_path, ts_dim, columns, metric_spec,
> -            intervals, num_shards, hadoop_dependency_coordinates)
> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
>          status_url = self.get_ingest_status_url(task_id)
>          while True: