[jira] [Created] (FLINK-12181) Port ExecutionGraphRestartTest to new codebase

2019-04-12 Thread TisonKun (JIRA)
TisonKun created FLINK-12181:


 Summary: Port ExecutionGraphRestartTest to new codebase
 Key: FLINK-12181
 URL: https://issues.apache.org/jira/browse/FLINK-12181
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 1.9.0
Reporter: TisonKun
 Fix For: 1.9.0


Port {{ExecutionGraphRestartTest}} to new codebase.

Mainly get rid of the usages of {{Scheduler}} and {{Instance}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12178) Remove legacy Scheduler

2019-04-12 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-12178:


Assignee: (was: TisonKun)

> Remove legacy Scheduler
> ---
>
> Key: FLINK-12178
> URL: https://issues.apache.org/jira/browse/FLINK-12178
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.9.0
>
>
> Remove the legacy {{Scheduler}} and port its usages in valid tests to the new 
> scheduling mechanism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10569) Clean up uses of Scheduler and Instance in valid tests

2019-04-12 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun closed FLINK-10569.

Resolution: Staged

Closed and superseded by the individual removals of `Scheduler` (FLINK-12178) and `Instance` 
(FLINK-12179).

> Clean up uses of Scheduler and Instance in valid tests
> --
>
> Key: FLINK-10569
> URL: https://issues.apache.org/jira/browse/FLINK-10569
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> The legacy classes {{Scheduler}} and {{Instance}} are still used in some valid 
> tests like {{ExecutionGraphRestartTest}}. We should replace them with the FLIP-6 
> scheduling mode. The best way I can find is to use {{SimpleSlotProvider}}.
> Note that we need not remove all use points across all files, since most of 
> them stay in legacy code such as {{JobManager.scala}} and will be removed 
> later.
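
As a rough sketch of what the replacement could look like in a test (assuming the test-scope {{SimpleSlotProvider}} utility with a {{(JobID, int)}} constructor; the names here are illustrative, not the final ported test):

{code:java}
// Illustrative only: provide slots from a SimpleSlotProvider instead of building
// a legacy Scheduler backed by Instance objects.
JobID jobId = new JobID();
SlotProvider slotProvider = new SimpleSlotProvider(jobId, 4); // 4 available task slots

// The ExecutionGraph under test is then wired against 'slotProvider' wherever the
// old tests created a Scheduler and registered Instances with it.
{code}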



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12179) Remove legacy Instance

2019-04-12 Thread TisonKun (JIRA)
TisonKun created FLINK-12179:


 Summary: Remove legacy Instance
 Key: FLINK-12179
 URL: https://issues.apache.org/jira/browse/FLINK-12179
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.9.0
Reporter: TisonKun
 Fix For: 1.9.0


Remove legacy {{Instance}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12180) Port ExecutionVertexCancelTest to new codebase

2019-04-12 Thread TisonKun (JIRA)
TisonKun created FLINK-12180:


 Summary: Port ExecutionVertexCancelTest to new codebase
 Key: FLINK-12180
 URL: https://issues.apache.org/jira/browse/FLINK-12180
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 1.9.0
Reporter: TisonKun
 Fix For: 1.9.0


Port {{ExecutionVertexCancelTest}} to new codebase.

Mainly get rid of the usage of {{Instance}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12178) Remove legacy Scheduler

2019-04-12 Thread TisonKun (JIRA)
TisonKun created FLINK-12178:


 Summary: Remove legacy Scheduler
 Key: FLINK-12178
 URL: https://issues.apache.org/jira/browse/FLINK-12178
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.9.0
Reporter: TisonKun
Assignee: TisonKun
 Fix For: 1.9.0


Remove the legacy {{Scheduler}} and port its usages in valid tests to the new 
scheduling mechanism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] TisonKun commented on issue #7815: [FLINK-10569][tests] Remove Instance usages in valid Test

2019-04-12 Thread GitBox
TisonKun commented on issue #7815:  [FLINK-10569][tests] Remove Instance usages 
in valid Test
URL: https://github.com/apache/flink/pull/7815#issuecomment-482774728
 
 
   Closing this PR: FLINK-10569 has been split into separate removals of `Scheduler` and 
`Instance`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun closed pull request #7815: [FLINK-10569][tests] Remove Instance usages in valid Test

2019-04-12 Thread GitBox
TisonKun closed pull request #7815:  [FLINK-10569][tests] Remove Instance 
usages in valid Test
URL: https://github.com/apache/flink/pull/7815
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12177) Remove legacy FlinkUntypedActor

2019-04-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12177:
---
Labels: pull-request-available  (was: )

> Remove legacy FlinkUntypedActor
> ---
>
> Key: FLINK-12177
> URL: https://issues.apache.org/jira/browse/FLINK-12177
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Remove legacy FlinkUntypedActor



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8160: [FLINK-12177][coordination] Remove legacy FlinkUntypedActor

2019-04-12 Thread GitBox
flinkbot commented on issue #8160: [FLINK-12177][coordination] Remove legacy 
FlinkUntypedActor
URL: https://github.com/apache/flink/pull/8160#issuecomment-482774698
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun opened a new pull request #8160: [FLINK-12177][coordination] Remove legacy FlinkUntypedActor

2019-04-12 Thread GitBox
TisonKun opened a new pull request #8160: [FLINK-12177][coordination] Remove 
legacy FlinkUntypedActor
URL: https://github.com/apache/flink/pull/8160
 
 
   ## What is the purpose of the change
   
   Remove legacy `FlinkUntypedActor`.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   
   cc @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12177) Remove legacy FlinkUntypedActor

2019-04-12 Thread TisonKun (JIRA)
TisonKun created FLINK-12177:


 Summary: Remove legacy FlinkUntypedActor
 Key: FLINK-12177
 URL: https://issues.apache.org/jira/browse/FLINK-12177
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.9.0
Reporter: TisonKun
Assignee: TisonKun
 Fix For: 1.9.0


Remove legacy FlinkUntypedActor



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12176) Unify JobGraph creation in CliFrontend

2019-04-12 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-12176:
-
Description: 
Currently, we create the {{JobGraph}} by the following process:

* if the cluster starts in job mode, we create the {{JobGraph}} via 
{{PackagedProgramUtils#createJobGraph}} and deploy a job cluster
* if the cluster starts in session mode, we create the {{JobGraph}} and submit it 
via {{CliFrontend#executeProgram}}, which is internally the same as above but uses 
{{ContextEnvironment}} instead of {{OptimizerPlanEnvironment}}.

{{ContextEnvironment}} not only creates the job graph but also submits it. 
However, the processes of {{JobGraph}} creation in job mode and session mode 
are similar. That means we can unify the process by always creating the {{JobGraph}} 
via {{PackagedProgramUtils#createJobGraph}}, and then:

* in job mode, deploy a job cluster with the {{JobGraph}}
* in session mode, submit the {{JobGraph}} to the session cluster

From a higher-level view, this gives a common view of job submission in both 
job and session mode and opens up opportunities to refactor legacy client code.

  was:
Currently, we create {{JobGraph}} by the following process

*. if the cluster start in job mode, we create {{JobGraph}} by 
{{PackagedProgramUtils#createJobGraph}} and deploy a job cluster
*. if the cluster start in session mode, we create {{JobGraph}} and submit it 
by {{CliFrontend#executeProgram}}, which internally the same as above but using 
{{ContextEnvironment}} instead of {{OptimizerPlanEnvironment}}.

{{ContextEnvironment}} not only create the job graph but also submit it. 
However, the processes of job mode and session mode are similar. That means, we 
can unify the process by always create {{JobGraph}} by 
{{PackagedProgramUtils#createJobGraph}}. And,

*. in job mode, deploy job cluster with the {{JobGraph}}
*. in session mode, submit the {{JobGraph}} to the session cluster

From a higher view, it is helpful for a common view of job submission in both 
job and session mode and give opportunities to refactor legacy client codes.


> Unify JobGraph creation in CliFrontend
> --
>
> Key: FLINK-12176
> URL: https://issues.apache.org/jira/browse/FLINK-12176
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Affects Versions: 1.9.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.9.0
>
>
> Currently, we create the {{JobGraph}} by the following process:
> * if the cluster starts in job mode, we create the {{JobGraph}} via 
> {{PackagedProgramUtils#createJobGraph}} and deploy a job cluster
> * if the cluster starts in session mode, we create the {{JobGraph}} and submit it 
> via {{CliFrontend#executeProgram}}, which is internally the same as above but 
> uses {{ContextEnvironment}} instead of {{OptimizerPlanEnvironment}}.
> {{ContextEnvironment}} not only creates the job graph but also submits it. 
> However, the processes of {{JobGraph}} creation in job mode and session mode 
> are similar. That means we can unify the process by always creating the 
> {{JobGraph}} via {{PackagedProgramUtils#createJobGraph}}, and then:
> * in job mode, deploy a job cluster with the {{JobGraph}}
> * in session mode, submit the {{JobGraph}} to the session cluster
> From a higher-level view, this gives a common view of job submission in both 
> job and session mode and opens up opportunities to refactor legacy client code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12176) Unify JobGraph creation in CliFrontend

2019-04-12 Thread TisonKun (JIRA)
TisonKun created FLINK-12176:


 Summary: Unify JobGraph creation in CliFrontend
 Key: FLINK-12176
 URL: https://issues.apache.org/jira/browse/FLINK-12176
 Project: Flink
  Issue Type: Improvement
  Components: Command Line Client
Affects Versions: 1.9.0
Reporter: TisonKun
Assignee: TisonKun
 Fix For: 1.9.0


Currently, we create the {{JobGraph}} by the following process:

* if the cluster starts in job mode, we create the {{JobGraph}} via 
{{PackagedProgramUtils#createJobGraph}} and deploy a job cluster
* if the cluster starts in session mode, we create the {{JobGraph}} and submit it 
via {{CliFrontend#executeProgram}}, which is internally the same as above but uses 
{{ContextEnvironment}} instead of {{OptimizerPlanEnvironment}}.

{{ContextEnvironment}} not only creates the job graph but also submits it. 
However, the processes of job mode and session mode are similar. That means we 
can unify the process by always creating the {{JobGraph}} via 
{{PackagedProgramUtils#createJobGraph}}, and then:

* in job mode, deploy a job cluster with the {{JobGraph}}
* in session mode, submit the {{JobGraph}} to the session cluster

From a higher-level view, this gives a common view of job submission in both 
job and session mode and opens up opportunities to refactor legacy client code.
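
A minimal sketch of the unified path (assuming {{PackagedProgramUtils#createJobGraph(PackagedProgram, Configuration, int)}}; the jar path and the deployment calls in the comments are placeholders, not the final CliFrontend code):

{code:java}
import java.io.File;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class UnifiedJobGraphCreationSketch {

    public static void main(String[] args) throws Exception {
        // build the JobGraph once, independently of the deployment mode
        PackagedProgram program = new PackagedProgram(new File("/path/to/user-job.jar"));
        Configuration configuration = new Configuration();
        int defaultParallelism = 1;

        JobGraph jobGraph =
            PackagedProgramUtils.createJobGraph(program, configuration, defaultParallelism);

        // job mode:     deploy a job cluster with this JobGraph
        // session mode: submit this JobGraph to the existing session cluster
        System.out.println("Created JobGraph " + jobGraph.getJobID());
    }
}
{code}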



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11935) Remove DateTimeUtils pull-in and fix datetime casting problem

2019-04-12 Thread Julian Hyde (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816755#comment-16816755
 ] 

Julian Hyde commented on FLINK-11935:
-

My advice is: don't trust Java, do trust PostgreSQL. Java date internals are 
based on the UTC epoch, and their toString() is based on the local time zone and local 
calendar. PostgreSQL tends to do things right.
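
A tiny, non-authoritative illustration of that point (the first printed value depends on the JVM's default time zone):

{code:java}
import java.util.Date;
import java.util.TimeZone;

public class DateToStringDemo {
    public static void main(String[] args) {
        // internally this is the UTC epoch instant 1970-01-01T00:00:00Z ...
        Date epoch = new Date(0L);
        // ... but toString() renders it in the JVM's default (local) time zone and calendar
        System.out.println(epoch);

        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        System.out.println(new Date(0L)); // now prints "Thu Jan 01 00:00:00 UTC 1970"
    }
}
{code}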

> Remove DateTimeUtils pull-in and fix datetime casting problem
> -
>
> Key: FLINK-11935
> URL: https://issues.apache.org/jira/browse/FLINK-11935
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Rong Rong
>Assignee: vinoyang
>Priority: Major
>
> This {{DateTimeUtils}} was pulled in as part of FLINK-7235.
> Originally, the time operation was not done correctly via the {{ymdToJulian}} 
> function for dates before {{1970-01-01}}, thus we needed the fix, similar to 
> addressing this problem:
> {code:java}
>  Optimized :1017-12-05 22:58:58.998 
>  Expected :1017-11-29 22:58:58.998
>  Actual :1017-12-05 22:58:58.998
> {code}
>  
> However, after pulling in Avatica 1.13, I found out that the optimized plans 
> of the time operations are actually correct. It is in fact the casting part 
> that creates the problem:
> For example, the following:
> *{{(plus(-12000.months, cast('2017-11-29 22:58:58.998', TIMESTAMP))}}*
> results in a StringTestExpression of:
> *{{CAST(1017-11-29 22:58:58.998):VARCHAR(65536) CHARACTER SET "UTF-16LE" 
> COLLATE "ISO-8859-1$en_US$primary" NOT NULL}}*
> but the testing results are:
> {code:java}
>  Optimized :1017-11-29 22:58:58.998
>  Expected :1017-11-29 22:58:58.998
>  Actual :1017-11-23 22:58:58.998
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS

2019-04-12 Thread GitBox
piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support 
for AzureFS
URL: https://github.com/apache/flink/pull/8117#issuecomment-482722850
 
 
   @Myasuka - did you get a chance to take a look at the rework? Anything else 
to address or can we merge this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12175) TypeExtractor.getMapReturnTypes produces different TypeInformation than createTypeInformation for classes with parameterized ancestors

2019-04-12 Thread Dylan Adams (JIRA)
Dylan Adams created FLINK-12175:
---

 Summary: TypeExtractor.getMapReturnTypes produces different 
TypeInformation than createTypeInformation for classes with parameterized 
ancestors
 Key: FLINK-12175
 URL: https://issues.apache.org/jira/browse/FLINK-12175
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.7.2, 1.9.0
Reporter: Dylan Adams


I expect that the {{TypeExtractor}} methods {{createTypeInformation}} and 
{{getMapReturnTypes}} would produce equivalent type information for the same 
type, but when there is a parameterized superclass, this does not appear to be 
the case.

Here's a test case that could be added to {{PojoTypeExtractorTest.java}} that 
demonstrates the issue:
{code:java}
public static class Pojo implements Serializable {
    public int digits;
    public String letters;
}

public static class ParameterizedParent<T extends Serializable> implements Serializable {
    public T pojoField;
}

public static class ConcreteImpl extends ParameterizedParent<Pojo> {
    public double precise;
}

public static class ConcreteMapFunction implements MapFunction<ConcreteImpl, ConcreteImpl> {
    @Override
    public ConcreteImpl map(ConcreteImpl value) throws Exception {
        return null;
    }
}

@Test
public void testMapReturnType() {
    final TypeInformation<ConcreteImpl> directTypeInfo =
        TypeExtractor.createTypeInfo(ConcreteImpl.class);

    Assert.assertTrue(directTypeInfo instanceof PojoTypeInfo);
    TypeInformation<?> directPojoFieldTypeInfo =
        ((PojoTypeInfo<?>) directTypeInfo).getPojoFieldAt(0).getTypeInformation();
    Assert.assertTrue(directPojoFieldTypeInfo instanceof PojoTypeInfo);

    final TypeInformation<ConcreteImpl> mapReturnTypeInfo =
        TypeExtractor.getMapReturnTypes(new ConcreteMapFunction(), directTypeInfo);
    Assert.assertTrue(mapReturnTypeInfo instanceof PojoTypeInfo);
    TypeInformation<?> mapReturnPojoFieldTypeInfo =
        ((PojoTypeInfo<?>) mapReturnTypeInfo).getPojoFieldAt(0).getTypeInformation();
    Assert.assertTrue(mapReturnPojoFieldTypeInfo instanceof PojoTypeInfo);

    Assert.assertEquals(directTypeInfo, mapReturnTypeInfo);
}
{code}

This test case will fail on the last two asserts because {{getMapReturnTypes}} 
produces a {{TypeInformation}} for {{ConcreteImpl}} with a {{GenericTypeInfo}} 
for the {{pojoField}}, whereas {{createTypeInformation}} correctly produces a 
{{PojoTypeInfo}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11813) Standby per job mode Dispatchers don't know job's JobSchedulingStatus

2019-04-12 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816558#comment-16816558
 ] 

TisonKun edited comment on FLINK-11813 at 4/12/19 6:22 PM:
---

Thanks for your clarification and explanation [~till.rohrmann]. I like the idea 
of binding the entries of {{RunningJobsRegistry}} to the lifecycle of the cluster.

For the scope of this JIRA, the solution could simply be to keep {{DONE}} entries 
and never clean them up until the end of the cluster's lifecycle.

Further, I'd like to share my thoughts on how, ideally, the {{Dispatcher}} 
should schedule jobs with the help of {{SubmittedJobGraphStore}} and 
{{RunningJobsRegistry}}.



For the states of {{JobSchedulingStatus}}: since we always need to represent 
{{DONE}} explicitly, {{RunningJobsRegistry}} is necessary. However, we can 
involve {{SubmittedJobGraphStore}} when the {{Dispatcher}} decides when and how a 
job gets executed.

The underlying cause of this JIRA is how the {{Dispatcher}} schedules jobs. Based on 
the discussion above, I'd like to restate how the {{Dispatcher}} **should** 
schedule jobs with the help of {{SubmittedJobGraphStore}} and 
{{RunningJobsRegistry}}, and how the {{JobManager}} can be decoupled from 
{{RunningJobsRegistry}}.

 1. When a job is submitted for the first time, i.e., the store has no graph for it and 
the registry status remains {{NONE}}, the {{Dispatcher}} should, in order:

 # persist the job graph in the job graph store
 # set the registry status of the job to {{RUNNING}}
 # launch a {{JobManagerRunner}} and tell it to start a {{JobMaster}} once 
leadership is granted (decoupling the JM from the registry)

        (If either 1 or 2 fails, it is considered a failed submission and the 
registry entry remains {{NONE}}.)

2. When a job is submitted and the {{Dispatcher}} finds that its registry status is not 
{{NONE}}, reject the submission. (statement 4)

3. Whenever a {{Dispatcher}} gains leadership, it lists the entries of the jobs 
registry and tries to recover, from the job graph store, every job whose status is 
{{RUNNING}}. (On master failover, the {{Dispatcher}} would tell the JM to 
reconcile the job instead of scheduling it directly.)

4. When a job reaches a terminal state, the Dispatcher sets its status to 
{{DONE}}.

The statements above are similar to the 7 statements from [~till.rohrmann], but they 
clarify how the {{Dispatcher}} launches a job and emphasize that we can decouple the 
JM from the registry, because how a job gets executed can be decided by the 
{{Dispatcher}} instead of the JM.

Some comments on master failover (where {{RUNNING}} comes into use):
1. A {{Dispatcher}} that recovers a job with registry status {{RUNNING}} should 
launch a JM that first tries to reconcile instead of scheduling directly.
2. Furthermore, the {{Dispatcher}} might establish a heartbeat monitor with the job 
managers to detect master failover.



As [~zhuzh] mentioned, there is a need to unify the {{SubmittedJobGraphStore}} for session 
mode and job mode. Following our discussions, a cluster (job or session) follows the 
process: 1. start a dispatcher, 2. submit the job, 3. execute the job, 4. finish the job. 
If a job cluster can follow this pattern, i.e., first start a dispatcher and then 
**submit** the job graph to the dispatcher, we would not need a specific 
{{SingleJobSubmittedJobGraphStore}} but could always use 
{{highAvailabilityServices#getSubmittedJobGraphStore}}.

This is rather out of scope here, and I think the effort should rather go into 
{{CliFrontend}}.


was (Author: tison):
Thanks for you clarification and explanation [~till.rohrmann]. I like the idea 
that bound the entries of {{RunningJobsRegistry}} to the lifecycle of cluster.

For the scope of this JIRA we can say the solution could be simply keep DONE 
and never cleanup it until the end of cluster's lifecycle.

Further, I'd like to share my thoughts on, ideally, how {{Dispatcher}} 
scheduled jobs with the help of {{SubmittedJobsGraphStore}} and 
{{RunningJobsRegistry}}.



For the states of {{JobSchedulingStatus}}, since we always need to represent 
{{DONE}} explicitly, {{RunningJobsRegistry}} is necessary. However, we can 
involve {{SubmittedJobGraphStore}} when {{Dispatcher}} decided when and how a 
job get executed.

The underlaying cause of this JIRA is how {{Dispatcher}} schedules jobs. With 
discussion above, I'd like to restatement how {{Dispatcher}} **should** 
schedule jobs with the help of {{SubmittedJobsGraphStore}} and 
{{RunningJobsRegistry}}, as well as that {{JobManager}} can be decoupled with 
{{RunningJobsRegistry}}.

 1. When a job submitted for the first time, i.e., store has no graph of it and 
registry status remains {{NONE}}, the {{Dispatcher}} should in order

 # Persist the job graph in job graph store
 # Set registry status of the job to {{RUNNING}}
 # launch a {{JobManagerRunner}} and tell it to start a {{JobMaster}} on 
granted leadership. (decouple jm with registry)

       

[jira] [Commented] (FLINK-11813) Standby per job mode Dispatchers don't know job's JobSchedulingStatus

2019-04-12 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816558#comment-16816558
 ] 

TisonKun commented on FLINK-11813:
--

Thanks for your clarification and explanation [~till.rohrmann]. I like the idea 
of binding the entries of {{RunningJobsRegistry}} to the lifecycle of the cluster.

For the scope of this JIRA, the solution could simply be to keep {{DONE}} entries 
and never clean them up until the end of the cluster's lifecycle.

Further, I'd like to share my thoughts on how, ideally, the {{Dispatcher}} 
should schedule jobs with the help of {{SubmittedJobGraphStore}} and 
{{RunningJobsRegistry}}.



For the states of {{JobSchedulingStatus}}: since we always need to represent 
{{DONE}} explicitly, {{RunningJobsRegistry}} is necessary. However, we can 
involve {{SubmittedJobGraphStore}} when the {{Dispatcher}} decides when and how a 
job gets executed.

The underlying cause of this JIRA is how the {{Dispatcher}} schedules jobs. Based on 
the discussion above, I'd like to restate how the {{Dispatcher}} **should** 
schedule jobs with the help of {{SubmittedJobGraphStore}} and 
{{RunningJobsRegistry}}, and how the {{JobManager}} can be decoupled from 
{{RunningJobsRegistry}}.

 1. When a job is submitted for the first time, i.e., the store has no graph for it and 
the registry status remains {{NONE}}, the {{Dispatcher}} should, in order:

 # persist the job graph in the job graph store
 # set the registry status of the job to {{RUNNING}}
 # launch a {{JobManagerRunner}} and tell it to start a {{JobMaster}} once 
leadership is granted (decoupling the JM from the registry)

        (If either 1 or 2 fails, it is considered a failed submission and the 
registry entry remains {{NONE}}.)

2. When a job is submitted and the {{Dispatcher}} finds that its registry status is not 
{{NONE}}, reject the submission. (statement 4)

3. Whenever a {{Dispatcher}} gains leadership, it lists the entries of the jobs 
registry and tries to recover, from the job graph store, every job whose status is 
{{RUNNING}}. (On master failover, the {{Dispatcher}} would tell the JM to 
reconcile the job instead of scheduling it directly.)

4. When a job reaches a terminal state, the Dispatcher sets its status to 
{{DONE}}.

The statements above are similar to the 7 statements from [~till.rohrmann], but they 
clarify how the {{Dispatcher}} launches a job and emphasize that we can decouple the 
JM from the registry, because how a job gets executed can be decided by the 
{{Dispatcher}} instead of the JM.

Some comments on master failover (where {{RUNNING}} comes into use):
1. A {{Dispatcher}} that recovers a job with registry status {{RUNNING}} should 
launch a JM that first tries to reconcile instead of scheduling directly.
2. Furthermore, the {{Dispatcher}} might establish a heartbeat mechanism with the job 
managers to detect master failover.



As [~zhuzh] mentioned, there is a need to unify the {{SubmittedJobGraphStore}} for session 
mode and job mode. Following our discussions, a cluster (job or session) follows the 
process: 1. start a dispatcher, 2. submit the job, 3. execute the job, 4. finish the job. 
If a job cluster can follow this pattern, i.e., first start a dispatcher and then 
**submit** the job graph to the dispatcher, we would not need a specific 
{{SingleJobSubmittedJobGraphStore}} but could always use 
{{highAvailabilityServices#getSubmittedJobGraphStore}}.

This is rather out of scope here, and I think the effort should rather go into 
{{CliFrontend}}.
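
To make the intended flow concrete, here is an illustrative pseudo-code sketch of the submission path described above (method names and signatures are hypothetical, and the states follow the names used in this comment rather than the actual {{RunningJobsRegistry}} API):

{code:java}
// Illustrative pseudo-code only; names and signatures are hypothetical.
void submitJob(JobGraph jobGraph) throws JobSubmissionException {
    JobID jobId = jobGraph.getJobID();

    // statement 2: a job whose registry status is not NONE was already submitted -> reject
    if (runningJobsRegistry.getJobSchedulingStatus(jobId) != JobSchedulingStatus.NONE) {
        throw new JobSubmissionException(jobId, "Job has already been submitted.");
    }

    // statement 1, steps 1 and 2: if either fails, the registry entry stays NONE
    submittedJobGraphStore.putJobGraph(jobGraph);
    runningJobsRegistry.setJobRunning(jobId);

    // statement 1, step 3: the JobMaster itself never touches the registry
    startJobManagerRunner(jobGraph);
}
{code}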

> Standby per job mode Dispatchers don't know job's JobSchedulingStatus
> -
>
> Key: FLINK-11813
> URL: https://issues.apache.org/jira/browse/FLINK-11813
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Priority: Major
>
> At the moment, it can happen that standby {{Dispatchers}} in per job mode 
> will restart a terminated job after they gained leadership. The problem is 
> that we currently clear the {{RunningJobsRegistry}} once a job has reached a 
> globally terminal state. After the leading {{Dispatcher}} terminates, a 
> standby {{Dispatcher}} will gain leadership. Without having the information 
> from the {{RunningJobsRegistry}} it cannot tell whether the job has been 
> executed or whether the {{Dispatcher}} needs to re-execute the job. At the 
> moment, the {{Dispatcher}} will assume that there was a fault and hence 
> re-execute the job. This can lead to duplicate results.
> I think we need some way to tell standby {{Dispatchers}} that a certain job 
> has been successfully executed. One trivial solution could be to not clean up 
> the {{RunningJobsRegistry}} but then we will clutter ZooKeeper.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] dawidwys merged pull request #8153: Update metrics.md

2019-04-12 Thread GitBox
dawidwys merged pull request #8153: Update metrics.md
URL: https://github.com/apache/flink/pull/8153
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12174) Introduce FlinkAggregateExtractProjectRule and remove extractFieldReferences

2019-04-12 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-12174:
---

 Summary: Introduce FlinkAggregateExtractProjectRule and remove 
extractFieldReferences
 Key: FLINK-12174
 URL: https://issues.apache.org/jira/browse/FLINK-12174
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Hequn Cheng
Assignee: Hequn Cheng


Currently, when parsing a Table API expression, an inner project is added to 
project fields for {{Aggregate}}s, as the code below shows:
{code:java}
if (!extracted.getAggregations.isEmpty) {
  val projectFields = extractFieldReferences(expressionsWithResolvedCalls)

  wrap(
    operationTreeBuilder.project(extracted.getProjections,
      operationTreeBuilder.aggregate(emptyList[Expression], extracted.getAggregations,
        operationTreeBuilder.project(projectFields, operationTree)
      )
    )
  )
}
{code}

This optimization is not well suited to being added here; instead, we can use a rule 
to achieve it. The `extractFieldReferences` method can also be removed if we 
use a rule, which also makes the expression parsing logic clearer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11935) Remove DateTimeUtils pull-in and fix datetime casting problem

2019-04-12 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816445#comment-16816445
 ] 

Rong Rong commented on FLINK-11935:
---

I think [~julianhyde] is correct. Actually, I ran the test again and it seems that 
{{ymdToJulian}} returns the correct epoch: {{-30044998861002L}}, which converts 
to {{Saturday, November 29, 1017 10:58:58.998 PM}} according to 
https://www.epochconverter.com/. 
However, the current {{DateTimeUtils}} in the flink-table-planner codebase returns 
a wrong epoch: {{-30044480461002}}.

Thus I think something is not right when converting the epoch back to its string 
representation. Please see my test code in this commit:
https://github.com/walterddr/flink/commit/61bc3d96f4f7fbfe70cef637bdc261f0855c2011

It may not be a problem with the tests; it might be part of Flink's datetime 
settings, where it is supposed to use the Julian calendar but somehow mixes in 
the Gregorian one.

I am pretty blank in this knowledge domain, so some of the observations 
might be off. Any additional thoughts or comments, [~julianhyde]?
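
For reference, a quick (non-authoritative) way to check which calendar date an epoch-millisecond value maps to under the ISO/proleptic-Gregorian calendar used by {{java.time}}:

{code:java}
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EpochCheck {
    public static void main(String[] args) {
        long epochMillis = -30044998861002L; // the value quoted above
        String utc = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
                .withZone(ZoneOffset.UTC)
                .format(Instant.ofEpochMilli(epochMillis));
        // Prints the proleptic-Gregorian rendering; a Julian-calendar rendering of the
        // same instant would differ by several days around the year 1017.
        System.out.println(utc);
    }
}
{code}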

> Remove DateTimeUtils pull-in and fix datetime casting problem
> -
>
> Key: FLINK-11935
> URL: https://issues.apache.org/jira/browse/FLINK-11935
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Rong Rong
>Assignee: vinoyang
>Priority: Major
>
> This {{DateTimeUtils}} was pulled in as part of FLINK-7235.
> Originally, the time operation was not done correctly via the {{ymdToJulian}} 
> function for dates before {{1970-01-01}}, thus we needed the fix, similar to 
> addressing this problem:
> {code:java}
>  Optimized :1017-12-05 22:58:58.998 
>  Expected :1017-11-29 22:58:58.998
>  Actual :1017-12-05 22:58:58.998
> {code}
>  
> However, after pulling in Avatica 1.13, I found out that the optimized plans 
> of the time operations are actually correct. It is in fact the casting part 
> that creates the problem:
> For example, the following:
> *{{(plus(-12000.months, cast('2017-11-29 22:58:58.998', TIMESTAMP))}}*
> results in a StringTestExpression of:
> *{{CAST(1017-11-29 22:58:58.998):VARCHAR(65536) CHARACTER SET "UTF-16LE" 
> COLLATE "ISO-8859-1$en_US$primary" NOT NULL}}*
> but the testing results are:
> {code:java}
>  Optimized :1017-11-29 22:58:58.998
>  Expected :1017-11-29 22:58:58.998
>  Actual :1017-11-23 22:58:58.998
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] beyond1920 commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-12 Thread GitBox
beyond1920 commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r274981377
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunction.java
 ##
 @@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.deduplicate;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.generated.RecordEqualiser;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import static 
org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processFirstRow;
+import static 
org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processLastRow;
+
+/**
+ * This function is used to deduplicate on keys and keep only the first row or 
the last row.
+ */
+public class DeduplicateFunction
+   extends KeyedProcessFunctionWithCleanupState<BaseRow, BaseRow, BaseRow> {
+
+   private static final long serialVersionUID = 4950071982706870944L;
+
+   private final BaseRowTypeInfo rowTypeInfo;
+   private final boolean generateRetraction;
+   private final boolean keepLastRow;
+   private ValueState<BaseRow> pkRow;
+   private GeneratedRecordEqualiser generatedEqualiser;
+   private transient RecordEqualiser equaliser;
+
+   public DeduplicateFunction(long minRetentionTime, long 
maxRetentionTime, BaseRowTypeInfo rowTypeInfo,
+   boolean generateRetraction, boolean keepLastRow, 
GeneratedRecordEqualiser generatedEqualiser) {
+   super(minRetentionTime, maxRetentionTime);
+   this.rowTypeInfo = rowTypeInfo;
+   this.generateRetraction = generateRetraction;
+   this.keepLastRow = keepLastRow;
+   this.generatedEqualiser = generatedEqualiser;
+   }
+
+   @Override
+   public void open(Configuration configure) throws Exception {
+   super.open(configure);
+   String stateName = keepLastRow ? 
"DeduplicateFunctionCleanupTime" : "DeduplicateFunctionCleanupTime";
+   initCleanupTimeState(stateName);
+   ValueStateDescriptor<BaseRow> rowStateDesc = new 
ValueStateDescriptor<>("rowState", rowTypeInfo);
 
 Review comment:
   Exactly. Only when keepLastRow and generateRetraction are both true do we need to store 
the complete row; otherwise storing the pk is OK. Thanks.
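
   A small illustrative sketch of that idea (names such as `pkRowTypeInfo` are hypothetical, not the actual fields in this PR):

```java
// Illustrative only; pkRowTypeInfo is a hypothetical name for the key's type info.
ValueStateDescriptor<BaseRow> rowStateDesc;
if (keepLastRow && generateRetraction) {
    // we must be able to emit a retraction for the previously stored row,
    // so the complete previous row has to be kept in state
    rowStateDesc = new ValueStateDescriptor<>("rowState", rowTypeInfo);
} else {
    // otherwise it is enough to remember the key (pk) to detect duplicates
    rowStateDesc = new ValueStateDescriptor<>("pkState", pkRowTypeInfo);
}
```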


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-12 Thread GitBox
StefanRRichter commented on a change in pull request #7571: [FLINK-10724] 
Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r274979853
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ##
 @@ -405,54 +404,22 @@ public void addMasterState(MasterState state) {
// 

 
/**
-* Aborts a checkpoint because it expired (took too long).
+* Aborts a checkpoint with abort reason and cause.
 */
-   public void abortExpired() {
+   public void abort(CheckpointAbortReason reason, Throwable cause) {
try {
-   Exception cause = new Exception("Checkpoint expired 
before completing");
-   onCompletionPromise.completeExceptionally(cause);
-   reportFailedCheckpoint(cause);
+   onCompletionPromise.complete(new 
CheckpointExecuteResult(reason, cause));
 
 Review comment:
   As far as I can see, `CheckpointExecuteResult` is always used in the context 
of a `CompletableFuture`, which is already able to represent the success and exceptional 
cases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-12 Thread GitBox
StefanRRichter commented on a change in pull request #7571: [FLINK-10724] 
Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r274979853
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ##
 @@ -405,54 +404,22 @@ public void addMasterState(MasterState state) {
// 

 
/**
-* Aborts a checkpoint because it expired (took too long).
+* Aborts a checkpoint with abort reason and cause.
 */
-   public void abortExpired() {
+   public void abort(CheckpointAbortReason reason, Throwable cause) {
try {
-   Exception cause = new Exception("Checkpoint expired 
before completing");
-   onCompletionPromise.completeExceptionally(cause);
-   reportFailedCheckpoint(cause);
+   onCompletionPromise.complete(new 
CheckpointExecuteResult(reason, cause));
 
 Review comment:
   As far as I can see, `CheckpointExecutionException` is always used in the 
context of a `CompletableFuture`, which is already able to represent the success and 
exceptional cases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-12 Thread GitBox
StefanRRichter commented on a change in pull request #7571: [FLINK-10724] 
Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r274979364
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ##
 @@ -405,54 +404,22 @@ public void addMasterState(MasterState state) {
// 

 
/**
-* Aborts a checkpoint because it expired (took too long).
+* Aborts a checkpoint with abort reason and cause.
 */
-   public void abortExpired() {
+   public void abort(CheckpointAbortReason reason, Throwable cause) {
try {
-   Exception cause = new Exception("Checkpoint expired 
before completing");
-   onCompletionPromise.completeExceptionally(cause);
-   reportFailedCheckpoint(cause);
+   onCompletionPromise.complete(new 
CheckpointExecuteResult(reason, cause));
 
 Review comment:
   I find this a bit inconsistent: why have a class that represents both the 
successful result and the failure, when we could just `completeExceptionally` with 
a new type, maybe a `CheckpointExecutionException`, that carries the `reason` 
field? It seems that the client of `onCompletionPromise` has to handle 
exceptions anyway, see line 291 `onCompletionPromise.completeExceptionally(t);`
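
   A minimal sketch of what such an exception could look like (class and method names are hypothetical):

```java
// Hypothetical sketch of the suggested alternative: fail the promise with an
// exception that carries the abort reason instead of a combined result type.
public class CheckpointExecutionException extends Exception {

    private final CheckpointAbortReason reason;

    public CheckpointExecutionException(CheckpointAbortReason reason, Throwable cause) {
        super(reason.toString(), cause);
        this.reason = reason;
    }

    public CheckpointAbortReason getReason() {
        return reason;
    }
}

// usage sketch:
// onCompletionPromise.completeExceptionally(new CheckpointExecutionException(reason, cause));
```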


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-12 Thread GitBox
StefanRRichter commented on a change in pull request #7571: [FLINK-10724] 
Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r274976664
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointExecuteResult.java
 ##
 @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * The result of executing a checkpoint.
+ */
+public class CheckpointExecuteResult {
+
+   private final CompletedCheckpoint completedCheckpoint;
+
+   private final CheckpointAbortReason failure;
+
+   private final Throwable failureCause;
+
+   CheckpointExecuteResult(CompletedCheckpoint success) {
+   this.completedCheckpoint = success;
+   this.failure = null;
+   this.failureCause = null;
+   }
+
+   CheckpointExecuteResult(CheckpointAbortReason failure, Throwable 
failureCause) {
+   this.failure = failure;
+   this.failureCause = failureCause;
+   this.completedCheckpoint = null;
+   }
+
+   public boolean isSuccess() {
+   return completedCheckpoint != null;
+   }
+
+   public boolean isFailure() {
+   return failure != null;
+   }
+
+   public CompletedCheckpoint getCompletedCheckpoint() {
+   if (failure != null) {
+   throw new IllegalStateException("Checkpoint executing 
was failure"
+   + (failureCause != null ? 
failureCause.getMessage() : "") + ".");
+   }
+   return completedCheckpoint;
+   }
+
+   public CheckpointAbortReason getFailureReason() {
 
 Review comment:
   can be private


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-12 Thread GitBox
StefanRRichter commented on a change in pull request #7571: [FLINK-10724] 
Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r274975985
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointAbortReason.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * Various reasons why a checkpoint was aborted.
+ */
+public enum CheckpointAbortReason {
+
+   CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
+
+   CHECKPOINT_SUBSUMED("Checkpoint has been subsumed."),
+
+   CHECKPOINT_DECLINED("Checkpoint was declined (tasks not ready)."),
+
+   CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
+
+   CHECKPOINT_COORDINATOR_SUSPEND("Checkpoint Coordinator is suspending."),
+
+   Job_FAILURE("The job has failed."),
 
 Review comment:
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-12 Thread GitBox
StefanRRichter commented on a change in pull request #7571: [FLINK-10724] 
Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r274975029
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointExecuteResult.java
 ##
 @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * The result of executing a checkpoint.
+ */
+public class CheckpointExecuteResult {
 
 Review comment:
   `CheckpointExecuteResult` -> `CheckpointExecutionResult`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-12 Thread GitBox
StefanRRichter commented on a change in pull request #7571: [FLINK-10724] 
Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r274974832
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -666,10 +671,11 @@ else if (!props.forceCheckpoint()) {
 * Receives a {@link DeclineCheckpoint} message for a pending 
checkpoint.
 *
 * @param message Checkpoint decline from the task manager
+* @return true if should fail the job
 */
-   public void receiveDeclineMessage(DeclineCheckpoint message) {
+   public boolean receiveDeclineMessage(DeclineCheckpoint message) {
 
 Review comment:
   This level of abstraction feels strange to me: the return value from the decline 
method of the checkpoint coordinator should not be what decides whether the job 
fails. Conceptually, shouldn't that rather be an optional reaction of the job 
master to a failed `CheckpointExecuteResult`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-12 Thread GitBox
StefanRRichter commented on a change in pull request #7571: [FLINK-10724] 
Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r274967719
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -954,6 +968,7 @@ public void heartbeatFromResourceManager(final ResourceID 
resourceID) {
}
return checkpointCoordinator
.triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
+   
.thenApply(CheckpointExecuteResult::getCompletedCheckpoint)
 
 Review comment:
   If there was a savepoint failure, this call on the `CheckpointExecuteResult` 
will trigger an illegal state exception and even lose the trace of the original 
exception. I dont think that is what we want here, but handling two different 
paths?
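
   Roughly, something along these lines (a sketch only, relying on the accessors of `CheckpointExecuteResult` shown elsewhere in this PR):

```java
// Sketch: unwrap the completed checkpoint on success, propagate the original
// failure cause instead of an IllegalStateException on failure.
return checkpointCoordinator
    .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    .thenApply(result -> {
        if (result.isSuccess()) {
            return result.getCompletedCheckpoint();
        } else {
            throw new CompletionException(
                new FlinkException(
                    "Savepoint failed: " + result.getFailureReason(),
                    result.getFailureCause()));
        }
    });
```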


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-12 Thread GitBox
StefanRRichter commented on a change in pull request #7571: [FLINK-10724] 
Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r274966185
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointExecuteResult.java
 ##
 @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * The result of executing a checkpoint.
+ */
+public class CheckpointExecuteResult {
+
+   private final CompletedCheckpoint completedCheckpoint;
+
+   private final CheckpointAbortReason failure;
+
+   private final Throwable failureCause;
+
+   CheckpointExecuteResult(CompletedCheckpoint success) {
+   this.completedCheckpoint = success;
 
 Review comment:
   Instead of duplicating the assignments in both constructors, I suggest one 
private constructor that receives all 3 parameters; the existing two then 
delegate to the new private constructor.
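
   For illustration, the delegation could look like this:

```java
// Sketch of the suggested change: one private constructor holds the assignments,
// the two existing constructors delegate to it.
CheckpointExecuteResult(CompletedCheckpoint success) {
    this(success, null, null);
}

CheckpointExecuteResult(CheckpointAbortReason failure, Throwable failureCause) {
    this(null, failure, failureCause);
}

private CheckpointExecuteResult(
        CompletedCheckpoint completedCheckpoint,
        CheckpointAbortReason failure,
        Throwable failureCause) {
    this.completedCheckpoint = completedCheckpoint;
    this.failure = failure;
    this.failureCause = failureCause;
}
```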


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-12 Thread GitBox
StefanRRichter commented on a change in pull request #7571: [FLINK-10724] 
Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r274965744
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointExecuteResult.java
 ##
 @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * The result of executing a checkpoint.
+ */
+public class CheckpointExecuteResult {
+
+   private final CompletedCheckpoint completedCheckpoint;
+
+   private final CheckpointAbortReason failure;
+
+   private final Throwable failureCause;
+
+   CheckpointExecuteResult(CompletedCheckpoint success) {
+   this.completedCheckpoint = success;
+   this.failure = null;
+   this.failureCause = null;
+   }
+
+   CheckpointExecuteResult(CheckpointAbortReason failure, Throwable 
failureCause) {
+   this.failure = failure;
+   this.failureCause = failureCause;
+   this.completedCheckpoint = null;
+   }
+
+   public boolean isSuccess() {
+   return completedCheckpoint != null;
+   }
+
+   public boolean isFailure() {
+   return failure != null;
+   }
+
+   public CompletedCheckpoint getCompletedCheckpoint() {
+   if (failure != null) {
+   throw new IllegalStateException("Checkpoint executing 
was failure"
+   + (failureCause != null ? 
failureCause.getMessage() : "") + ".");
+   }
+   return completedCheckpoint;
+   }
+
+   public CheckpointAbortReason getFailureReason() {
+   if (failure != null) {
+   return failure;
+   } else {
+   throw new IllegalStateException("Checkpoint executing 
was successful");
+   }
+   }
+
+   public Throwable getFailureCause() {
+   if (getFailureReason() != null) {
+   return failureCause;
+   } else {
+   throw new IllegalStateException("Checkpoint executing 
was successful");
+   }
+   }
+
+   @Override
+   public String toString() {
+   return "CheckpointExecuteResult(" +
+   (isSuccess() ?
+   ("completedCheckpoint: " + completedCheckpoint) 
:
+   ("failure: " + failureCause == null ? "" : 
failureCause.getMessage())
 
 Review comment:
   I think this requires parentheses to be correct: `(failureCause == null ? "" 
: failureCause.getMessage()))`
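   For context, a tiny standalone demo (not from the PR) of why the parentheses matter: 
`+` binds tighter than `==` and `?:`, so without them the whole concatenation is 
compared against `null` and the `"failure: "` prefix is lost:
   ```
public class PrecedenceDemo {
	public static void main(String[] args) {
		Throwable failureCause = new RuntimeException("boom");

		// Parses as (("failure: " + failureCause) == null) ? "" : failureCause.getMessage().
		// The concatenation is never null, so the prefix is always dropped, and a null
		// failureCause would even trigger a NullPointerException here.
		String broken = "failure: " + failureCause == null ? "" : failureCause.getMessage();

		// With parentheses the ternary is evaluated first and the prefix is kept.
		String fixed = "failure: " + (failureCause == null ? "" : failureCause.getMessage());

		System.out.println(broken); // boom
		System.out.println(fixed);  // failure: boom
	}
}
   ```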


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2019-04-12 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816377#comment-16816377
 ] 

Till Rohrmann commented on FLINK-10052:
---

Looking at the provided link, it should be fine to upgrade our Curator 
dependency to {{4.x}} [~xiaogang.shi]. Ideally, we do this early in the release 
cycle to give the change more exposure.

> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Dominik Wosiński
>Priority: Major
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature in case that the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achievable by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.
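
For illustration, the behaviour described above roughly corresponds to a Curator 
{{ConnectionStateListener}} along these lines (an illustrative sketch by the editor, 
not code from the issue; the {{revokeLeadership()}} hook is hypothetical):

{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;

/** Sketch: tolerate SUSPENDED and only treat a LOST session as loss of leadership. */
public class LenientLeadershipListener implements ConnectionStateListener {

	@Override
	public void stateChanged(CuratorFramework client, ConnectionState newState) {
		switch (newState) {
			case SUSPENDED:
				// Connection hiccup: the session may still be alive, so keep the
				// leadership and wait for either RECONNECTED or LOST.
				break;
			case RECONNECTED:
				// Reconnected before the session expired: nothing to revoke.
				break;
			case LOST:
				// Session expired: leadership is really gone, revoke it now.
				revokeLeadership();
				break;
			default:
				break;
		}
	}

	private void revokeLeadership() {
		// hypothetical hook into the leader election service
	}
}
{code}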



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] StefanRRichter commented on a change in pull request #7571: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-12 Thread GitBox
StefanRRichter commented on a change in pull request #7571: [FLINK-10724] 
Refactor failure handling in check point coordinator
URL: https://github.com/apache/flink/pull/7571#discussion_r274962877
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -699,13 +701,25 @@ public void declineCheckpoint(DeclineCheckpoint decline) 
{
final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
 
if (checkpointCoordinator != null) {
-   getRpcService().execute(() -> {
+   CompletableFuture shouldFailJobFuture = 
getRpcService().execute(() -> {
try {
-   
checkpointCoordinator.receiveDeclineMessage(decline);
+   return 
checkpointCoordinator.receiveDeclineMessage(decline);
} catch (Exception e) {
log.error("Error in 
CheckpointCoordinator while processing {}", decline, e);
}
+
+   return false;
});
+
+   try {
+   boolean shouldFailJob = 
shouldFailJobFuture.get(rpcTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 Review comment:
   I think this line is executed in the job master's main thread and should 
therefore not block. It would be better to chain `failGlobal` onto the 
`shouldFailJobFuture` so that it completes asynchronously in the job master 
main thread.
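   A rough sketch of the non-blocking variant (names such as `getMainThreadExecutor()` 
and `executionGraph.failGlobal(...)` are assumptions based on the surrounding 
JobMaster code, not verified against this PR):
   ```
shouldFailJobFuture.thenAcceptAsync(
	shouldFailJob -> {
		if (shouldFailJob) {
			// run the global failure handling asynchronously in the main thread
			executionGraph.failGlobal(
				new FlinkException("Checkpoint decline requested a global job failure."));
		}
	},
	getMainThreadExecutor());
   ```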


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12173) Optimize "SELECT DISTINCT" into Deduplicate with keep first row

2019-04-12 Thread Jark Wu (JIRA)
Jark Wu created FLINK-12173:
---

 Summary: Optimize "SELECT DISTINCT" into Deduplicate with keep 
first row
 Key: FLINK-12173
 URL: https://issues.apache.org/jira/browse/FLINK-12173
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: Jark Wu


The following distinct query can be optimized into a Deduplicate on keys "a, b, 
c, d" that keeps the first row.

{code:sql}
SELECT DISTINCT a, b, c, d;
{code}

We can optimize this query into a Deduplicate to get better performance than a 
GroupAggregate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11750) Replace IntermediateResultPartitionID with ResultPartitionID in ResultPartitionDeploymentDescriptor

2019-04-12 Thread Andrey Zagrebin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin closed FLINK-11750.
---
  Resolution: Won't Fix
Release Note: 
see
https://github.com/apache/flink/pull/7835#issuecomment-480865772

> Replace IntermediateResultPartitionID with ResultPartitionID in 
> ResultPartitionDeploymentDescriptor
> ---
>
> Key: FLINK-11750
> URL: https://issues.apache.org/jira/browse/FLINK-11750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The motivation of task is for preparing the creation of {{ResultPartition}} 
> via {{ShuffleService}} future.
> Currently during the creation of {{ResultPartition}} we need the 
> {{ExecutionAttemptID}} info to generate {{ResultPartitionID}}. To make the 
> interface of {{ShuffleService#createResultPartitionWriter}} clean, it is 
> better to get {{ResultPartitionID}} directly from 
> {{ResultPartitionDeploymentDescriptor}} instead of explicitly passing 
> additional {{ExecutionAttemptID}}.
> We could create this {{ResultPartitionID}} during generating 
> {{ResultPartitionDeploymentDescriptor}} to replace the original field of 
> {{IntermediateResultPartitionID}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] azagrebin commented on a change in pull request #8133: [FLINK-12146][network] Remove unregister task from NetworkEnvironment to simplify the interface of ShuffleService

2019-04-12 Thread GitBox
azagrebin commented on a change in pull request #8133: [FLINK-12146][network] 
Remove unregister task from NetworkEnvironment to simplify the interface of 
ShuffleService
URL: https://github.com/apache/flink/pull/8133#discussion_r274914550
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -879,6 +875,24 @@ else if (transitionState(current, ExecutionState.FAILED, 
t)) {
}
}
 
+   private void releaseNetworkResources() throws Exception {
+   LOG.debug("Release task {} network resources (state: {}).",
+   taskInfo.getTaskNameWithSubtasks(), 
getExecutionState());
+
+   for (ResultPartition partition : producedPartitions) {
+   
taskEventDispatcher.unregisterPartition(partition.getPartitionId());
+   partition.close();
 
 Review comment:
   I guess:
   ```
   if (isCanceledOrFailed()) {
 partition.fail(getFailureCause());
   } else {
 partition.close();
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8133: [FLINK-12146][network] Remove unregister task from NetworkEnvironment to simplify the interface of ShuffleService

2019-04-12 Thread GitBox
azagrebin commented on a change in pull request #8133: [FLINK-12146][network] 
Remove unregister task from NetworkEnvironment to simplify the interface of 
ShuffleService
URL: https://github.com/apache/flink/pull/8133#discussion_r274957657
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -879,6 +875,24 @@ else if (transitionState(current, ExecutionState.FAILED, 
t)) {
}
}
 
+   private void releaseNetworkResources() throws Exception {
+   LOG.debug("Release task {} network resources (state: {}).",
+   taskInfo.getTaskNameWithSubtasks(), 
getExecutionState());
+
+   for (ResultPartition partition : producedPartitions) {
+   
taskEventDispatcher.unregisterPartition(partition.getPartitionId());
+   partition.close();
 
 Review comment:
   but I think it would be ok if we actually add one more method, 
`closeNetworkResources`, move the partition/gate closings from 
`TaskCanceler.run` into it, and use it here in `releaseNetworkResources` after 
the `taskEventDispatcher.unregisterPartition` and `if () partition.fail` loops. 
That would eliminate the code duplication and improve the log/exception 
handling of the former `unregisterTask`.
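   A sketch of that split (field names such as `producedPartitions` and 
`taskEventDispatcher` follow the diff above; the rest is illustrative, not the 
PR's actual code):
   ```
private void releaseNetworkResources() {
	LOG.debug("Release task {} network resources (state: {}).",
		taskInfo.getTaskNameWithSubtasks(), getExecutionState());

	for (ResultPartition partition : producedPartitions) {
		taskEventDispatcher.unregisterPartition(partition.getPartitionId());
		if (isCanceledOrFailed()) {
			partition.fail(getFailureCause());
		}
	}

	closeNetworkResources();
}

// Shared best-effort closing, meant to replace the duplicated loops in
// TaskCanceler.run() and the former unregisterTask().
private void closeNetworkResources() {
	for (ResultPartition partition : producedPartitions) {
		try {
			partition.close();
		} catch (Throwable t) {
			LOG.error("Failed to release result partition {}.", partition.getPartitionId(), t);
		}
	}
	// ...and close the input gates in the same best-effort fashion here.
}
   ```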


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8133: [FLINK-12146][network] Remove unregister task from NetworkEnvironment to simplify the interface of ShuffleService

2019-04-12 Thread GitBox
azagrebin commented on a change in pull request #8133: [FLINK-12146][network] 
Remove unregister task from NetworkEnvironment to simplify the interface of 
ShuffleService
URL: https://github.com/apache/flink/pull/8133#discussion_r273917521
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -326,6 +326,23 @@ public void release(Throwable cause) {
 
@Override
public void close() {
+   close(null);
+   }
+
+   /**
+* There are two scenarios to close the result partition.
+*
+* For FINISHED task, we only destroy buffer pool to free available 
segments.
+*
+* For FAILED or CANCELING task, we should also release the result 
partition.
+*
+* @param throwable which indicates FAILED/CANCELING task if not null
+*/
+   public void close(Throwable throwable) {
 
 Review comment:
   I am wondering if we should actually call this method `fail(cause)` and make 
it:
   ```
   public void close() {
if (bufferPool != null) {
bufferPool.lazyDestroy();
}
   }
   
   public void fail(@Nonnull Throwable throwable) {

partitionManager.releasePartitionsProducedBy(partitionId.getProducerId(), 
throwable);
close();
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-12 Thread GitBox
wuchong commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r274952010
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunction.java
 ##
 @@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.deduplicate;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.generated.RecordEqualiser;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import static 
org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processFirstRow;
+import static 
org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processLastRow;
+
+/**
+ * This function is used to deduplicate on keys and keeps only first row or 
last row.
+ */
+public class DeduplicateFunction
+   extends KeyedProcessFunctionWithCleanupState {
+
+   private static final long serialVersionUID = 4950071982706870944L;
+
+   private final BaseRowTypeInfo rowTypeInfo;
+   private final boolean generateRetraction;
+   private final boolean keepLastRow;
+   private ValueState pkRow;
+   private GeneratedRecordEqualiser generatedEqualiser;
+   private transient RecordEqualiser equaliser;
+
+   public DeduplicateFunction(long minRetentionTime, long 
maxRetentionTime, BaseRowTypeInfo rowTypeInfo,
+   boolean generateRetraction, boolean keepLastRow, 
GeneratedRecordEqualiser generatedEqualiser) {
+   super(minRetentionTime, maxRetentionTime);
+   this.rowTypeInfo = rowTypeInfo;
+   this.generateRetraction = generateRetraction;
+   this.keepLastRow = keepLastRow;
+   this.generatedEqualiser = generatedEqualiser;
+   }
+
+   @Override
+   public void open(Configuration configure) throws Exception {
+   super.open(configure);
+   String stateName = keepLastRow ? 
"DeduplicateFunctionCleanupTime" : "DeduplicateFunctionCleanupTime";
+   initCleanupTimeState(stateName);
+   ValueStateDescriptor rowStateDesc = new 
ValueStateDescriptor("rowState", rowTypeInfo);
 
 Review comment:
   Yes. I think we should only store the PK in state here. 
   
   If we only store the PK, the ideal state schema should be `ValueState`, 
but this can't share the same state with the LastRow mode. Maybe we need 
separate implementations for firstRow and lastRow.
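   For illustration, a rough sketch of a dedicated first-row variant that keeps only a 
"seen" marker per key instead of the whole row (class and state names are made up, 
not from the PR):
   ```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Keep-first-row deduplication: per key we only need to remember that a row was seen. */
public class KeepFirstRowFunction<K, T> extends KeyedProcessFunction<K, T, T> {

	private transient ValueState<Boolean> seen;

	@Override
	public void open(Configuration parameters) {
		seen = getRuntimeContext().getState(
			new ValueStateDescriptor<>("seenState", Types.BOOLEAN));
	}

	@Override
	public void processElement(T row, Context ctx, Collector<T> out) throws Exception {
		if (seen.value() == null) { // first row for this key
			seen.update(true);
			out.collect(row);       // all later rows for the same key are dropped
		}
	}
}
   ```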


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-12 Thread GitBox
wuchong commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r274953377
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecExchange.scala
 ##
 @@ -32,7 +44,10 @@ class StreamExecExchange(
 relNode: RelNode,
 relDistribution: RelDistribution)
   extends CommonPhysicalExchange(cluster, traitSet, relNode, relDistribution)
-  with StreamPhysicalRel {
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  private val DEFAULT_MAX_PARALLELISM = 1 << 7
 
 Review comment:
   +1. I'm in favor of the latter one, which is also used in 
[`KeyedStream.java#134`](https://github.com/apache/flink/blob/ddba1b69f43cbb885e178dfaafa120f1fe196a13/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java#L134)
 and has a clear javadoc description.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10929) Add support for Apache Arrow

2019-04-12 Thread Kurt Young (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816345#comment-16816345
 ] 

Kurt Young commented on FLINK-10929:


I'm not sure everyone who is already involved in this discussion has a clear 
and common goal for introducing Apache Arrow to Flink. As Stephan said, there 
are two scenarios which can be considered. 

Regarding (2): I think making Arrow a vectorized execution data format will 
involve lots of changes, from the runtime to the operators and the query 
optimizer. We should first reach consensus about the final goal and status of 
this. Can streaming benefit from vectorized execution? Will this break the 
unification of streaming and batch? How much benefit can we gain from it? 
There are lots of unanswered questions. 

> Add support for Apache Arrow
> 
>
> Key: FLINK-10929
> URL: https://issues.apache.org/jira/browse/FLINK-10929
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / State Backends
>Reporter: Pedro Cardoso Silva
>Priority: Minor
> Attachments: image-2019-04-10-13-43-08-107.png
>
>
> Investigate the possibility of adding support for Apache Arrow as a 
> standardized columnar, memory format for data.
> Given the activity that [https://github.com/apache/arrow] is currently 
> getting and its claims objective of providing a zero-copy, standardized data 
> format across platforms, I think it makes sense for Flink to look into 
> supporting it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] beyond1920 commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-12 Thread GitBox
beyond1920 commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r274949277
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecExchange.scala
 ##
 @@ -32,7 +44,10 @@ class StreamExecExchange(
 relNode: RelNode,
 relDistribution: RelDistribution)
   extends CommonPhysicalExchange(cluster, traitSet, relNode, relDistribution)
-  with StreamPhysicalRel {
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  private val DEFAULT_MAX_PARALLELISM = 1 << 7
 
 Review comment:
   The variable's name is confusing: it's not the max parallelism of operators 
but the max number of key-groups. Maybe it's better to use 
`StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM` or 
`KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12169) Improve Javadoc of MessageAcknowledgingSourceBase

2019-04-12 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12169.
---
   Resolution: Fixed
Fix Version/s: 1.8.1
   1.9.0
   1.7.3

Fixed in master: 4c1311e586822f75c8e0bf31e3a7adc274456d0f
Fixed in release-1.8: b532b6fb3d5579d96f631474b00c8d2fb1f34e39
Fixed in release-1.7: c1ca910c62f97e06fa76ef548f4084277c483905

> Improve Javadoc of MessageAcknowledgingSourceBase
> -
>
> Key: FLINK-12169
> URL: https://issues.apache.org/jira/browse/FLINK-12169
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> {{MessageAcknowledgingSourceBase}} could do a better job explaining how 
> implementing classes need to interact with it to achieve exactly-once 
> guarantees.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12172) Flink fails to close pending BucketingSink

2019-04-12 Thread Mario Georgiev (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Georgiev updated FLINK-12172:
---
Description: 
Hello,

The problem is if you have a BucketingSink, the following case may occur :

Let's say you have a 2019-04-12–12 bucket created with several files inside 
which are pending/finished
 You create a savepoint and shut down the job
 After an hour for instance you start the job from the savepoint and a new 
bucket is created, 2019-04-16 for instance. 
 The problem is that the .pending ones from the old buckets seem to never be 
moved to finished state if there is a new hourly bucket created.

  was:
Hello,

The problem is if you have a BucketingSink, the following case may occur :

Let's say you have a 2019-04-12–12 bucket created with several files inside 
which are pending/finished
You create a savepoint and shut down the job
After an hour for instance you start the job from the savepoint and a new 
bucket is created, 2019-04-16 for instance. 
The problem is that the .pending ones from the old buckets seem to never be 
moved to finished state. 


> Flink fails to close pending BucketingSink
> --
>
> Key: FLINK-12172
> URL: https://issues.apache.org/jira/browse/FLINK-12172
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.7.2
>Reporter:  Mario Georgiev
>Priority: Major
>
> Hello,
> The problem is if you have a BucketingSink, the following case may occur :
> Let's say you have a 2019-04-12–12 bucket created with several files inside 
> which are pending/finished
>  You create a savepoint and shut down the job
>  After an hour for instance you start the job from the savepoint and a new 
> bucket is created, 2019-04-16 for instance. 
>  The problem is that the .pending ones from the old buckets seem to never be 
> moved to finished state if there is a new hourly bucket created.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12172) Flink fails to close pending BucketingSink

2019-04-12 Thread Mario Georgiev (JIRA)
 Mario Georgiev created FLINK-12172:
---

 Summary: Flink fails to close pending BucketingSink
 Key: FLINK-12172
 URL: https://issues.apache.org/jira/browse/FLINK-12172
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.7.2
Reporter:  Mario Georgiev


Hello,

The problem is if you have a BucketingSink, the following case may occur :

Let's say you have a 2019-04-12–12 bucket created with several files inside 
which are pending/finished
You create a savepoint and shut down the job
After an hour for instance you start the job from the savepoint and a new 
bucket is created, 2019-04-16 for instance. 
The problem is that the .pending ones from the old buckets seem to never be 
moved to finished state. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8155: [FLINK-12169] [javadocs] improve javadoc of MessageAcknowledgingSource…

2019-04-12 Thread GitBox
asfgit closed pull request #8155: [FLINK-12169] [javadocs] improve javadoc of 
MessageAcknowledgingSource…
URL: https://github.com/apache/flink/pull/8155
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8155: [FLINK-12169] [javadocs] improve javadoc of MessageAcknowledgingSource…

2019-04-12 Thread GitBox
sunjincheng121 commented on issue #8155: [FLINK-12169] [javadocs] improve 
javadoc of MessageAcknowledgingSource…
URL: https://github.com/apache/flink/pull/8155#issuecomment-482589074
 
 
   @flinkbot approve all
   Merging...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-12 Thread GitBox
zjffdu commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r274920821
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used to create a collection of {@link PluginDescriptor} based 
on directory structure for a given plugin
+ * root folder.
+ *
+ * The expected structure is as follows: the given plugins root folder, 
containing the plugins folder. One plugin folder
+ * contains all resources (jar files) belonging to a plugin. The name of the 
plugin folder becomes the plugin id.
+ * 
+ * plugins-root-folder/
+ *|plugin-a/ (folder of plugin a)
+ *||-plugin-a-1.jar (the jars containing the 
classes of plugin a)
+ *||-plugin-a-2.jar
+ *||-...
+ *|
+ *|plugin-b/
+ *||-plugin-b-1.jar
+ *   ...   |-...
+ * 
 
 Review comment:
   For YARN mode, we should ship the plugins to the YARN container. I don't think 
this PR does that, but it could be done in another PR. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8159: [hotfix][runtime] fix error log description

2019-04-12 Thread GitBox
flinkbot commented on issue #8159: [hotfix][runtime] fix error log description
URL: https://github.com/apache/flink/pull/8159#issuecomment-482587198
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] leesf opened a new pull request #8159: [hotfix][runtime] fix error log description

2019-04-12 Thread GitBox
leesf opened a new pull request #8159: [hotfix][runtime] fix error log 
description
URL: https://github.com/apache/flink/pull/8159
 
 
   [hotfix][runtime] fix error log description.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface

2019-04-12 Thread GitBox
pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement 
the runtime handling of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124#discussion_r274834483
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 ##
 @@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Input reader for {@link 
org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask}
+ * in the case that the operator is InputSelectable.
+ *
+ * @param  The type of the records that arrive on the first input
+ * @param  The type of the records that arrive on the second input
+ */
+@Internal
+public class StreamTwoInputSelectableProcessor {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class);
+
+   private volatile boolean continuousProcessing = true;
+
+   private final NetworkInput input1;
+   private final NetworkInput input2;
+
+   private final Object lock;
+
+   private final TwoInputStreamOperator streamOperator;
+
+   private final InputSelectable inputSelector;
+
+   private final AuxiliaryHandler auxiliaryHandler;
+
+   private final CompletableFuture[] listenFutures;
+
+   private final boolean[] isFinished;
+
+   private InputSelection inputSelection;
+
+   private AtomicInteger availableInputsMask = new AtomicInteger();
+
+   private int lastReadingInputMask;
 
 Review comment:
   this could be a local variable


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface

2019-04-12 Thread GitBox
pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement 
the runtime handling of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124#discussion_r274820408
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 ##
 @@ -197,22 +217,23 @@ public void requestPartitions() throws IOException, 
InterruptedException {
return Optional.of(bufferOrEvent);
}
 
-   @Override
-   public Optional pollNextBufferOrEvent() throws 
UnsupportedOperationException {
-   throw new UnsupportedOperationException();
-   }
-
-   private InputGateWithData waitAndGetNextInputGate() throws IOException, 
InterruptedException {
+   private InputGateWithData waitAndGetNextInputGate(boolean blocking) 
throws IOException, InterruptedException {
 
 Review comment:
   I doubt there would be a measurable difference. You can use the existing network 
stack benchmarks to check that. When I introduced `Optional` in 
`getNextBufferOrEvent`, there was none. Furthermore, since `getNextBufferOrEvent` 
is already constructing and using `Optional`, I'm pretty sure the `Optional` 
in `waitAndGetNextInputGate` would be JIT-inlined to a no-op.
   
   But sure, if you can show that `@Nullable` is faster, then +1 from my side 
for it. Otherwise `Optional` is safer than `@Nullable`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface

2019-04-12 Thread GitBox
pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement 
the runtime handling of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124#discussion_r274868359
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 ##
 @@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Input reader for {@link 
org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask}
+ * in the case that the operator is InputSelectable.
+ *
+ * @param  The type of the records that arrive on the first input
+ * @param  The type of the records that arrive on the second input
+ */
+@Internal
+public class StreamTwoInputSelectableProcessor {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class);
+
+   private volatile boolean continuousProcessing = true;
+
+   private final NetworkInput input1;
+   private final NetworkInput input2;
+
+   private final Object lock;
+
+   private final TwoInputStreamOperator streamOperator;
+
+   private final InputSelectable inputSelector;
+
+   private final AuxiliaryHandler auxiliaryHandler;
+
+   private final CompletableFuture[] listenFutures;
+
+   private final boolean[] isFinished;
+
+   private InputSelection inputSelection;
+
+   private AtomicInteger availableInputsMask = new AtomicInteger();
+
+   private int lastReadingInputMask;
+
+   private static final int TWO_INPUT_ANY_MASK = (int) new 
InputSelection.Builder()
+   .select(1)
+   .select(2)
+   .build()
+   .getInputMask();
+
+   private static final int INPUT1_ID = 1;
+   private static final int INPUT2_ID = 2;
+
+   //  Metrics --
+
+   private final WatermarkGauge input1WatermarkGauge;
+   private final WatermarkGauge input2WatermarkGauge;
+
+   private Counter numRecordsIn;
+
+   public StreamTwoInputSelectableProcessor(
+   Collection inputGates1,
+   Collection inputGates2,
+   TypeSerializer inputSerializer1,
+   TypeSerializer inputSerializer2,
+   Object lock,
+   IOManager ioManager,
+   StreamStatusMaintainer streamStatusMaintainer,
+   TwoInputStreamOperator streamOperator,
+   WatermarkGauge input1WatermarkGauge,
+   WatermarkGauge input2WatermarkGauge) {
+
+   checkState(streamOperator instanceof 

[GitHub] [flink] pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface

2019-04-12 Thread GitBox
pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement 
the runtime handling of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124#discussion_r274511889
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 ##
 @@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Input reader for {@link 
org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask}
+ * in the case that the operator is InputSelectable.
+ *
+ * @param  The type of the records that arrive on the first input
+ * @param  The type of the records that arrive on the second input
+ */
+@Internal
+public class StreamTwoInputSelectableProcessor {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class);
+
+   private volatile boolean continuousProcessing = true;
 
 Review comment:
   I think you are not using this `volatile boolean` anywhere, since 
`endIteration` will interrupt the `(inputProcessor.isContinuousProcessing() && 
inputProcessor.processInput())` condition in 
`StreamTwoInputSelectableProcessorThread`, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface

2019-04-12 Thread GitBox
pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement 
the runtime handling of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124#discussion_r274512975
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamTaskNonSelectableInputThroughputBenchmark.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import 
org.apache.flink.runtime.io.network.partition.consumer.IterableInputChannel;
+import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
+import 
org.apache.flink.streaming.runtime.io.benchmark.StreamTaskInputBenchmarkEnvironment.ProcessorAndChannels;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Task-input (non-selectable) throughput benchmarks executed by the external
+ * https://github.com/dataArtisans/flink-benchmarks;>flink-benchmarks 
project.
+ */
+public class StreamTaskNonSelectableInputThroughputBenchmark extends 
StreamTaskInputThroughputBenchmarkBase {
+
+   public void setUp(int numInputGates, int numChannelsPerGate, long 
numRecordsPerChannel) throws Exception {
+   setUp(
+   numInputGates, numInputGates,
+   numChannelsPerGate, numChannelsPerGate,
+   numRecordsPerChannel, numRecordsPerChannel,
+   new SummingLongStreamOperator());
+   }
+
+   @Override
+   protected AbstractTaskInputProcessorThread createProcessorThread(
+   int numInputGates1,
+   int numInputGates2,
+   int numChannels1PerGate,
+   int numChannels2PerGate,
+   long numRecords1PerChannel,
+   long numRecords2PerChannel,
+   long inputValue1,
+   long inputValue2,
+   SummingLongStreamOperator streamOperator) throws IOException {
+
+   ProcessorAndChannels, 
IterableInputChannel> processorAndChannels =
+   environment.createTwoInputProcessor(
+   numInputGates1,
+   numInputGates2,
+   numChannels1PerGate,
+   numChannels2PerGate,
+   numRecords1PerChannel,
+   numRecords2PerChannel,
+   1,
+   2,
+   streamOperator);
+
+   return new StreamTwoInputProcessorThread(
+   processorAndChannels.processor(),
+   processorAndChannels.channels(),
+   streamOperator);
+   }
+
+   // 

+   //  Utilities
+   // 

+
+   private static class StreamTwoInputProcessorThread extends 
AbstractTaskInputProcessorThread {
+
+   private final StreamTwoInputProcessor inputProcessor;
+
+   private final SummingLongStreamOperator streamOperator;
+
+   private volatile boolean continuousProcessing = true;
 
 Review comment:
   Did you introduce this `volatile boolean` in order to stop the benchmarks? 
Couldn't you use `EndOfPartitionEvent` instead? Basically, when you 
prepare an input in the benchmarks, just emit `EndOfPartitionEvent` at the end. 
When this event reaches the receiving operator, it should interrupt the processing 
loops. The only issue might be that it will be impossible to restart the input 
without recreating it, but the overhead of constructing the 
`StreamTwoInputProcessor` and `inputChannels` at the start of every benchmark 
iteration should be negligible. On the other hand, checking a `volatile 
boolean` once per record might affect the results.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 

[GitHub] [flink] pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface

2019-04-12 Thread GitBox
pnowojski commented on a change in pull request #8124: [FLINK-11877] Implement 
the runtime handling of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124#discussion_r274819324
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collection;
+
+/**
+ * A {@link StreamTask} for executing the {@link TwoInputStreamOperator} which 
can select the input to
+ * get the next {@link StreamRecord}.
+ */
+@Internal
+public class TwoInputSelectableStreamTask extends 
AbstractTwoInputStreamTask {
+
+   private volatile StreamTwoInputSelectableProcessor 
inputProcessor;
 
 Review comment:
   Why didn't `TwoInputStreamTask` need it previously? Is it related to my 
other comment about the `volatile boolean continuousProcessing` flag?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on issue #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-12 Thread GitBox
xuefuz commented on issue #8007: [FLINK-11474][table] Add ReadableCatalog, 
ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#issuecomment-482569613
 
 
   > Hi @sunjincheng121 , @xuefuz , @bowenli86
   > 
   > Regarding the `listTables()`, I'm fine with the current approach that `listTables` returns both tables and views, if you all prefer it. I don't have a strong opinion on this.
   > 
   > Regarding moving to the mailing list, my initial intention was to involve more people in the discussion when we had diverging opinions on the API, but it seems that we have a consensus on this now. I agree with @sunjincheng121 that if we think the API change is minor, it's fine to keep the discussion in the PR. But I would suggest updating the FLIP according to the latest changes after the pull request is merged.
   > 
   > So, I'm +1 to merge this after Dawid's comments are addressed.
   
   Yes, I will update the FLIP to reflect what we agreed upon here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-12 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r274899236
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+
+/**
+ * Represents a database object in a catalog.
+ */
+public interface CatalogDatabase {
 
 Review comment:
   > Hi @sunjincheng121 , @xuefuz , @bowenli86
   > 
   > Regarding the `listTables()`, I'm fine with the current approach that `listTables` returns both tables and views, if you all prefer it. I don't have a strong opinion on this.
   > 
   > Regarding moving to the mailing list, my initial intention was to involve more people in the discussion when we had diverging opinions on the API, but it seems that we have a consensus on this now. I agree with @sunjincheng121 that if we think the API change is minor, it's fine to keep the discussion in the PR. But I would suggest updating the FLIP according to the latest changes after the pull request is merged.
   > 
   > So, I'm +1 to merge this after Dawid's comments are addressed.
   
   Yes, I will update the FLIP to reflect what we have agreed upon. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-12 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r274898608
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogView.java
 ##
 @@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+/**
+ * Represents a view in a catalog.
+ */
+public interface CatalogView extends CommonTable {
+
+   /**
+* Original text of the view definition.
+*
+* @return the original string literal provided by the user.
+*/
+   String getOriginalQuery();
+
+   /**
+* Expanded text of the original view definition.
+* This is needed because the context, such as the current database,
+* is lost once the session in which the view was defined is gone.
+* The expanded query text preserves that context.
+*
+* For example, for a view that is defined in the context of 
"default" database with a query {@code select * from
+* test1}, the expanded query text might become {@code select 
`test1`.`name`, `test1`.`value` from `default`.`test1`},
+* where table test1 resides in database "default" and has two columns 
("name" and "value").
+*
+* @return the view definition in expanded text.
+*/
+   String getExpandedQuery();
 
 Review comment:
   Yes. Both of them are stored in the catalog, at least for Hive. However, when 
the view is used in a query, only the expanded query is used, so the problem you 
referred to can be avoided.
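
   As a small illustration of that flow (a hedged sketch; the class and field names below are invented for this example and are not the actual catalog implementation), a catalog entry can carry both strings while query planning only ever reads the expanded one:

```java
// Illustrative holder only; not the Flink CatalogView implementation.
public final class ViewDefinitionSketch {

    private final String originalQuery;  // what the user typed, e.g. "select * from test1"
    private final String expandedQuery;  // fully qualified, e.g. "select `test1`.`name`, `test1`.`value` from `default`.`test1`"

    public ViewDefinitionSketch(String originalQuery, String expandedQuery) {
        this.originalQuery = originalQuery;
        this.expandedQuery = expandedQuery;
    }

    /** Kept only to show the definition back to the user. */
    public String getOriginalQuery() {
        return originalQuery;
    }

    /** Used at query time, so the lost session context (e.g. the current database) does not matter. */
    public String getExpandedQuery() {
        return expandedQuery;
    }
}
```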


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-12 Thread GitBox
xuefuz commented on a change in pull request #8007: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r274896725
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/DatabaseNotExistException.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+/**
+ * Exception for trying to operate on a database that doesn't exist.
+ *
+ */
+public class DatabaseNotExistException extends RuntimeException {
 
 Review comment:
   I see. If you don't mind, I'd like to open a new JIRA for renaming the classes, since 
this PR is almost ready and it didn't introduce the unfavorable names. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12171) The network buffer memory size should not be checked against the heap size on the TM side

2019-04-12 Thread Yun Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-12171:
---

Assignee: Yun Gao

> The network buffer memory size should not be checked against the heap size on 
> the TM side
> -
>
> Key: FLINK-12171
> URL: https://issues.apache.org/jira/browse/FLINK-12171
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Runtime / Network
>Affects Versions: 1.7.2, 1.8.0
> Environment: Flink 1.7.2; Flink 1.8 does not seem to have modified the 
> logic here.
>  
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> Currently, when computing the network buffer memory size on the TM side in 
> _TaskManagerService#calculateNetworkBufferMemory_ (version 1.8 or 1.7) or 
> _NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory_ (master), 
> the computed network buffer memory size is checked to be less than 
> `maxJvmHeapMemory`. However, on the TM side, _maxJvmHeapMemory_ stores the 
> maximum heap memory (namely -Xmx).
>  
> With the above process, when the TM starts, -Xmx is computed in the RM or in 
> _taskmanager.sh_ as (container memory - network buffer memory - managed 
> memory), so the above check implies that the heap memory of the TM must 
> be larger than the network memory, which does not seem necessary.
>  
> Therefore, I think the network buffer memory size also needs to be checked 
> against the total memory instead of the heap memory on the TM side:
>  # Check that networkBufFraction < 1.0.
>  # Compute the total memory as (jvmHeapNoNet / (1 - networkBufFraction)).
>  # Compare the network buffer memory with the total memory.
> This check is also consistent with the similar one done on the RM side.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12171) The network buffer memory size should not be checked against the heap size on the TM side

2019-04-12 Thread Yun Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-12171:

Environment: 
Flink 1.7.2; Flink 1.8 does not seem to have modified the logic here.

 

  was:
I tested with Flink-1.7.2 with computed network buffer size = 5G and 
taskmanager.heap.mb=6114, and the exception about checking is triggered. Yarn 
Session mode, Yarn single job mode and standalone mode are all tested.

 

I haven't tested on Flink-1.8 yet, but the logic seems to be not changed to me 
after reading the corresponding source code. 


> The network buffer memory size should not be checked against the heap size on 
> the TM side
> -
>
> Key: FLINK-12171
> URL: https://issues.apache.org/jira/browse/FLINK-12171
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Runtime / Network
>Affects Versions: 1.7.2, 1.8.0
> Environment: Flink 1.7.2; Flink 1.8 does not seem to have modified the 
> logic here.
>  
>Reporter: Yun Gao
>Priority: Major
>
> Currently, when computing the network buffer memory size on the TM side in 
> _TaskManagerService#calculateNetworkBufferMemory_ (version 1.8 or 1.7) or 
> _NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory_ (master), 
> the computed network buffer memory size is checked to be less than 
> `maxJvmHeapMemory`. However, on the TM side, _maxJvmHeapMemory_ stores the 
> maximum heap memory (namely -Xmx).
>  
> With the above process, when the TM starts, -Xmx is computed in the RM or in 
> _taskmanager.sh_ as (container memory - network buffer memory - managed 
> memory), so the above check implies that the heap memory of the TM must 
> be larger than the network memory, which does not seem necessary.
>  
> Therefore, I think the network buffer memory size also needs to be checked 
> against the total memory instead of the heap memory on the TM side:
>  # Check that networkBufFraction < 1.0.
>  # Compute the total memory as (jvmHeapNoNet / (1 - networkBufFraction)).
>  # Compare the network buffer memory with the total memory.
> This check is also consistent with the similar one done on the RM side.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12171) The network buffer memory size should not be checked against the heap size on the TM side

2019-04-12 Thread Yun Gao (JIRA)
Yun Gao created FLINK-12171:
---

 Summary: The network buffer memory size should not be checked 
against the heap size on the TM side
 Key: FLINK-12171
 URL: https://issues.apache.org/jira/browse/FLINK-12171
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration, Runtime / Network
Affects Versions: 1.8.0, 1.7.2
 Environment: I tested with Flink 1.7.2 with a computed network buffer 
size of 5 GB and taskmanager.heap.mb=6114, and the exception from this check is 
triggered. Yarn session mode, Yarn single-job mode and standalone mode were all 
tested.

I haven't tested on Flink 1.8 yet, but the logic does not seem to have changed 
after reading the corresponding source code. 
Reporter: Yun Gao


Currently, when computing the network buffer memory size on the TM side in 
_TaskManagerService#calculateNetworkBufferMemory_ (version 1.8 or 1.7) or 
_NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory_ (master), the 
computed network buffer memory size is checked to be less than 
`maxJvmHeapMemory`. However, on the TM side, _maxJvmHeapMemory_ stores the maximum 
heap memory (namely -Xmx).

With the above process, when the TM starts, -Xmx is computed in the RM or in 
_taskmanager.sh_ as (container memory - network buffer memory - managed 
memory), so the above check implies that the heap memory of the TM must 
be larger than the network memory, which does not seem necessary.

Therefore, I think the network buffer memory size also needs to be checked 
against the total memory instead of the heap memory on the TM side:
 # Check that networkBufFraction < 1.0.
 # Compute the total memory as (jvmHeapNoNet / (1 - networkBufFraction)).
 # Compare the network buffer memory with the total memory.

This check is also consistent with the similar one done on the RM side.
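
A rough sketch of the proposed check (illustrative only; the class, method and variable names below are made up and this is not the actual TaskManagerServices / NetworkEnvironmentConfiguration code):

{code}
// Illustrative sketch of the proposed TM-side check; not the actual Flink code.
public final class NetworkBufferCheckSketch {

    static void checkNetworkBufferMemory(long networkBufBytes, long jvmHeapNoNet, float networkBufFraction) {
        // 1. The fraction must be strictly between 0 and 1.
        if (networkBufFraction <= 0.0f || networkBufFraction >= 1.0f) {
            throw new IllegalArgumentException(
                "Network buffer fraction must be in (0, 1): " + networkBufFraction);
        }

        // 2. Reconstruct the total memory from the heap size that already excludes the network buffers.
        long totalMemory = (long) (jvmHeapNoNet / (1.0 - networkBufFraction));

        // 3. Compare the network buffer memory against the total memory, not against the heap.
        if (networkBufBytes >= totalMemory) {
            throw new IllegalArgumentException("Network buffer memory (" + networkBufBytes
                + " bytes) must be smaller than the total memory (" + totalMemory + " bytes).");
        }
    }
}
{code}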



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] godfreyhe commented on a change in pull request #8148: [FLINK-12161] [table-planner-blink] Supports partial-final optimization for stream group aggregate

2019-04-12 Thread GitBox
godfreyhe commented on a change in pull request #8148: [FLINK-12161] 
[table-planner-blink] Supports partial-final optimization for stream group 
aggregate
URL: https://github.com/apache/flink/pull/8148#discussion_r274869130
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelFactories.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.flink.table.plan.nodes.calcite.RankType.RankType
+import org.apache.flink.table.plan.nodes.calcite.{LogicalExpand, LogicalRank, 
LogicalSink, RankRange}
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.Contexts
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.{RelCollation, RelNode}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilderFactory
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+/**
+  * Contains factory interface and default implementation for creating various 
rel nodes.
+  */
+object FlinkRelFactories {
 
 Review comment:
   This class defines the RelNode factories for the extended nodes (e.g. Rank, Expand), 
and `FlinkLogicalRelFactories` defines all the `FlinkLogicalRel` factories. 
   There is no mandatory requirement to use a factory to create logical nodes. 
However, it is easier to create a logical node when using a `RelBuilderFactory`, 
and a rule class can be applied to either Calcite logical RelNodes or Flink logical 
RelNodes if it takes a `RelBuilderFactory` instance as its constructor argument.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10929) Add support for Apache Arrow

2019-04-12 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816192#comment-16816192
 ] 

Stephan Ewen commented on FLINK-10929:
--

There are two different considerations:

(1) Arrow for interoperability with other systems and languages (source / sink 
/ inter-process-communication)
(2) Arrow as a format for internal data processing in SQL / Table API.

For (1), I can see that it would make sense, but then we need to look more concretely at 
what we want to integrate. Arrow is not a magic integration, it is a data 
format.

For (2), the original Flink query processor is not getting much committer 
attention, because we plan to replace it with the Blink processor in the long 
run. The Blink processor is in the process of being merged, and that needs to be 
finished before we can start making more changes. [~ykt836] could probably 
provide some input on when a good time to follow up would be.


> Add support for Apache Arrow
> 
>
> Key: FLINK-10929
> URL: https://issues.apache.org/jira/browse/FLINK-10929
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / State Backends
>Reporter: Pedro Cardoso Silva
>Priority: Minor
> Attachments: image-2019-04-10-13-43-08-107.png
>
>
> Investigate the possibility of adding support for Apache Arrow as a 
> standardized columnar, memory format for data.
> Given the activity that [https://github.com/apache/arrow] is currently 
> getting and its claims objective of providing a zero-copy, standardized data 
> format across platforms, I think it makes sense for Flink to look into 
> supporting it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S edited comment on FLINK-5970 at 4/12/19 11:34 AM:
--

Taking a guess: you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

{code}
import java.io.File;
import java.io.IOException;

public class TestNonWritable {
    public static void main(String[] args) throws IOException {
        // Create a sub-directory of the system temp directory, mark it non-writable,
        // then try to create a file inside it.
        File file = new File(System.getProperty("java.io.tmpdir"));
        File tempFolder = new File(file, args[0]);
        System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
        System.out.println("Folder creation " + tempFolder.mkdir());
        System.out.println("Permission change " + tempFolder.setWritable(false, false));
        File tempFolderFile = new File(tempFolder, args[1]);
        System.out.println("File Creation " + tempFolderFile.createNewFile());
    }
}
{code}

When run as root user:  
{noformat} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {noformat}

When run as non-root user:  
{noformat}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {noformat}





was (Author: ajithshetty):
Taking a guess, u may be executing test as root user, hence the indefinite run.?

Here is a test code:

{code}
public class TestNonWritable {
public static void main(String[] args) throws IOException {
File file = new File(System.getProperty("java.io.tmpdir"));  
File tempFolder = new File(file, args[0]);  
System.out.println("Temp Dir " + tempFolder.getCanonicalPath());  
System.out.println("Folder creation " + tempFolder.mkdir());  
System.out.println("Permission change " + tempFolder.setWritable(false, 
false));  
File tempFolderFile = new File(tempFolder, args[1]);  
System.out.println("File Creation " + tempFolderFile.createNewFile());  
 }  
 }
{code}  

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}




> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when giving a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI i noticed that these tests did 
> neither fail nor succeed; we are able to create a non-writable directory 
> (which i verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.
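
A minimal sketch of that proposed precondition (illustrative only; the names below are made up and this is not the actual test code). The tests could then skip themselves when file creation in the directory still succeeds, e.g. when running as root:

{code}
import java.io.File;
import java.io.IOException;

// Illustrative only: verify the actual failure condition instead of the directory permissions.
public final class NonWritableDirCheckSketch {

    /** Returns true only if creating a new file inside the directory really fails. */
    static boolean isEffectivelyNonWritable(File directory) {
        File probe = new File(directory, "probe-" + System.nanoTime());
        try {
            if (probe.createNewFile()) {
                probe.delete(); // clean up: the directory is writable after all (e.g. running as root)
                return false;
            }
            return true;
        } catch (IOException e) {
            return true; // creation failed, so the directory is effectively non-writable
        }
    }
}
{code}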



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S edited comment on FLINK-5970 at 4/12/19 11:32 AM:
--

Taking a guess: you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

{code}
public class TestNonWritable {
public static void main(String[] args) throws IOException {
File file = new File(System.getProperty("java.io.tmpdir"));  
File tempFolder = new File(file, args[0]);  
System.out.println("Temp Dir " + tempFolder.getCanonicalPath());  
System.out.println("Folder creation " + tempFolder.mkdir());  
System.out.println("Permission change " + tempFolder.setWritable(false, 
false));  
File tempFolderFile = new File(tempFolder, args[1]);  
System.out.println("File Creation " + tempFolderFile.createNewFile());  
 }  
 }
{code}  

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}





was (Author: ajithshetty):
Taking a guess, u may be executing test as root user, hence the indefinite run.?

Here is a test code:

{noformat}
public class TestNonWritable {
public static void main(String[] args) throws IOException {
File file = new File(System.getProperty("java.io.tmpdir"));  
File tempFolder = new File(file, args[0]);  
System.out.println("Temp Dir " + tempFolder.getCanonicalPath());  
System.out.println("Folder creation " + tempFolder.mkdir());  
System.out.println("Permission change " + tempFolder.setWritable(false, 
false));  
File tempFolderFile = new File(tempFolder, args[1]);  
System.out.println("File Creation " + tempFolderFile.createNewFile());  
 }  
 }
{noformat}  

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}




> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when giving a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI i noticed that these tests did 
> neither fail nor succeed; we are able to create a non-writable directory 
> (which i verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S edited comment on FLINK-5970 at 4/12/19 11:31 AM:
--

Taking a guess: you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

{quote}public class TestNonWritable {
public static void main(String[] args) throws IOException {
File file = new File(System.getProperty("java.io.tmpdir"));  
File tempFolder = new File(file, args[0]);  
System.out.println("Temp Dir " + tempFolder.getCanonicalPath());  
System.out.println("Folder creation " + tempFolder.mkdir());  
System.out.println("Permission change " + tempFolder.setWritable(false, 
false));  
File tempFolderFile = new File(tempFolder, args[1]);  
System.out.println("File Creation " + tempFolderFile.createNewFile());  
 }  
 }
{quote}  

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}





was (Author: ajithshetty):
Taking a guess, u may be executing test as root user, hence the indefinite run.?

Here is a test code:

{quote} public class TestNonWritable {  
public static void main(String[] args) throws IOException {
File file = new File(System.getProperty("java.io.tmpdir"));  
File tempFolder = new File(file, args[0]);  
System.out.println("Temp Dir " + tempFolder.getCanonicalPath());  
System.out.println("Folder creation " + tempFolder.mkdir());  
System.out.println("Permission change " + tempFolder.setWritable(false, 
false));  
File tempFolderFile = new File(tempFolder, args[1]);  
System.out.println("File Creation " + tempFolderFile.createNewFile());  
 }  
 } {quote}

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}




> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when giving a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI i noticed that these tests did 
> neither fail nor succeed; we are able to create a non-writable directory 
> (which i verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S edited comment on FLINK-5970 at 4/12/19 11:32 AM:
--

Taking a guess: you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

{noformat}
public class TestNonWritable {
public static void main(String[] args) throws IOException {
File file = new File(System.getProperty("java.io.tmpdir"));  
File tempFolder = new File(file, args[0]);  
System.out.println("Temp Dir " + tempFolder.getCanonicalPath());  
System.out.println("Folder creation " + tempFolder.mkdir());  
System.out.println("Permission change " + tempFolder.setWritable(false, 
false));  
File tempFolderFile = new File(tempFolder, args[1]);  
System.out.println("File Creation " + tempFolderFile.createNewFile());  
 }  
 }
{noformat}  

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}





was (Author: ajithshetty):
Taking a guess, u may be executing test as root user, hence the indefinite run.?

Here is a test code:

{quote}public class TestNonWritable {
public static void main(String[] args) throws IOException {
File file = new File(System.getProperty("java.io.tmpdir"));  
File tempFolder = new File(file, args[0]);  
System.out.println("Temp Dir " + tempFolder.getCanonicalPath());  
System.out.println("Folder creation " + tempFolder.mkdir());  
System.out.println("Permission change " + tempFolder.setWritable(false, 
false));  
File tempFolderFile = new File(tempFolder, args[1]);  
System.out.println("File Creation " + tempFolderFile.createNewFile());  
 }  
 }
{quote}  

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}




> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when giving a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI i noticed that these tests did 
> neither fail nor succeed; we are able to create a non-writable directory 
> (which i verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S edited comment on FLINK-5970 at 4/12/19 11:30 AM:
--

Taking a guess: you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

{quote} public class TestNonWritable {  
public static void main(String[] args) throws IOException {
File file = new File(System.getProperty("java.io.tmpdir"));  
File tempFolder = new File(file, args[0]);  
System.out.println("Temp Dir " + tempFolder.getCanonicalPath());  
System.out.println("Folder creation " + tempFolder.mkdir());  
System.out.println("Permission change " + tempFolder.setWritable(false, 
false));  
File tempFolderFile = new File(tempFolder, args[1]);  
System.out.println("File Creation " + tempFolderFile.createNewFile());  
 }  
 } {quote}

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}





was (Author: ajithshetty):
Taking a guess, u may be executing test as root user, hence the indefinite run.?

Here is a test code:

bq. public class TestNonWritable {
bq.  public static void main(String[] args) throws IOException {
bq.  File file = new File(System.getProperty("java.io.tmpdir"));
bq.  File tempFolder = new File(file, args[0]);
bq.  System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
bq.  System.out.println("Folder creation " + tempFolder.mkdir());
bq.  System.out.println("Permission change " + tempFolder.setWritable(false, 
false));
bq.  File tempFolderFile = new File(tempFolder, args[1]);
bq.  System.out.println("File Creation " + tempFolderFile.createNewFile());
bq.  }
bq. }

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}




> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when giving a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI i noticed that these tests did 
> neither fail nor succeed; we are able to create a non-writable directory 
> (which i verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S edited comment on FLINK-5970 at 4/12/19 11:29 AM:
--

Taking a guess: you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

bq. public class TestNonWritable {
bq.  public static void main(String[] args) throws IOException {
bq.  File file = new File(System.getProperty("java.io.tmpdir"));
bq.  File tempFolder = new File(file, args[0]);
bq.  System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
bq.  System.out.println("Folder creation " + tempFolder.mkdir());
bq.  System.out.println("Permission change " + tempFolder.setWritable(false, 
false));
bq.  File tempFolderFile = new File(tempFolder, args[1]);
bq.  System.out.println("File Creation " + tempFolderFile.createNewFile());
bq.  }
bq. }

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}





was (Author: ajithshetty):
Taking a guess, u may be executing test as root user, hence the indefinite run.?

Here is a test code:

{quote}
public class TestNonWritable {
 public static void main(String[] args) throws IOException {
 File file = new File(System.getProperty("java.io.tmpdir"));
 File tempFolder = new File(file, args[0]);
 System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
 System.out.println("Folder creation " + tempFolder.mkdir());
 System.out.println("Permission change " + tempFolder.setWritable(false, 
false));
 File tempFolderFile = new File(tempFolder, args[1]);
 System.out.println("File Creation " + tempFolderFile.createNewFile());
 }
}
{quote}

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}




> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when giving a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI i noticed that these tests did 
> neither fail nor succeed; we are able to create a non-writable directory 
> (which i verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S edited comment on FLINK-5970 at 4/12/19 11:28 AM:
--

Taking a guess: you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

{quote}
public class TestNonWritable {
 public static void main(String[] args) throws IOException {
 File file = new File(System.getProperty("java.io.tmpdir"));
 File tempFolder = new File(file, args[0]);
 System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
 System.out.println("Folder creation " + tempFolder.mkdir());
 System.out.println("Permission change " + tempFolder.setWritable(false, 
false));
 File tempFolderFile = new File(tempFolder, args[1]);
 System.out.println("File Creation " + tempFolderFile.createNewFile());
 }
}
{quote}

When run as root user:  
{quote} java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true {quote}

When run as non-root user:  
{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}





was (Author: ajithshetty):
Taking a guess, u may be executing test as root user, hence the indefinite run.?

Here is a test code:

```
public class TestNonWritable {
 public static void main(String[] args) throws IOException {
 File file = new File(System.getProperty("java.io.tmpdir"));
 File tempFolder = new File(file, args[0]);
 System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
 System.out.println("Folder creation " + tempFolder.mkdir());
 System.out.println("Permission change " + tempFolder.setWritable(false, 
false));
 File tempFolderFile = new File(tempFolder, args[1]);
 System.out.println("File Creation " + tempFolderFile.createNewFile());
 }
}
```

When run as root user:
```
 java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true
```

When run as non-root user:

{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}




> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when giving a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI i noticed that these tests did 
> neither fail nor succeed; we are able to create a non-writable directory 
> (which i verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S edited comment on FLINK-5970 at 4/12/19 11:27 AM:
--

Taking a guess: you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

```
public class TestNonWritable {
 public static void main(String[] args) throws IOException {
 File file = new File(System.getProperty("java.io.tmpdir"));
 File tempFolder = new File(file, args[0]);
 System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
 System.out.println("Folder creation " + tempFolder.mkdir());
 System.out.println("Permission change " + tempFolder.setWritable(false, 
false));
 File tempFolderFile = new File(tempFolder, args[1]);
 System.out.println("File Creation " + tempFolderFile.createNewFile());
 }
}
```

When run as root user:
```
 java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true
```

When run as non-root user:

{quote}java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12) {quote}





was (Author: ajithshetty):
Taking a guess, u may be executing test as root user, hence the indefinite run.?

Here is a test code:

```
public class TestNonWritable {
 public static void main(String[] args) throws IOException {
 File file = new File(System.getProperty("java.io.tmpdir"));
 File tempFolder = new File(file, args[0]);
 System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
 System.out.println("Folder creation " + tempFolder.mkdir());
 System.out.println("Permission change " + tempFolder.setWritable(false, 
false));
 File tempFolderFile = new File(tempFolder, args[1]);
 System.out.println("File Creation " + tempFolderFile.createNewFile());
 }
}
```

When run as root user:
```
 java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true
```

When run as non-root user:

{{java -cp . TestNonWritable abc1 xyz1}}
{quote}Temp Dir /tmp/abc1{quote}
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12)




> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when giving a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI i noticed that these tests did 
> neither fail nor succeed; we are able to create a non-writable directory 
> (which i verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S edited comment on FLINK-5970 at 4/12/19 11:27 AM:
--

Taking a guess: you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

```
public class TestNonWritable {
 public static void main(String[] args) throws IOException {
 File file = new File(System.getProperty("java.io.tmpdir"));
 File tempFolder = new File(file, args[0]);
 System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
 System.out.println("Folder creation " + tempFolder.mkdir());
 System.out.println("Permission change " + tempFolder.setWritable(false, 
false));
 File tempFolderFile = new File(tempFolder, args[1]);
 System.out.println("File Creation " + tempFolderFile.createNewFile());
 }
}
```

When run as root user:
```
 java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true
```

When run as non-root user:

{{java -cp . TestNonWritable abc1 xyz1}}
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12)





was (Author: ajithshetty):
Taking a guess, you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

```
import java.io.File;
import java.io.IOException;

public class TestNonWritable {
    public static void main(String[] args) throws IOException {
        File file = new File(System.getProperty("java.io.tmpdir"));
        File tempFolder = new File(file, args[0]);
        System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
        System.out.println("Folder creation " + tempFolder.mkdir());
        System.out.println("Permission change " + tempFolder.setWritable(false, false));
        File tempFolderFile = new File(tempFolder, args[1]);
        System.out.println("File Creation " + tempFolderFile.createNewFile());
    }
}
```

When run as root user:
```
 java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true
```

When run as non-root user:

{{java -cp . TestNonWritable abc1 xyz1}}
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12)




> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when given a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI I noticed that these tests 
> neither failed nor succeeded; we are able to create a non-writable directory 
> (which I verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S edited comment on FLINK-5970 at 4/12/19 11:26 AM:
--

Taking a guess, you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

```
import java.io.File;
import java.io.IOException;

public class TestNonWritable {
    public static void main(String[] args) throws IOException {
        File file = new File(System.getProperty("java.io.tmpdir"));
        File tempFolder = new File(file, args[0]);
        System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
        System.out.println("Folder creation " + tempFolder.mkdir());
        System.out.println("Permission change " + tempFolder.setWritable(false, false));
        File tempFolderFile = new File(tempFolder, args[1]);
        System.out.println("File Creation " + tempFolderFile.createNewFile());
    }
}
```

When run as root user:
```
 java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true
```

When run as non-root user:

{{java -cp . TestNonWritable abc1 xyz1}}
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12)





was (Author: ajithshetty):
Taking a guess, you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

```
import java.io.File;
import java.io.IOException;

public class TestNonWritable {
    public static void main(String[] args) throws IOException {
        File file = new File(System.getProperty("java.io.tmpdir"));
        File tempFolder = new File(file, args[0]);
        System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
        System.out.println("Folder creation " + tempFolder.mkdir());
        System.out.println("Permission change " + tempFolder.setWritable(false, false));
        File tempFolderFile = new File(tempFolder, args[1]);
        System.out.println("File Creation " + tempFolderFile.createNewFile());
    }
}
```

When run as root user:
```
 java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true
```

When run as non-root user:
```
java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12)
```



> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when given a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI I noticed that these tests 
> neither failed nor succeeded; we are able to create a non-writable directory 
> (which I verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S edited comment on FLINK-5970 at 4/12/19 11:26 AM:
--

Taking a guess, you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

```
import java.io.File;
import java.io.IOException;

public class TestNonWritable {
    public static void main(String[] args) throws IOException {
        File file = new File(System.getProperty("java.io.tmpdir"));
        File tempFolder = new File(file, args[0]);
        System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
        System.out.println("Folder creation " + tempFolder.mkdir());
        System.out.println("Permission change " + tempFolder.setWritable(false, false));
        File tempFolderFile = new File(tempFolder, args[1]);
        System.out.println("File Creation " + tempFolderFile.createNewFile());
    }
}
```

When run as root user:
```
 java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true
```

When run as non-root user:

{{java -cp . TestNonWritable abc1 xyz1}}
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12)





was (Author: ajithshetty):
Taking a guess, you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

```
import java.io.File;
import java.io.IOException;

public class TestNonWritable {
    public static void main(String[] args) throws IOException {
        File file = new File(System.getProperty("java.io.tmpdir"));
        File tempFolder = new File(file, args[0]);
        System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
        System.out.println("Folder creation " + tempFolder.mkdir());
        System.out.println("Permission change " + tempFolder.setWritable(false, false));
        File tempFolderFile = new File(tempFolder, args[1]);
        System.out.println("File Creation " + tempFolderFile.createNewFile());
    }
}
```

When run as root user:
```
 java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true
```

When run as non-root user:

{{java -cp . TestNonWritable abc1 xyz1}}
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12)




> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when given a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI I noticed that these tests 
> neither failed nor succeeded; we are able to create a non-writable directory 
> (which I verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5970) Job-/TaskManagerStartupTest may run indefinitely

2019-04-12 Thread Ajith S (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816185#comment-16816185
 ] 

Ajith S commented on FLINK-5970:


Taking a guess, you may be executing the test as the root user, hence the indefinite run?

Here is some test code:

```
import java.io.File;
import java.io.IOException;

public class TestNonWritable {
    public static void main(String[] args) throws IOException {
        File file = new File(System.getProperty("java.io.tmpdir"));
        File tempFolder = new File(file, args[0]);
        System.out.println("Temp Dir " + tempFolder.getCanonicalPath());
        System.out.println("Folder creation " + tempFolder.mkdir());
        System.out.println("Permission change " + tempFolder.setWritable(false, false));
        File tempFolderFile = new File(tempFolder, args[1]);
        System.out.println("File Creation " + tempFolderFile.createNewFile());
    }
}
```

When run as root user:
```
 java -cp . TestNonWritable abc2 xyz2
Temp Dir /tmp/abc2
Folder creation true
Permission change true
File Creation true
```

When run as non-root user:
```
java -cp . TestNonWritable abc1 xyz1
Temp Dir /tmp/abc1
Folder creation true
Permission change true
Exception in thread "main" java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at TestNonWritable.main(TestNonWritable.java:12)
```
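
Following the proposal at the end of the issue description below (verify the actual failure condition rather than the directory permissions), here is a minimal sketch of such a check; the class and names are hypothetical and not part of the Flink tests:

```
import java.io.File;
import java.io.IOException;

final class NonWritableDirCheck {

    /** Returns true if a new file can still be created inside the given directory. */
    static boolean canCreateFileIn(File dir) {
        File probe = new File(dir, "probe-" + System.nanoTime());
        try {
            boolean created = probe.createNewFile();
            if (created) {
                // clean up the probe file so the directory is left unchanged
                probe.delete();
            }
            return created;
        } catch (IOException e) {
            // "Permission denied" is the expected outcome for a non-root user
            return false;
        }
    }
}
```

A test could then call `org.junit.Assume.assumeFalse(canCreateFileIn(dir))` before asserting that the JM/TM startup fails, so the test is skipped instead of running indefinitely when executed as root.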



> Job-/TaskManagerStartupTest may run indefinitely
> 
>
> Key: FLINK-5970
> URL: https://issues.apache.org/jira/browse/FLINK-5970
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The Job- and TaskManagerStartupTest both contain a test that verifies that 
> the JM/TM fails when given a non-writable directory.
> In case of the TM this directory is used for temporary files, see 
> testIODirectoryNotWritable.
> In case of the JM this directory is given to the blobService, see 
> testJobManagerStartupFails.
> To that end it is necessary to create a non-writable directory. To verify 
> that this is at all possible we first rule out the Windows OS (for which 
> File#setWritable has no effect), and check the return value of 
> File#setWritable, which according to the documentation returns true if the 
> file was in fact marked as non-writable.
> When playing around with the BuddyWorks CI I noticed that these tests 
> neither failed nor succeeded; we are able to create a non-writable directory 
> (which I verified by checking the actual permissions), but the JM/TM still 
> start up fine. As a result the tests just run indefinitely since this case 
> wasn't considered.
> I'm still investigating why they don't fail; my current assumption is that in 
> this case files simply don't inherit the permissions of the parent directory.
> This means that the checks that the tests make aren't adequate. Instead of 
> verifying the permissions on the directory I propose verifying the actual 
> failure condition: That we can't create new files in this directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] GJL commented on issue #7986: [FLINK-10517][rest] Add stability test

2019-04-12 Thread GitBox
GJL commented on issue #7986: [FLINK-10517][rest] Add stability test
URL: https://github.com/apache/flink/pull/7986#issuecomment-482539269
 
 
   I think overall this looks good. I have left some minor comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-12 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274865419
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
+import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Stability test and snapshot generator for the REST API.
+ */
+@RunWith(Parameterized.class)
+public final class RestAPIStabilityTest extends TestLogger {
+
+   private static final String REGENERATE_SNAPSHOT_PROPERTY = 
"generate-rest-snapshot";
+
+   private static final String SNAPSHOT_RESOURCE_PATTERN = 
"rest_api_%s.snapshot";
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   @Parameterized.Parameters(name = "version = {0}")
+   public static Iterable getStableVersions() {
+   return Arrays.stream(RestAPIVersion.values())
+   .filter(RestAPIVersion::isStableVersion)
+   .collect(Collectors.toList());
+   }
+
+   private final RestAPIVersion apiVersion;
+
+   public RestAPIStabilityTest(final RestAPIVersion apiVersion) {
+   this.apiVersion = apiVersion;
+   }
+
+   @Test
+   public void testDispatcherRestAPIStability() throws IOException {
+   final String versionedSnapshotFileName = 
String.format(SNAPSHOT_RESOURCE_PATTERN, apiVersion.getURLVersionPrefix());
+
+   final RestAPISnapshot currentSnapshot = createSnapshot(new 
DocumentingDispatcherRestEndpoint());
+
+   if (System.getProperty(REGENERATE_SNAPSHOT_PROPERTY) != null) {
+   writeSnapshot(versionedSnapshotFileName, 
currentSnapshot);
+   }
+
+   final URL resource = 
RestAPIStabilityTest.class.getClassLoader().getResource(versionedSnapshotFileName);
+   if (resource == null) {
+   Assert.fail("Snapshot file does not exist. If you added 
a new version, re-run this test with" +
+   " -D" + REGENERATE_SNAPSHOT_PROPERTY + " being 
set.");
+   }
+   final RestAPISnapshot previousSnapshot = 
OBJECT_MAPPER.readValue(new File(resource.getFile()), RestAPISnapshot.class);
+
+   assertCompatible(previousSnapshot, currentSnapshot);
+   }
+
+   private static void writeSnapshot(final String 
versionedSnapshotFileName, final RestAPISnapshot snapshot) throws IOException {
+   OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
+   .writeValue(
+   new File("src/test/resources/" + 
versionedSnapshotFileName),
+   snapshot);
+   System.out.println("REST API snapshot " + 
versionedSnapshotFileName + " was updated, please remember to commit the 
snapshot.");
+   }
+
+   private RestAPISnapshot createSnapshot(final DocumentingRestEndpoint 
dispatcherRestEndpoint) {
+   final List calls = 
dispatcherRestEndpoint.getSpecs().stream()

[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-12 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274865419
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
+import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Stability test and snapshot generator for the REST API.
+ */
+@RunWith(Parameterized.class)
+public final class RestAPIStabilityTest extends TestLogger {
+
+   private static final String REGENERATE_SNAPSHOT_PROPERTY = 
"generate-rest-snapshot";
+
+   private static final String SNAPSHOT_RESOURCE_PATTERN = 
"rest_api_%s.snapshot";
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   @Parameterized.Parameters(name = "version = {0}")
+   public static Iterable getStableVersions() {
+   return Arrays.stream(RestAPIVersion.values())
+   .filter(RestAPIVersion::isStableVersion)
+   .collect(Collectors.toList());
+   }
+
+   private final RestAPIVersion apiVersion;
+
+   public RestAPIStabilityTest(final RestAPIVersion apiVersion) {
+   this.apiVersion = apiVersion;
+   }
+
+   @Test
+   public void testDispatcherRestAPIStability() throws IOException {
+   final String versionedSnapshotFileName = 
String.format(SNAPSHOT_RESOURCE_PATTERN, apiVersion.getURLVersionPrefix());
+
+   final RestAPISnapshot currentSnapshot = createSnapshot(new 
DocumentingDispatcherRestEndpoint());
+
+   if (System.getProperty(REGENERATE_SNAPSHOT_PROPERTY) != null) {
+   writeSnapshot(versionedSnapshotFileName, 
currentSnapshot);
+   }
+
+   final URL resource = 
RestAPIStabilityTest.class.getClassLoader().getResource(versionedSnapshotFileName);
+   if (resource == null) {
+   Assert.fail("Snapshot file does not exist. If you added 
a new version, re-run this test with" +
+   " -D" + REGENERATE_SNAPSHOT_PROPERTY + " being 
set.");
+   }
+   final RestAPISnapshot previousSnapshot = 
OBJECT_MAPPER.readValue(new File(resource.getFile()), RestAPISnapshot.class);
+
+   assertCompatible(previousSnapshot, currentSnapshot);
+   }
+
+   private static void writeSnapshot(final String 
versionedSnapshotFileName, final RestAPISnapshot snapshot) throws IOException {
+   OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
+   .writeValue(
+   new File("src/test/resources/" + 
versionedSnapshotFileName),
+   snapshot);
+   System.out.println("REST API snapshot " + 
versionedSnapshotFileName + " was updated, please remember to commit the 
snapshot.");
+   }
+
+   private RestAPISnapshot createSnapshot(final DocumentingRestEndpoint 
dispatcherRestEndpoint) {
+   final List calls = 
dispatcherRestEndpoint.getSpecs().stream()

[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-12 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274493121
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
+import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Stability test and snapshot generator for the REST API.
+ */
+@RunWith(Parameterized.class)
+public final class RestAPIStabilityTest extends TestLogger {
+
+   private static final String REGENERATE_SNAPSHOT_PROPERTY = 
"generate-rest-snapshot";
+
+   private static final String SNAPSHOT_RESOURCE_PATTERN = 
"rest_api_%s.snapshot";
+
+   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+   @Parameterized.Parameters(name = "version = {0}")
+   public static Iterable getStableVersions() {
+   return Arrays.stream(RestAPIVersion.values())
+   .filter(RestAPIVersion::isStableVersion)
+   .collect(Collectors.toList());
+   }
+
+   private final RestAPIVersion apiVersion;
+
+   public RestAPIStabilityTest(final RestAPIVersion apiVersion) {
+   this.apiVersion = apiVersion;
+   }
+
+   @Test
+   public void testDispatcherRestAPIStability() throws IOException {
+   final String versionedSnapshotFileName = 
String.format(SNAPSHOT_RESOURCE_PATTERN, apiVersion.getURLVersionPrefix());
+
+   final RestAPISnapshot currentSnapshot = createSnapshot(new 
DocumentingDispatcherRestEndpoint());
+
+   if (System.getProperty(REGENERATE_SNAPSHOT_PROPERTY) != null) {
+   writeSnapshot(versionedSnapshotFileName, 
currentSnapshot);
+   }
+
+   final URL resource = 
RestAPIStabilityTest.class.getClassLoader().getResource(versionedSnapshotFileName);
+   if (resource == null) {
+   Assert.fail("Snapshot file does not exist. If you added 
a new version, re-run this test with" +
+   " -D" + REGENERATE_SNAPSHOT_PROPERTY + " being 
set.");
+   }
+   final RestAPISnapshot previousSnapshot = 
OBJECT_MAPPER.readValue(resource, RestAPISnapshot.class);
+
+   assertCompatible(previousSnapshot, currentSnapshot);
+   }
+
+   private static void writeSnapshot(final String 
versionedSnapshotFileName, final RestAPISnapshot snapshot) throws IOException {
+   OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
+   .writeValue(
+   new File("src/test/resources/" + 
versionedSnapshotFileName),
+   snapshot);
+   System.out.println("REST API snapshot " + 
versionedSnapshotFileName + " was updated, please remember to commit the 
snapshot.");
+   }
+
+   private RestAPISnapshot createSnapshot(final DocumentingRestEndpoint 
restEndpoint) {
+   final List calls = restEndpoint.getSpecs().stream()
+   // we only 

[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-12 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274864247
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutine.java
 ##
 @@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.junit.Assert;
+
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * Routine for checking the compatibility of a {@link MessageHeaders} pair.
+ *
+ * The 'extractor' {@link Function} generates a 'container', a 
jackson-compatible object containing the data that
+ * the routine bases it's compatibility-evaluation on.
+ * The 'assertion' {@link BiConsumer} accepts a pair of containers and asserts 
the compatibility. Incompatibilities are
+ * signaled by throwing an {@link AssertionError}. This implies that the 
method body will typically contain jUnit
+ * assertions.
+ */
+final class CompatibilityRoutine {
+
+   private final String key;
+   private final Class containerClass;
+   private final Function, C> extractor;
+   private final BiConsumer assertion;
+
+   CompatibilityRoutine(
+   final String key,
+   final Class containerClass,
+   final Function, C> extractor,
+   final BiConsumer assertion) {
+   this.key = key;
+   this.containerClass = containerClass;
+   this.extractor = extractor;
+   this.assertion = assertion;
+   }
+
+   String getKey() {
+   return key;
+   }
+
+   Class getContainerClass() {
+   return containerClass;
+   }
+
+   C getContainer(final MessageHeaders header) {
+   final C container = extractor.apply(header);
+   Assert.assertNotNull("Implementation error: Extractor returned 
null.", container);
+   return container;
+   }
+
+   CompatibilityCheckResult checkCompatibility(final Optional old, 
final Optional cur) {
+   if (!old.isPresent() && !cur.isPresent()) {
+   Assert.fail(String.format(
 
 Review comment:
   Isn't this more like a `checkState` or `checkArgument`?
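
   For reference, a minimal sketch of the `checkState` variant being suggested, assuming `org.apache.flink.util.Preconditions` is imported; the message text is illustrative only:

   ```
   Preconditions.checkState(
       old.isPresent() || cur.isPresent(),
       "Implementation error: at least one snapshot should contain an entry for routine %s.", key);
   ```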


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8148: [FLINK-12161] [table-planner-blink] Supports partial-final optimization for stream group aggregate

2019-04-12 Thread GitBox
godfreyhe commented on a change in pull request #8148: [FLINK-12161] 
[table-planner-blink] Supports partial-final optimization for stream group 
aggregate
URL: https://github.com/apache/flink/pull/8148#discussion_r274864535
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala
 ##
 @@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.flink.table.calcite.FlinkRelFactories.{ExpandFactory, 
RankFactory, SinkFactory}
+import org.apache.flink.table.plan.nodes.calcite.RankRange
+import org.apache.flink.table.plan.nodes.calcite.RankType.RankType
+import org.apache.flink.table.plan.nodes.logical._
+import org.apache.flink.table.plan.schema.FlinkRelOptTable
+import org.apache.flink.table.sinks.TableSink
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.{Contexts, RelOptCluster, RelOptTable, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.core.RelFactories._
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.logical._
+import org.apache.calcite.rel.{RelCollation, RelNode}
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind.{EXCEPT, INTERSECT, UNION}
+import org.apache.calcite.sql.{SemiJoinType, SqlKind}
+import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory}
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Contains factory interface and default implementation for creating various
+  * flink logical rel nodes.
+  */
+object FlinkLogicalRelFactories {
 
 Review comment:
   This class is used to create `FlinkLogicalRel` nodes in rules. Currently it is only 
used in `SplitAggregateRule`, which is applied on `FlinkLogicalRel`. 
If we do not provide this class, we would need to create each `FlinkLogicalRel` 
directly instead of using the methods in `RelBuilder`, such as `project()` and 
`aggregate()`.
   
   However, what I worry about is that `FlinkLogicalRelFactories` cannot create a 
`FlinkLogicalRel` with a new attribute field, e.g. `FlinkLogicalAggregate` adds 
`partialFinalType` to indicate whether to skip `SplitAggregateRule`. (How to 
remove `partialFinalType` from `FlinkLogicalAggregate` is another question.)
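
   For context, a rough sketch of the `RelBuilder`-based style the factories enable (Calcite API; the variable names are illustrative and this is not code from the PR):

   ```
   // With a RelBuilder configured with FlinkLogicalRelFactories, a rule can write:
   RelNode rewritten = relBuilder
       .push(input)
       .project(projectExprs)
       .aggregate(relBuilder.groupKey(groupSet), aggCalls)
       .build();
   // and receive FlinkLogicalCalc / FlinkLogicalAggregate nodes, instead of
   // instantiating each FlinkLogicalRel subclass directly.
   ```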


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-12 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274479508
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java
 ##
 @@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.compatibility;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.module.jsonSchema.JsonSchema;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+
+import org.junit.Assert;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Contains the compatibility checks that are applied by the {@link 
RestAPIStabilityTest}. New checks must be added to
+ * the {@link CompatibilityRoutines#ROUTINES} collection.
+ */
+enum CompatibilityRoutines {
+   ;
+
+   private static final CompatibilityRoutine URL_ROUTINE = new 
CompatibilityRoutine<>(
+   "url",
+   String.class,
+   RestHandlerSpecification::getTargetRestEndpointURL,
+   Assert::assertEquals);
+
+   private static final CompatibilityRoutine METHOD_ROUTINE = new 
CompatibilityRoutine<>(
+   "method",
+   String.class,
+   header -> header.getHttpMethod().getNettyHttpMethod().name(),
+   Assert::assertEquals);
+
+   private static final CompatibilityRoutine STATUS_CODE_ROUTINE = 
new CompatibilityRoutine<>(
+   "status-code",
+   String.class,
+   header -> header.getResponseStatusCode().toString(),
+   Assert::assertEquals);
+
+   private static final CompatibilityRoutine FILE_UPLOAD_ROUTINE 
= new CompatibilityRoutine<>(
+   "file-upload",
+   Boolean.class,
+   UntypedResponseMessageHeaders::acceptsFileUploads,
+   Assert::assertEquals);
+
+   private static final CompatibilityRoutine 
PATH_PARAMETER_ROUTINE = new CompatibilityRoutine<>(
+   "path-parameters",
+   PathParameterContainer.class,
+   header -> {
+   List 
pathParameters = 
header.getUnresolvedMessageParameters().getPathParameters().stream()
+   .map(param -> new 
PathParameterContainer.PathParameter(param.getKey()))
+   .collect(Collectors.toList());
+   return new PathParameterContainer(pathParameters);
+   },
+   CompatibilityRoutines::assertCompatible);
+
+   private static final CompatibilityRoutine 
QUERY_PARAMETER_ROUTINE = new CompatibilityRoutine<>(
+   "query-parameters",
+   QueryParameterContainer.class,
+   header -> {
+   List 
pathParameters = 
header.getUnresolvedMessageParameters().getQueryParameters().stream()
+   .map(param -> new 
QueryParameterContainer.QueryParameter(param.getKey(), param.isMandatory()))
+   .collect(Collectors.toList());
+   return new QueryParameterContainer(pathParameters);
+   },
+   CompatibilityRoutines::assertCompatible);
+
+   private static final CompatibilityRoutine REQUEST_ROUTINE = 
new CompatibilityRoutine<>(
+   "request",
+   JsonNode.class,
+   header -> 

[GitHub] [flink] GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add stability test

2019-04-12 Thread GitBox
GJL commented on a change in pull request #7986: [FLINK-10517][rest] Add 
stability test
URL: https://github.com/apache/flink/pull/7986#discussion_r274863253
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
 ##
 @@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.util;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * Utility class to extract the {@link MessageHeaders} that the {@link 
DispatcherRestEndpoint} supports.
+ */
+public class DocumentingDispatcherRestEndpoint extends DispatcherRestEndpoint 
implements DocumentingRestEndpoint {
+
+   private static final Configuration config;
+   private static final RestServerEndpointConfiguration restConfig;
+   private static final RestHandlerConfiguration handlerConfig;
+   private static final GatewayRetriever 
dispatcherGatewayRetriever;
+   private static final GatewayRetriever 
resourceManagerGatewayRetriever;
+
+   static {
+   config = new Configuration();
+   config.setString(RestOptions.ADDRESS, "localhost");
+   // necessary for loading the web-submission extension
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+   try {
+   restConfig = 
RestServerEndpointConfiguration.fromConfiguration(config);
+   } catch (ConfigurationException e) {
+   throw new RuntimeException("Implementation error. 
RestServerEndpointConfiguration#fromConfiguration failed for default 
configuration.");
 
 Review comment:
   Wrap the `ConfigurationException` inside the `RuntimeException`? That is,
   
   ```
   throw new RuntimeException("...", e)
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10984) Move flink-shaded-hadoop to flink-shaded

2019-04-12 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816167#comment-16816167
 ] 

Chesnay Schepler commented on FLINK-10984:
--

{{flink-shaded-hadoop2}} moved to flink-shaded in 
714c6fdde273579835c4c74389dc7cbbbd041e0c

> Move flink-shaded-hadoop to flink-shaded
> 
>
> Key: FLINK-10984
> URL: https://issues.apache.org/jira/browse/FLINK-10984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, BuildSystem / Shaded
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> To allow reasonable dependency management we should move flink-shaded-hadoop 
> to flink-shaded, with each supported version having its own module and 
> dependency management.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10984) Move flink-shaded-hadoop to flink-shaded

2019-04-12 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-10984:
-
Fix Version/s: shaded-7.0

> Move flink-shaded-hadoop to flink-shaded
> 
>
> Key: FLINK-10984
> URL: https://issues.apache.org/jira/browse/FLINK-10984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, BuildSystem / Shaded
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: shaded-7.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> To allow reasonable dependency management we should move flink-shaded-hadoop 
> to flink-shaded, with each supported version having its own module and 
> dependency management.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-12 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r274846133
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
 ##
 @@ -73,4 +94,91 @@ class StreamExecDeduplicate(
   .item("order", orderString)
   }
 
+  //~ ExecNode methods 
---
+
+  override protected def translateToPlanInternal(
+  tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
+
+val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(getInput)
+
+if (inputIsAccRetract) {
+  throw new TableException(
+"Deduplicate: Retraction on Deduplicate is not supported yet.\n" +
+  "please re-check sql grammar. \n" +
+  "Note: Deduplicate should not follow a non-windowed GroupBy 
aggregation.")
+}
+
+val inputTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+  .asInstanceOf[StreamTransformation[BaseRow]]
+
+val rowTypeInfo = 
inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+
+val generateRetraction = StreamExecRetractionRules.isAccRetract(this)
+
+val inputRowType = FlinkTypeFactory.toInternalRowType(getInput.getRowType)
+val rowTimeFieldIndex = inputRowType.getFieldTypes.zipWithIndex
+  .filter(e => isRowTime(e._1))
+  .map(_._2)
+if (rowTimeFieldIndex.size > 1) {
+  throw new RuntimeException("More than one row time field. Currently this 
is not supported!")
 
 Review comment:
   If we don't support order by rowtime now, we can just remove all rowtime-related 
stuff in this operator, e.g. `isRowtime` in the constructor, and make sure 
only order by proctime can be translated to this node in the rule.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-12 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r274851278
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionHelper.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.deduplicate;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.RecordEqualiser;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Utility for deduplicate function.
+ */
+class DeduplicateFunctionHelper {
+
+   static void processLastRow(BaseRow preRow, BaseRow currentRow, boolean 
generateRetraction,
+   boolean stateCleaningEnabled, ValueState 
pkRow, RecordEqualiser equaliser,
+   Collector out) throws Exception {
+   // should be accumulate msg.
+   
Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+   if (!stateCleaningEnabled && preRow != null &&
+   equaliser.equalsWithoutHeader(preRow, 
currentRow)) {
+   // If state cleaning is not enabled, don't emit 
retraction and acc message. But if state cleaning is
+   // enabled, we have to emit message to prevent too 
early state eviction of downstream operators.
+   return;
+   }
+   pkRow.update(currentRow);
+   if (preRow != null && generateRetraction) {
+   preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+   out.collect(preRow);
+   }
+   out.collect(currentRow);
+   }
+
+   static void processFirstRow(BaseRow preRow, BaseRow currentRow, 
ValueState pkRow,
 
 Review comment:
   add some comments to explain what these rows are


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-12 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r274849522
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunction.java
 ##
 @@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.deduplicate;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.generated.RecordEqualiser;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import static 
org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processFirstRow;
+import static 
org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processLastRow;
+
+/**
+ * This function is used to deduplicate on keys and keeps only first row or 
last row.
+ */
+public class DeduplicateFunction
+   extends KeyedProcessFunctionWithCleanupState {
+
+   private static final long serialVersionUID = 4950071982706870944L;
+
+   private final BaseRowTypeInfo rowTypeInfo;
+   private final boolean generateRetraction;
+   private final boolean keepLastRow;
+   private ValueState pkRow;
+   private GeneratedRecordEqualiser generatedEqualiser;
+   private transient RecordEqualiser equaliser;
+
+   public DeduplicateFunction(long minRetentionTime, long 
maxRetentionTime, BaseRowTypeInfo rowTypeInfo,
+   boolean generateRetraction, boolean keepLastRow, 
GeneratedRecordEqualiser generatedEqualiser) {
+   super(minRetentionTime, maxRetentionTime);
+   this.rowTypeInfo = rowTypeInfo;
+   this.generateRetraction = generateRetraction;
+   this.keepLastRow = keepLastRow;
+   this.generatedEqualiser = generatedEqualiser;
+   }
+
+   @Override
+   public void open(Configuration configure) throws Exception {
+   super.open(configure);
+   String stateName = keepLastRow ? 
"DeduplicateFunctionCleanupTime" : "DeduplicateFunctionCleanupTime";
+   initCleanupTimeState(stateName);
+   ValueStateDescriptor rowStateDesc = new 
ValueStateDescriptor("rowState", rowTypeInfo);
 
 Review comment:
   in case of `firstRow`, only pk is needed for state, we don't have to store 
the whole row.
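
   A minimal sketch of that suggestion, assuming the keep-first-row path only needs to remember whether a row was already emitted for the key; the state type and names here are illustrative, not taken from the actual fix:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.util.Collector;

/** Illustrative keep-first-row helper that stores only a "seen" flag per key. */
final class ProcessFirstRowSketch {

	static void processFirstRow(
			BaseRow currentRow,
			ValueState<Boolean> seen,
			Collector<BaseRow> out) throws Exception {
		if (seen.value() != null) {
			// A row was already emitted for this key; later rows are dropped.
			return;
		}
		seen.update(Boolean.TRUE);
		out.collect(currentRow);
	}
}
```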


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-12 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r274847069
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecExchange.scala
 ##
 @@ -32,7 +44,10 @@ class StreamExecExchange(
 relNode: RelNode,
 relDistribution: RelDistribution)
   extends CommonPhysicalExchange(cluster, traitSet, relNode, relDistribution)
-  with StreamPhysicalRel {
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  private val DEFAULT_MAX_PARALLELISM = 1 << 7
 
 Review comment:
   too small?
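
   For reference, the arithmetic behind the question; the upper-bound constant is assumed from Flink's `KeyGroupRangeAssignment` and should be checked against the actual class:

```java
public class MaxParallelismArithmetic {
	public static void main(String[] args) {
		System.out.println(1 << 7);   // 128, the value hard-coded above
		System.out.println(1 << 15);  // 32768, assumed hard upper bound for max parallelism
	}
}
```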


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-12 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r274844341
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
 ##
 @@ -38,7 +58,8 @@ class StreamExecDeduplicate(
 isRowtime: Boolean,
 keepLastRow: Boolean)
   extends SingleRel(cluster, traitSet, inputRel)
-  with StreamPhysicalRel {
+with StreamPhysicalRel
 
 Review comment:
   align with `extends`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-12 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r274848674
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunction.java
 ##
 @@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.deduplicate;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.generated.RecordEqualiser;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import static 
org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processFirstRow;
+import static 
org.apache.flink.table.runtime.deduplicate.DeduplicateFunctionHelper.processLastRow;
+
+/**
+ * This function is used to deduplicate on keys and keeps only first row or 
last row.
+ */
+public class DeduplicateFunction
+   extends KeyedProcessFunctionWithCleanupState {
+
+   private static final long serialVersionUID = 4950071982706870944L;
+
+   private final BaseRowTypeInfo rowTypeInfo;
+   private final boolean generateRetraction;
+   private final boolean keepLastRow;
+   private ValueState pkRow;
+   private GeneratedRecordEqualiser generatedEqualiser;
+   private transient RecordEqualiser equaliser;
+
+   public DeduplicateFunction(long minRetentionTime, long 
maxRetentionTime, BaseRowTypeInfo rowTypeInfo,
+   boolean generateRetraction, boolean keepLastRow, 
GeneratedRecordEqualiser generatedEqualiser) {
+   super(minRetentionTime, maxRetentionTime);
+   this.rowTypeInfo = rowTypeInfo;
+   this.generateRetraction = generateRetraction;
+   this.keepLastRow = keepLastRow;
+   this.generatedEqualiser = generatedEqualiser;
+   }
+
+   @Override
+   public void open(Configuration configure) throws Exception {
+   super.open(configure);
+   String stateName = keepLastRow ? 
"DeduplicateFunctionCleanupTime" : "DeduplicateFunctionCleanupTime";
 
 Review comment:
   the two strings are the same
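
   A small sketch of what the ternary presumably intended, using distinct state names per mode; the concrete names are placeholders, not the actual fix:

```java
/** Hypothetical helper; the state names are illustrative only. */
final class CleanupStateNames {

	static String cleanupStateName(boolean keepLastRow) {
		return keepLastRow
				? "DeduplicateFunctionKeepLastRowCleanupTime"
				: "DeduplicateFunctionKeepFirstRowCleanupTime";
	}
}
```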


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8158: [FLINK-12131][runtime] Adjust IntermediateResult/IntermediateResultPartition status properly…

2019-04-12 Thread GitBox
flinkbot commented on issue #8158: [FLINK-12131][runtime] Adjust 
IntermediateResult/IntermediateResultPartition status properly…
URL: https://github.com/apache/flink/pull/8158#issuecomment-482516088
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12131) Resetting ExecutionVertex in region failover may cause inconsistency of IntermediateResult status

2019-04-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12131:
---
Labels: pull-request-available  (was: )

> Resetting ExecutionVertex in region failover may cause inconsistency of 
> IntermediateResult status
> -
>
> Key: FLINK-12131
> URL: https://issues.apache.org/jira/browse/FLINK-12131
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Two statuses may not be correct with region failover and the current reset logic.
>  # *numberOfRunningProducers* in *IntermediateResult*.
>  # *hasDataProduced* in *IntermediateResultPartition*.
> This is because currently only when the *ExecutionJobVertex* is reset will 
> the related *IntermediateResult*(and the inner *IntermediateResultPartition*) 
> get reset. But region failover only resets the affected 
> *ExecutionVertex*(es),  rather than the entire *ExecutionJobVertex*, leaving 
> the status listed above in an inconsistent state.
> Problems below may occur as a result:
>  # when a FINISHED vertex is restarted and finishes again, the 
> *IntermediateResult.numberOfRunningProducers* may drop below 0 and throw an 
> exception that triggers a global failover
>  # the *IntermediateResult.numberOfRunningProducers* can be smaller than the 
> actual value, letting the downstream vertices be scheduled earlier than expected
>  # the *IntermediateResultPartition* is reset and not started yet but the 
> *hasDataProduced* remains true
> That's why I'd propose we add IntermediateResult status adjustment logic to 
> *ExecutionVertex.resetForNewExecution()*.
> Detailed design: 
> [https://docs.google.com/document/d/1YA3k8rwDEv1UdaV9NwoDmwc-XorG__JUXlpyJtDs4Ss/edit?usp=sharing]
>  
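
A rough sketch of the proposed adjustment, assuming a partition gives its "running producer" back to the result when it is reset after having finished. Only the field and method names mirror the classes mentioned above; the class bodies are illustrative, not the actual Flink code:

```java
/** Illustrative counterpart of IntermediateResultPartition. */
class IntermediateResultPartitionSketch {
	private final IntermediateResultSketch totalResult;
	private boolean hasDataProduced = false;

	IntermediateResultPartitionSketch(IntermediateResultSketch totalResult) {
		this.totalResult = totalResult;
	}

	void markFinished() {
		hasDataProduced = true;
		totalResult.decrementNumberOfRunningProducers();
	}

	void resetForNewExecution() {
		if (hasDataProduced) {
			// The producer will run (and finish) again, so count it as running once more.
			totalResult.incrementNumberOfRunningProducers();
		}
		hasDataProduced = false;
	}
}

/** Illustrative counterpart of IntermediateResult. */
class IntermediateResultSketch {
	private int numberOfRunningProducers;

	IntermediateResultSketch(int numberOfProducers) {
		this.numberOfRunningProducers = numberOfProducers;
	}

	void decrementNumberOfRunningProducers() {
		numberOfRunningProducers--;
	}

	void incrementNumberOfRunningProducers() {
		numberOfRunningProducers++;
	}
}
```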



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhuzhurk opened a new pull request #8158: [FLINK-12131][runtime] Adjust IntermediateResult/IntermediateResultPartition status properly…

2019-04-12 Thread GitBox
zhuzhurk opened a new pull request #8158: [FLINK-12131][runtime] Adjust 
IntermediateResult/IntermediateResultPartition status properly…
URL: https://github.com/apache/flink/pull/8158
 
 
   … on ExecutionVertex reset
   
   ## What is the purpose of the change
   
   *This pull request fixes the issue that 
IntermediateResult/IntermediateResultPartition status is not properly set during 
region failover, which may lead to unexpected scheduling behaviors and global 
failovers. More details can be found in the JIRA.*
   
   
   ## Brief change log
   
 - *The ExecutionJobVertex does not reset IntermediateResult directly, but 
asks its ExecutionVertex(es) to reset their IntermediateResultPartitions*
 - *Resetting a BLOCKING IntermediateResultPartition increases the 
numberOfRunningProducers of the IntermediateResult if the partition is finished*
   
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
 - *Refined existing cases in IntermediateResultPartitionTest to verify the 
resetting is working as expected*
 - *Added one test case in FailoverRegionTest to verify that the statuses are 
correct after region failovers*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8157: [FLINK-12170] [table-planner-blink] Add support for generating optimized logical plan for Over aggregate

2019-04-12 Thread GitBox
flinkbot commented on issue #8157: [FLINK-12170] [table-planner-blink] Add 
support for generating optimized logical plan for Over aggregate
URL: https://github.com/apache/flink/pull/8157#issuecomment-482512574
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

