[jira] [Assigned] (FLINK-9416) Make job submission a retriable operation in case of an ongoing leader election

2018-05-24 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-9416:


Assignee: Till Rohrmann

> Make job submission a retriable operation in case of an ongoing leader election
> 
>
> Key: FLINK-9416
> URL: https://issues.apache.org/jira/browse/FLINK-9416
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> When starting a session cluster, it can happen that the job submission fails
> if the REST server endpoint has already gained leadership but the leadership
> election for the {{Dispatcher}} is still ongoing. In such a case, we receive
> an error response saying that the leader election is still ongoing and fail
> the job submission. I think it would be nicer to also make the submission
> step a retriable operation in order to avoid this race condition.
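
A minimal, self-contained sketch of such a retry loop (all names hypothetical;
this models the idea rather than the actual client API):

{code}
import java.util.concurrent.Callable;

// Sketch: retry an operation while the cluster reports that the Dispatcher
// leader election is still in progress. "LeaderElectionOngoingException" is a
// stand-in type, not Flink's actual error representation.
public class SubmitRetrySketch {

    static class LeaderElectionOngoingException extends Exception {}

    static <T> T retryWhileLeaderElectionOngoing(Callable<T> submission, int maxAttempts)
            throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return submission.call();
            } catch (LeaderElectionOngoingException e) {
                if (attempt == maxAttempts) {
                    throw e; // give up after the final attempt
                }
                Thread.sleep(500L * attempt); // simple linear back-off
            }
        }
    }
}
{code}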



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9421) RunningJobsRegistry entries are not cleaned up after job termination

2018-05-24 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-9421:


Assignee: Till Rohrmann

> RunningJobsRegistry entries are not cleaned up after job termination
> 
>
> Key: FLINK-9421
> URL: https://issues.apache.org/jira/browse/FLINK-9421
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>
> Currently, the {{Dispatcher}} does not clean up the {{RunningJobsRegistry}}
> after a job has finished. The consequence is that a ZNode with the JobID and
> a state number remains in ZooKeeper for each job.
> We should clean up these ZNodes to avoid a resource leak.
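
Paraphrased, the fix would hook into the Dispatcher's job-removal path, e.g.
(a sketch under the assumption that the registry exposes a {{clearJob}} method;
the surrounding bookkeeping is simplified):

{code}
// Sketch only: when a job reaches a terminal state and is removed from the
// Dispatcher, also drop its RunningJobsRegistry entry so that no per-job
// ZNode remains in ZooKeeper.
private void removeJob(JobID jobId) throws IOException {
    jobManagerRunners.remove(jobId);     // simplified Dispatcher bookkeeping
    runningJobsRegistry.clearJob(jobId); // assumed cleanup method on the registry
}
{code}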



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9429) Quickstart E2E not working locally

2018-05-24 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488586#comment-16488586
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-9429:


The test also works for me locally.
Could it be that some bash command used in the test script isn't portable
across different environments?

Though, we should still fix the fact that the test does not terminate properly
with control-C.
I think the problem is the {{while : ;}} loop in {{verify_result}}: control-C
only interrupts the command currently running inside the loop but does not
break out of the loop itself.
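
If that is the cause, an explicit trap would make the loop interruptible, e.g.
(a sketch only; {{check_result}} stands in for the script's actual
verification command):

{code}
# Sketch: exit the whole script on Ctrl-C instead of only killing the command
# that happens to be running inside the loop at that moment.
trap 'echo "interrupted"; exit 1' INT

while : ; do
  check_result && break  # placeholder for the real verification step
  sleep 1
done
{code}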

> Quickstart E2E not working locally
> --
>
> Key: FLINK-9429
> URL: https://issues.apache.org/jira/browse/FLINK-9429
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Critical
>  Labels: test-stability
>
> The quickstart e2e test is not working locally. It seems as if the job does 
> not produce anything into Elasticsearch. Furthermore, the test does not 
> terminate with control-C.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9234) Commons Logging is missing from shaded Flink Table library

2018-05-24 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488617#comment-16488617
 ] 

Timo Walther commented on FLINK-9234:
-

It seems that this change has an impact on other modules. The
{{HBaseConnectorITCase}} fails on my machine. The question is why Travis did
not catch that, [~Zentol]?

{code}
Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 1.209 sec <<< 
FAILURE! - in org.apache.flink.addons.hbase.HBaseConnectorITCase
org.apache.flink.addons.hbase.HBaseConnectorITCase  Time elapsed: 1.208 sec  
<<< ERROR!
java.lang.ClassCastException: 
org.apache.commons.logging.impl.SLF4JLocationAwareLog cannot be cast to 
org.apache.commons.logging.impl.Log4JLogger

org.apache.flink.addons.hbase.HBaseConnectorITCase  Time elapsed: 1.209 sec  
<<< ERROR!
java.lang.NullPointerException
{code}

> Commons Logging is missing from shaded Flink Table library
> --
>
> Key: FLINK-9234
> URL: https://issues.apache.org/jira/browse/FLINK-9234
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.2
> Environment: jdk1.8.0_172
> flink 1.4.2
> Mac High Sierra
>Reporter: Eron Wright 
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
> Attachments: repro.scala
>
>
> The flink-table shaded library seems to be missing some classes from 
> {{org.apache.commons.logging}} that are required by 
> {{org.apache.commons.configuration}}.  Ran into the problem while using the 
> external catalog support, on Flink 1.4.2.
> See attached a repro, which produces:
> {code}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/table/shaded/org/apache/commons/logging/Log
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.parseScanPackagesFromConfigFile(ExternalTableSourceUtil.scala:153)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala:55)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala)
>   at 
> org.apache.flink.table.catalog.ExternalCatalogSchema.getTable(ExternalCatalogSchema.scala:78)
>   at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:82)
>   at 
> org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:256)
>   at 
> org.apache.calcite.jdbc.CalciteSchema$SchemaPlusImpl.getTable(CalciteSchema.java:561)
>   at 
> org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:497)
>   at 
> org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485)
>   at Repro$.main(repro.scala:17)
>   at Repro.main(repro.scala)
> {code}
> Dependencies:
> {code}
> compile 'org.slf4j:slf4j-api:1.7.25'
> compile 'org.slf4j:slf4j-log4j12:1.7.25'
> runtime 'log4j:log4j:1.2.17'
> compile 'org.apache.flink:flink-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-streaming-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-clients_2.11:1.4.2'
> compile 'org.apache.flink:flink-table_2.11:1.4.2'
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6067: [FLINK-9427] Fix registration and request slot rac...

2018-05-24 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6067

[FLINK-9427] Fix registration and request slot race condition in 
TaskExecutor

## What is the purpose of the change

This commit fixes a race condition between the TaskExecutor and the
ResourceManager. Before, it could happen that the ResourceManager sent a
requestSlots message before the TaskExecutor registration was completed. Due
to this, the TaskExecutor did not have all the information it needed to accept
task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration
time. Due to this, the SlotManager could already assign these slots to pending
slot requests. With this commit, the registration protocol changes such that
the TaskExecutor first registers at the ResourceManager and only after
completing this step announces the available slots to the SlotManager.
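
As a self-contained model of the new hand-shake (plain `CompletableFuture`s,
not Flink's actual classes):

```java
import java.util.concurrent.CompletableFuture;

// Models the fixed protocol: the slot report is sent strictly after the
// registration future completes, so slots of a TaskExecutor whose
// registration is still in flight can never be handed out.
public class RegistrationProtocolSketch {

    static CompletableFuture<String> registerTaskExecutor() {
        return CompletableFuture.supplyAsync(() -> "registrationId-42");
    }

    static CompletableFuture<Void> sendSlotReport(String registrationId) {
        return CompletableFuture.runAsync(
            () -> System.out.println("announcing slots for " + registrationId));
    }

    public static void main(String[] args) {
        registerTaskExecutor()
            .thenCompose(RegistrationProtocolSketch::sendSlotReport) // step 2 strictly after step 1
            .join();
    }
}
```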

cc @GJL 

## Brief change log

- Changed the `TaskExecutor` `ResourceManager` registration protocol to 
announce the available slots after the completion of the registration
- Hardened the `TaskExecutor#requestSlot` to only accept the call if there 
is an established connection to a `ResourceManager`

## Verifying this change

- Added `SlotManagerTest#testSlotRequestFailure`
- Added `TaskExecutorTest#testIgnoringSlotRequestsIfNotRegistered`, 
`testReconnectionAttemptIfExplicitlyDisconnected`, `testInitialSlotReport` and 
`testInitialSlotReportFailure`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixTaskExecutorRegistration

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6067.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6067


commit 4b4d82cc5a3bb9694fd19a37c21345cbe7928962
Author: Till Rohrmann 
Date:   2018-05-23T16:50:27Z

[FLINK-9427] Fix registration and request slot race condition in 
TaskExecutor

This commit fixes a race condition between the TaskExecutor and the
ResourceManager. Before, it could happen that the ResourceManager sent a
requestSlots message before the TaskExecutor registration was completed. Due
to this, the TaskExecutor did not have all the information it needed to accept
task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration
time. Due to this, the SlotManager could already assign these slots to pending
slot requests. With this commit, the registration protocol changes such that
the TaskExecutor first registers at the ResourceManager and only after
completing this step announces the available slots to the SlotManager.




---


[jira] [Commented] (FLINK-9427) Cannot download from BlobServer, because the server address is unknown.

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488622#comment-16488622
 ] 

ASF GitHub Bot commented on FLINK-9427:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6067

[FLINK-9427] Fix registration and request slot race condition in 
TaskExecutor

## What is the purpose of the change

This commit fixes a race condition between the TaskExecutor and the
ResourceManager. Before, it could happen that the ResourceManager sent a
requestSlots message before the TaskExecutor registration was completed. Due
to this, the TaskExecutor did not have all the information it needed to accept
task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration
time. Due to this, the SlotManager could already assign these slots to pending
slot requests. With this commit, the registration protocol changes such that
the TaskExecutor first registers at the ResourceManager and only after
completing this step announces the available slots to the SlotManager.

cc @GJL 

## Brief change log

- Changed the `TaskExecutor` `ResourceManager` registration protocol to 
announce the available slots after the completion of the registration
- Hardened the `TaskExecutor#requestSlot` to only accept the call if there 
is an established connection to a `ResourceManager`

## Verifying this change

- Added `SlotManagerTest#testSlotRequestFailure`
- Added `TaskExecutorTest#testIgnoringSlotRequestsIfNotRegistered`, 
`testReconnectionAttemptIfExplicitlyDisconnected`, `testInitialSlotReport` and 
`testInitialSlotReportFailure`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixTaskExecutorRegistration

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6067.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6067


commit 4b4d82cc5a3bb9694fd19a37c21345cbe7928962
Author: Till Rohrmann 
Date:   2018-05-23T16:50:27Z

[FLINK-9427] Fix registration and request slot race condition in 
TaskExecutor

This commit fixes a race condition between the TaskExecutor and the
ResourceManager. Before, it could happen that the ResourceManager sent a
requestSlots message before the TaskExecutor registration was completed. Due
to this, the TaskExecutor did not have all the information it needed to accept
task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration
time. Due to this, the SlotManager could already assign these slots to pending
slot requests. With this commit, the registration protocol changes such that
the TaskExecutor first registers at the ResourceManager and only after
completing this step announces the available slots to the SlotManager.




> Cannot download from BlobServer, because the server address is unknown.
> ---
>
> Key: FLINK-9427
> URL: https://issues.apache.org/jira/browse/FLINK-9427
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Piotr Nowojski
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: failure
>
>
> Setup: 6 + 1 nodes EMR cluster with m4.4xlarge instances
> Job submission fails in most cases (but not all of them):
> {noformat}
> [hadoop@ip-172-31-28-17 flink-1.5.0]$ HADOOP_CONF_DIR=/etc/hadoop/conf 
> ./bin/flink run -m yarn-cluster -p 80 -yn 80 examples/batch/WordCount.jar 
> --input hdfs:///user/hadoop/enwiki-latest-abstract.xml --output 
> hdfs:///user/hadoop/output
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/hadoop/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Foun

[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

2018-05-24 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6068

 [FLINK-9421] Remove job from RunningJobsRegistry when it reaches a 
terminal state

## What is the purpose of the change

This commit lets the Dispatcher remove the RunningJobsRegistry entry for a 
completed job
when it is removed from the Dispatcher.

This PR is based on #6067 

cc @GJL

## Verifying this change

- Added `DispatcherResourceCleanupTest#testRunningJobsRegistryCleanup`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink runningJobsRegistryCleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6068.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6068


commit 4b4d82cc5a3bb9694fd19a37c21345cbe7928962
Author: Till Rohrmann 
Date:   2018-05-23T16:50:27Z

[FLINK-9427] Fix registration and request slot race condition in 
TaskExecutor

This commit fixes a race condition between the TaskExecutor and the
ResourceManager. Before, it could happen that the ResourceManager sent a
requestSlots message before the TaskExecutor registration was completed. Due
to this, the TaskExecutor did not have all the information it needed to accept
task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration
time. Due to this, the SlotManager could already assign these slots to pending
slot requests. With this commit, the registration protocol changes such that
the TaskExecutor first registers at the ResourceManager and only after
completing this step announces the available slots to the SlotManager.

commit 4d034edca41294e250c49807a3beecb2b419824d
Author: Till Rohrmann 
Date:   2018-05-23T21:48:38Z

[FLINK-9421] Remove job from RunningJobsRegistry when it reaches a terminal 
state

This commit lets the Dispatcher remove the RunningJobsRegistry entry for a 
completed job
when it is removed from the Dispatcher.




---


[jira] [Commented] (FLINK-9421) RunningJobsRegistry entries are not cleaned up after job termination

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488625#comment-16488625
 ] 

ASF GitHub Bot commented on FLINK-9421:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6068

 [FLINK-9421] Remove job from RunningJobsRegistry when it reaches a 
terminal state

## What is the purpose of the change

This commit lets the Dispatcher remove the RunningJobsRegistry entry for a 
completed job
when it is removed from the Dispatcher.

This PR is based on #6067 

cc @GJL

## Verifying this change

- Added `DispatcherResourceCleanupTest#testRunningJobsRegistryCleanup`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink runningJobsRegistryCleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6068.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6068


commit 4b4d82cc5a3bb9694fd19a37c21345cbe7928962
Author: Till Rohrmann 
Date:   2018-05-23T16:50:27Z

[FLINK-9427] Fix registration and request slot race condition in 
TaskExecutor

This commit fixes a race condition between the TaskExecutor and the
ResourceManager. Before, it could happen that the ResourceManager sent a
requestSlots message before the TaskExecutor registration was completed. Due
to this, the TaskExecutor did not have all the information it needed to accept
task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration
time. Due to this, the SlotManager could already assign these slots to pending
slot requests. With this commit, the registration protocol changes such that
the TaskExecutor first registers at the ResourceManager and only after
completing this step announces the available slots to the SlotManager.

commit 4d034edca41294e250c49807a3beecb2b419824d
Author: Till Rohrmann 
Date:   2018-05-23T21:48:38Z

[FLINK-9421] Remove job from RunningJobsRegistry when it reaches a terminal 
state

This commit lets the Dispatcher remove the RunningJobsRegistry entry for a 
completed job
when it is removed from the Dispatcher.




> RunningJobsRegistry entries are not cleaned up after job termination
> 
>
> Key: FLINK-9421
> URL: https://issues.apache.org/jira/browse/FLINK-9421
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>
> Currently, the {{Dispatcher}} does not clean up the {{RunningJobsRegistry}}
> after a job has finished. The consequence is that a ZNode with the JobID and
> a state number remains in ZooKeeper for each job.
> We should clean up these ZNodes to avoid a resource leak.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6069: [FLINK-9416] Make all RestClusterClient calls retr...

2018-05-24 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6069

[FLINK-9416] Make all RestClusterClient calls retriable

## What is the purpose of the change

This commit changes the RestClusterClient calls such that they are all
retriable with respect to connection errors and to the service being
currently unavailable (return code 503).

Moreover, it changes the retry behaviour for polling the JobResult such that
it now fails if the cluster returns a NOT_FOUND code.
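
A self-contained sketch of the described classification (illustrative names
only, not the actual RestClusterClient internals):

```java
import java.io.IOException;
import java.net.ConnectException;
import java.util.function.Predicate;

// Sketch: retry on connection problems and HTTP 503 (service unavailable),
// but let HTTP 404 (NOT_FOUND) fail the operation immediately.
public class RetryPredicateSketch {

    // hypothetical exception type carrying the HTTP response code
    static class HttpResponseException extends IOException {
        final int statusCode;
        HttpResponseException(int statusCode) { this.statusCode = statusCode; }
    }

    static final Predicate<Throwable> IS_RETRIABLE = t ->
        t instanceof ConnectException
            || (t instanceof HttpResponseException
                && ((HttpResponseException) t).statusCode == 503);

    public static void main(String[] args) {
        System.out.println(IS_RETRIABLE.test(new ConnectException()));          // true
        System.out.println(IS_RETRIABLE.test(new HttpResponseException(503))); // true
        System.out.println(IS_RETRIABLE.test(new HttpResponseException(404))); // false
    }
}
```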

This PR is based on #6068.

cc @GJL 

## Verifying this change

- Added 
`RestClusterClientTest#testRetriableSendOperationIfConnectionErrorOrServiceUnavailable`
 and `testSendIsNotRetriableIfHttpNotFound`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink hardenRestClient

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6069.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6069


commit 4b4d82cc5a3bb9694fd19a37c21345cbe7928962
Author: Till Rohrmann 
Date:   2018-05-23T16:50:27Z

[FLINK-9427] Fix registration and request slot race condition in 
TaskExecutor

This commit fixes a race condition between the TaskExecutor and the
ResourceManager. Before, it could happen that the ResourceManager sent a
requestSlots message before the TaskExecutor registration was completed. Due
to this, the TaskExecutor did not have all the information it needed to accept
task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration
time. Due to this, the SlotManager could already assign these slots to pending
slot requests. With this commit, the registration protocol changes such that
the TaskExecutor first registers at the ResourceManager and only after
completing this step announces the available slots to the SlotManager.

commit 4d034edca41294e250c49807a3beecb2b419824d
Author: Till Rohrmann 
Date:   2018-05-23T21:48:38Z

[FLINK-9421] Remove job from RunningJobsRegistry when it reaches a terminal 
state

This commit lets the Dispatcher remove the RunningJobsRegistry entry for a 
completed job
when it is removed from the Dispatcher.

commit 61817beea5dfa1fdf20dcd6266b5899769307f6b
Author: Till Rohrmann 
Date:   2018-05-24T08:01:23Z

[FLINK-9416] Make all RestClusterClient calls retriable

This commit changes the RestClusterClient calls such that they are all
retriable with respect to connection errors and to the service being
currently unavailable (return code 503).

Moreover, it changes the retry behaviour for polling the JobResult such that
it now fails if the cluster returns a NOT_FOUND code.




---


[jira] [Commented] (FLINK-9416) Make job submission a retriable operation in case of an ongoing leader election

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488629#comment-16488629
 ] 

ASF GitHub Bot commented on FLINK-9416:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6069

[FLINK-9416] Make all RestClusterClient calls retriable

## What is the purpose of the change

This commit changes the RestClusterClient calls such that they are all
retriable with respect to connection errors and to the service being
currently unavailable (return code 503).

Moreover, it changes the retry behaviour for polling the JobResult such that
it now fails if the cluster returns a NOT_FOUND code.

This PR is based on #6068.

cc @GJL 

## Verifying this change

- Added 
`RestClusterClientTest#testRetriableSendOperationIfConnectionErrorOrServiceUnavailable`
 and `testSendIsNotRetriableIfHttpNotFound`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink hardenRestClient

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6069.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6069


commit 4b4d82cc5a3bb9694fd19a37c21345cbe7928962
Author: Till Rohrmann 
Date:   2018-05-23T16:50:27Z

[FLINK-9427] Fix registration and request slot race condition in 
TaskExecutor

This commit fixes a race condition between the TaskExecutor and the
ResourceManager. Before, it could happen that the ResourceManager sent a
requestSlots message before the TaskExecutor registration was completed. Due
to this, the TaskExecutor did not have all the information it needed to accept
task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration
time. Due to this, the SlotManager could already assign these slots to pending
slot requests. With this commit, the registration protocol changes such that
the TaskExecutor first registers at the ResourceManager and only after
completing this step announces the available slots to the SlotManager.

commit 4d034edca41294e250c49807a3beecb2b419824d
Author: Till Rohrmann 
Date:   2018-05-23T21:48:38Z

[FLINK-9421] Remove job from RunningJobsRegistry when it reaches a terminal 
state

This commit lets the Dispatcher remove the RunningJobsRegistry entry for a 
completed job
when it is removed from the Dispatcher.

commit 61817beea5dfa1fdf20dcd6266b5899769307f6b
Author: Till Rohrmann 
Date:   2018-05-24T08:01:23Z

[FLINK-9416] Make all RestClusterClient calls retriable

This commit changes the RestClusterClient calls such that they are all
retriable with respect to connection errors and to the service being
currently unavailable (return code 503).

Moreover, it changes the retry behaviour for polling the JobResult such that
it now fails if the cluster returns a NOT_FOUND code.




> Make job submission a retriable operation in case of an ongoing leader election
> 
>
> Key: FLINK-9416
> URL: https://issues.apache.org/jira/browse/FLINK-9416
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> When starting a session cluster, it can happen that the job submission fails
> if the REST server endpoint has already gained leadership but the leadership
> election for the {{Dispatcher}} is still ongoing. In such a case, we receive
> an error response saying that the leader election is still ongoing and fail
> the job submission. I think it would be nicer to also make the submission
> step a retriable operation in order to avoid this race condition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9234) Commons Logging is missing from shaded Flink Table library

2018-05-24 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488654#comment-16488654
 ] 

Timo Walther commented on FLINK-9234:
-

I'm thinking about reverting this change. It pulls in different logging classes
for all modules that depend on {{flink-table}}. This problem will be fixed in
1.6 anyway by FLINK-8511, which I might merge today. Users of the external
catalogs can still add the dependency themselves. What do you think [~fhueske]?

> Commons Logging is missing from shaded Flink Table library
> --
>
> Key: FLINK-9234
> URL: https://issues.apache.org/jira/browse/FLINK-9234
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.2
> Environment: jdk1.8.0_172
> flink 1.4.2
> Mac High Sierra
>Reporter: Eron Wright 
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
> Attachments: repro.scala
>
>
> The flink-table shaded library seems to be missing some classes from 
> {{org.apache.commons.logging}} that are required by 
> {{org.apache.commons.configuration}}.  Ran into the problem while using the 
> external catalog support, on Flink 1.4.2.
> See attached a repro, which produces:
> {code}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/table/shaded/org/apache/commons/logging/Log
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.parseScanPackagesFromConfigFile(ExternalTableSourceUtil.scala:153)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala:55)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala)
>   at 
> org.apache.flink.table.catalog.ExternalCatalogSchema.getTable(ExternalCatalogSchema.scala:78)
>   at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:82)
>   at 
> org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:256)
>   at 
> org.apache.calcite.jdbc.CalciteSchema$SchemaPlusImpl.getTable(CalciteSchema.java:561)
>   at 
> org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:497)
>   at 
> org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485)
>   at Repro$.main(repro.scala:17)
>   at Repro.main(repro.scala)
> {code}
> Dependencies:
> {code}
> compile 'org.slf4j:slf4j-api:1.7.25'
> compile 'org.slf4j:slf4j-log4j12:1.7.25'
> runtime 'log4j:log4j:1.2.17'
> compile 'org.apache.flink:flink-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-streaming-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-clients_2.11:1.4.2'
> compile 'org.apache.flink:flink-table_2.11:1.4.2'
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5869: [FLINK-8946] TaskManager stop sending metrics afte...

2018-05-24 Thread yanghua
Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/5869


---


[GitHub] flink issue #5869: [FLINK-8946] TaskManager stop sending metrics after JobMa...

2018-05-24 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5869
  
@zentol please see new PR #6060, closing this PR...


---


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488657#comment-16488657
 ] 

ASF GitHub Bot commented on FLINK-8946:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5869
  
@zentol please see new PR #6060, closing this PR...


> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.0
>
>
> Running in Yarn-standalone mode, when the JobManager performs a failover,
> all TaskManagers that are inherited from the previous JobManager will not
> send metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveals that these lines of code might be the cause:
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the
> JobManager, but does not create a new one on fail-over association?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488658#comment-16488658
 ] 

ASF GitHub Bot commented on FLINK-8946:
---

Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/5869


> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.0
>
>
> Running in Yarn-standalone mode, when the JobManager performs a failover,
> all TaskManagers that are inherited from the previous JobManager will not
> send metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveals that these lines of code might be the cause:
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the
> JobManager, but does not create a new one on fail-over association?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9418) Migrate SharedBuffer to use MapState

2018-05-24 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488660#comment-16488660
 ] 

aitozi commented on FLINK-9418:
---

Hi, can you share some details about how you plan to achieve this "so that
only the necessary parts (e.g. tail entries) are deserialized"?

> Migrate SharedBuffer to use MapState
> 
>
> Key: FLINK-9418
> URL: https://issues.apache.org/jira/browse/FLINK-9418
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.6.0
>
>
> Right now {{SharedBuffer}} is implemented with Java collections and the whole
> buffer is deserialized on each access. We should migrate it to MapState so
> that only the necessary parts (e.g. tail entries) are deserialized.
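
For illustration, the access pattern that MapState enables looks roughly like
this (placeholder types; the real SharedBuffer entries and key scheme differ):

{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch only: entry ids and the String "entry" payload are stand-ins for the
// real SharedBuffer types. The point is that MapState offers per-key access,
// so touching the tail entry (de)serializes one value, not the whole buffer.
public class SharedBufferSketch extends RichFlatMapFunction<String, String> {

    private transient MapState<Long, String> entries;

    @Override
    public void open(Configuration parameters) throws Exception {
        entries = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("sharedBuffer", Long.class, String.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String tail = entries.get(0L); // deserializes only this one entry
        entries.put(1L, value);        // serializes only the new entry
        out.collect(tail == null ? value : tail);
    }
}
{code}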



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6062: [FLINK-9423][state] Implement efficient deletes for heap-...

2018-05-24 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6062
  
@sihuazhou thanks for the review. Addressed your comments, please take a 
look if you want.


---


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488670#comment-16488670
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6062
  
@sihuazhou thanks for the review. Addressed your comments, please take a 
look if you want.


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> The current data structures in the `HeapInternalTimerService` are not able to
> support efficient timer deletes; the complexity is currently O\(n\), where n
> is the number of registered timers.
>  
> We can keep track of each timer's position in the priority queue and (in
> combination with the already existing set/map) obtain a more efficient
> algorithm for deletes.
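
As an illustration of the idea (not the actual implementation): an
array-backed binary heap that additionally tracks each element's position in a
map supports O(log n) deletes. Distinct timer values are assumed for
simplicity:

{code}
import java.util.ArrayList;
import java.util.HashMap;

// Sketch: a min-heap of timers (modelled as long timestamps) plus a position
// map. A delete jumps straight to the element's slot, swaps in the last
// element and sifts, instead of scanning the whole queue.
public class IndexedTimerHeap {
    private final ArrayList<Long> heap = new ArrayList<>();
    private final HashMap<Long, Integer> positions = new HashMap<>();

    public void add(long timer) {
        heap.add(timer);
        positions.put(timer, heap.size() - 1);
        siftUp(heap.size() - 1);
    }

    public boolean remove(long timer) {
        Integer pos = positions.remove(timer);
        if (pos == null) {
            return false;                  // timer was not registered
        }
        int last = heap.size() - 1;
        long moved = heap.remove(last);    // pop the last element
        if (pos != last) {
            heap.set(pos, moved);          // fill the gap and restore heap order
            positions.put(moved, pos);
            siftDown(pos);
            siftUp(pos);
        }
        return true;
    }

    private void siftUp(int i) {
        while (i > 0 && heap.get(i) < heap.get((i - 1) / 2)) {
            swap(i, (i - 1) / 2);
            i = (i - 1) / 2;
        }
    }

    private void siftDown(int i) {
        for (;;) {
            int smallest = i, l = 2 * i + 1, r = 2 * i + 2;
            if (l < heap.size() && heap.get(l) < heap.get(smallest)) smallest = l;
            if (r < heap.size() && heap.get(r) < heap.get(smallest)) smallest = r;
            if (smallest == i) return;
            swap(i, smallest);
            i = smallest;
        }
    }

    private void swap(int a, int b) {
        long tmp = heap.get(a);
        heap.set(a, heap.get(b));
        heap.set(b, tmp);
        positions.put(heap.get(a), a);
        positions.put(heap.get(b), b);
    }
}
{code}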



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-9234) Commons Logging is missing from shaded Flink Table library

2018-05-24 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reopened FLINK-9234:
-

> Commons Logging is missing from shaded Flink Table library
> --
>
> Key: FLINK-9234
> URL: https://issues.apache.org/jira/browse/FLINK-9234
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.2
> Environment: jdk1.8.0_172
> flink 1.4.2
> Mac High Sierra
>Reporter: Eron Wright 
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
> Attachments: repro.scala
>
>
> The flink-table shaded library seems to be missing some classes from 
> {{org.apache.commons.logging}} that are required by 
> {{org.apache.commons.configuration}}.  Ran into the problem while using the 
> external catalog support, on Flink 1.4.2.
> See attached a repro, which produces:
> {code}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/table/shaded/org/apache/commons/logging/Log
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.parseScanPackagesFromConfigFile(ExternalTableSourceUtil.scala:153)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala:55)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala)
>   at 
> org.apache.flink.table.catalog.ExternalCatalogSchema.getTable(ExternalCatalogSchema.scala:78)
>   at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:82)
>   at 
> org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:256)
>   at 
> org.apache.calcite.jdbc.CalciteSchema$SchemaPlusImpl.getTable(CalciteSchema.java:561)
>   at 
> org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:497)
>   at 
> org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485)
>   at Repro$.main(repro.scala:17)
>   at Repro.main(repro.scala)
> {code}
> Dependencies:
> {code}
> compile 'org.slf4j:slf4j-api:1.7.25'
> compile 'org.slf4j:slf4j-log4j12:1.7.25'
> runtime 'log4j:log4j:1.2.17'
> compile 'org.apache.flink:flink-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-streaming-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-clients_2.11:1.4.2'
> compile 'org.apache.flink:flink-table_2.11:1.4.2'
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6066: [FLINK-9428] [checkpointing] Allow operators to fl...

2018-05-24 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190510791
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 ---
@@ -93,6 +93,20 @@
//  state snapshots
// 

 
+   /**
+* This method is called when the operator should do a snapshot, before 
it emits its
+* own checkpoint barrier. This method is intended not for any actual 
state persistence,
+* but only for emitting some data before emitting the checkpoint 
barrier.
+*
+* Important: This method should not be used for any actual 
state snapshot logic, because
+* it will inherently be within the synchronous part of the operator's 
checkpoint. If heavy work is done
+* withing this method, it will affect latency and downstream 
checkpoint alignments.
--- End diff --

typo: withing -> within


---


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488678#comment-16488678
 ] 

ASF GitHub Bot commented on FLINK-9428:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190512162
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import 
org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.BroadcastingOutputCollector;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingOutput;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class test the {@link OperatorChain}.
+ *
+ * It takes a different (simpler) approach at testing the operator 
chain than
+ * {@link StreamOperatorChainingTest}.
+ */
+public class OperatorChainTest {
+
+   @Test
+   public void testPrepareCheckpointPreBarrier() throws Exception {
+   final AtomicInteger intRef = new AtomicInteger();
+
+   final OneInputStreamOperator one = new 
ValidatingOperator(intRef, 0);
+   final OneInputStreamOperator two = new 
ValidatingOperator(intRef, 1);
+   final OneInputStreamOperator three = new 
ValidatingOperator(intRef, 2);
+
+   final OperatorChain chain = setupOperatorChain(one, two, 
three);
+   
chain.prepareSnapshotPreBarrier(ValidatingOperator.CHECKPOINT_ID);
+
+   assertEquals(3, intRef.get());
+   }
+
+   // 

+   //  Operator Chain Setup Utils
+   // 

+
+   @SafeVarargs
+   private static > OperatorChain 
setupOperatorChain(
--- End diff --

this is maybe a bit much mocking


> Allow operators to flush data on checkpoint pre-barrier
> ---
>
> Key: FLINK-9428
> URL: https://issues.apache.org/jira/browse/FLINK-9428
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Some operators maintain some small transient state that may be inefficient to
> checkpoint, especially when it would also need to be checkpointed in a
> re-scalable way.
> An example is opportunistic pre-aggregation operators, which have small
> pre-aggregation state that is frequently flushed downstream.
> Rather than persisting that stat

[GitHub] flink pull request #6066: [FLINK-9428] [checkpointing] Allow operators to fl...

2018-05-24 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190512162
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import 
org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.BroadcastingOutputCollector;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingOutput;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class test the {@link OperatorChain}.
+ *
+ * It takes a different (simpler) approach at testing the operator 
chain than
+ * {@link StreamOperatorChainingTest}.
+ */
+public class OperatorChainTest {
+
+   @Test
+   public void testPrepareCheckpointPreBarrier() throws Exception {
+   final AtomicInteger intRef = new AtomicInteger();
+
+   final OneInputStreamOperator one = new 
ValidatingOperator(intRef, 0);
+   final OneInputStreamOperator two = new 
ValidatingOperator(intRef, 1);
+   final OneInputStreamOperator three = new 
ValidatingOperator(intRef, 2);
+
+   final OperatorChain chain = setupOperatorChain(one, two, 
three);
+   
chain.prepareSnapshotPreBarrier(ValidatingOperator.CHECKPOINT_ID);
+
+   assertEquals(3, intRef.get());
+   }
+
+   // 

+   //  Operator Chain Setup Utils
+   // 

+
+   @SafeVarargs
+   private static > OperatorChain 
setupOperatorChain(
--- End diff --

this is maybe a bit much mocking


---


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488679#comment-16488679
 ] 

ASF GitHub Bot commented on FLINK-9428:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190510791
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 ---
@@ -93,6 +93,20 @@
//  state snapshots
// 

 
+   /**
+* This method is called when the operator should do a snapshot, before 
it emits its
+* own checkpoint barrier. This method is intended not for any actual 
state persistence,
+* but only for emitting some data before emitting the checkpoint 
barrier.
+*
+* Important: This method should not be used for any actual 
state snapshot logic, because
+* it will inherently be within the synchronous part of the operator's 
checkpoint. If heavy work is done
+* withing this method, it will affect latency and downstream 
checkpoint alignments.
--- End diff --

typo: withing -> within


> Allow operators to flush data on checkpoint pre-barrier
> ---
>
> Key: FLINK-9428
> URL: https://issues.apache.org/jira/browse/FLINK-9428
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Some operators maintain some small transient state that may be inefficient to
> checkpoint, especially when it would also need to be checkpointed in a
> re-scalable way.
> An example is opportunistic pre-aggregation operators, which have small
> pre-aggregation state that is frequently flushed downstream.
> Rather than persisting that state in a checkpoint, it can make sense to flush
> the data downstream upon a checkpoint, to let it be part of the downstream
> operator's state.
> This feature is sensitive, because flushing state has a clear implication on
> the downstream operator's checkpoint alignment. However, used with care, and
> with the new back-pressure-based checkpoint alignment, this feature can be
> very useful.
> Because it is sensitive, I suggest making this an internal feature only
> (accessible to operators) and NOT exposing it in the public API at this point.
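
A simplified sketch of the intended usage pattern, assuming the
{{prepareSnapshotPreBarrier}} hook proposed in PR #6066 (this is not code from
the PR):

{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Sketch: an opportunistic pre-aggregator that keeps partial sums on the heap
// and flushes them downstream right before emitting the checkpoint barrier,
// so the transient state never needs to be part of this operator's snapshot.
public class PreAggregationSketch
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

    private final Map<String, Long> partialSums = new HashMap<>();

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> element) {
        Tuple2<String, Long> in = element.getValue();
        partialSums.merge(in.f0, in.f1, Long::sum);
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) {
        // Flush all buffered pre-aggregates before the barrier; they become
        // part of the downstream operator's checkpointed state instead.
        for (Map.Entry<String, Long> e : partialSums.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(e.getKey(), e.getValue())));
        }
        partialSums.clear();
    }
}
{code}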



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6026: [FLINK-9384]KafkaAvroTableSource failed to work due to ty...

2018-05-24 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6026
  
Thank you @tragicjun. LGTM. Merging...


---


[jira] [Commented] (FLINK-9384) KafkaAvroTableSource failed to work due to type mismatch

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488683#comment-16488683
 ] 

ASF GitHub Bot commented on FLINK-9384:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6026
  
Thank you @tragicjun. LGTM. Merging...


> KafkaAvroTableSource failed to work due to type mismatch
> 
>
> Key: FLINK-9384
> URL: https://issues.apache.org/jira/browse/FLINK-9384
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: easyfix, patch
> Fix For: 1.6.0
>
> Attachments: flink-9384.patch
>
>
> An exception was thrown when using KafkaAvroTableSource as follows:
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> TableSource of type 
> org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSource returned 
> a DataStream of type GenericType that does not 
> match with the type Row(id: Integer, name: String, age: Integer, event: 
> GenericType) declared by the TableSource.getReturnType() 
> method. Please validate the implementation of the TableSource.
>  at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:100)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:279)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
>  at 
> org.apache.flink.quickstart.StreamingJobAvro.main(StreamingJobAvro.java:85)
>  
> It is caused by a discrepancy between the type returned by the TableSource
> and the type returned by the DataStream. I've already fixed it; would someone
> please review the patch and see if it can be merged?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9424) BlobClientSslTest does not work in all environments

2018-05-24 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488689#comment-16488689
 ] 

Chesnay Schepler commented on FLINK-9424:
-

This is not an issue with the tests but with the default value for the SSL
ciphers that we recently updated.

> BlobClientSslTest does not work in all environments
> ---
>
> Key: FLINK-9424
> URL: https://issues.apache.org/jira/browse/FLINK-9424
> Project: Flink
>  Issue Type: Test
>  Components: Distributed Coordination, Tests
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Priority: Major
>
> It seems that the {{BlobClientSslTest}} assumes SSL algorithms that are not
> present in every environment, which causes the Flink build to fail. This
> also affects {{NettyClientServerSslTest}}.
> Environment:
> {code}
> Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 
> 2018-02-24T20:49:05+01:00)
> Maven home: /usr/local/Cellar/maven/3.5.3/libexec
> Java version: 1.8.0_102, vendor: Oracle Corporation
> Java home: 
> /Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "mac os x", version: "10.13.3", arch: "x86_64", family: "mac"
> {code}
> Exception:
> {code}
> java.lang.IllegalArgumentException: Cannot support 
> TLS_DHE_RSA_WITH_AES_256_GCM_SHA384 with currently installed providers
>   at sun.security.ssl.CipherSuiteList.(CipherSuiteList.java:92)
>   at 
> sun.security.ssl.SSLServerSocketImpl.setEnabledCipherSuites(SSLServerSocketImpl.java:200)
>   at 
> org.apache.flink.runtime.net.SSLUtils.setSSLVerAndCipherSuites(SSLUtils.java:84)
>   at org.apache.flink.runtime.blob.BlobServer.(BlobServer.java:207)
>   at 
> org.apache.flink.runtime.blob.BlobClientSslTest.startSSLServer(BlobClientSslTest.java:65)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9234) Commons Logging is missing from shaded Flink Table library

2018-05-24 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488694#comment-16488694
 ] 

Chesnay Schepler commented on FLINK-9234:
-

My guess is that this wasn't caught on Travis since we're using Maven 3.2.5 
there. If you compile all of Flink with Maven 3.3+, the logging dependency in 
flink-table is still visible to modules that depend on flink-table, which may 
or may not affect testing.
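
For example, one way to check this locally is to look at the dependency tree of 
a module that depends on flink-table (the module path below is a placeholder):

{code}
mvn dependency:tree -pl <module-that-depends-on-flink-table> | grep commons-logging
{code}

With Maven 3.3+ the commons-logging dependency may still show up there, while 
with 3.2.5 it is hidden by the shaded flink-table jar.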

> Commons Logging is missing from shaded Flink Table library
> --
>
> Key: FLINK-9234
> URL: https://issues.apache.org/jira/browse/FLINK-9234
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.2
> Environment: jdk1.8.0_172
> flink 1.4.2
> Mac High Sierra
>Reporter: Eron Wright 
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
> Attachments: repro.scala
>
>
> The flink-table shaded library seems to be missing some classes from 
> {{org.apache.commons.logging}} that are required by 
> {{org.apache.commons.configuration}}.  Ran into the problem while using the 
> external catalog support, on Flink 1.4.2.
> See attached a repro, which produces:
> {code}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/table/shaded/org/apache/commons/logging/Log
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.parseScanPackagesFromConfigFile(ExternalTableSourceUtil.scala:153)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.<init>(ExternalTableSourceUtil.scala:55)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.<clinit>(ExternalTableSourceUtil.scala)
>   at 
> org.apache.flink.table.catalog.ExternalCatalogSchema.getTable(ExternalCatalogSchema.scala:78)
>   at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:82)
>   at 
> org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:256)
>   at 
> org.apache.calcite.jdbc.CalciteSchema$SchemaPlusImpl.getTable(CalciteSchema.java:561)
>   at 
> org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:497)
>   at 
> org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485)
>   at Repro$.main(repro.scala:17)
>   at Repro.main(repro.scala)
> {code}
> Dependencies:
> {code}
> compile 'org.slf4j:slf4j-api:1.7.25'
> compile 'org.slf4j:slf4j-log4j12:1.7.25'
> runtime 'log4j:log4j:1.2.17'
> compile 'org.apache.flink:flink-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-streaming-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-clients_2.11:1.4.2'
> compile 'org.apache.flink:flink-table_2.11:1.4.2'
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9424) BlobClientSslTest does not work in all environments

2018-05-24 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488695#comment-16488695
 ] 

Chesnay Schepler commented on FLINK-9424:
-

That said, the tests should maybe check whether the ciphers are available.
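
A minimal sketch of such a guard (JUnit 4 {{Assume}}; the helper class and its 
name are made up for illustration):

{code:java}
import org.junit.Assume;

import javax.net.ssl.SSLContext;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Hypothetical helper: skip an SSL test instead of failing it when the JVM's
 * installed providers do not offer the required cipher suite. */
public final class CipherSuiteGuard {

    private CipherSuiteGuard() {}

    public static void assumeCipherSuiteAvailable(String cipherSuite) throws NoSuchAlgorithmException {
        // All cipher suites that the default SSLContext could enable.
        String[] supported = SSLContext.getDefault().getSupportedSSLParameters().getCipherSuites();

        // Turns an unavailable cipher into a skipped test instead of a failure.
        Assume.assumeTrue(
            "Cipher suite " + cipherSuite + " is not supported by this JVM",
            Arrays.asList(supported).contains(cipherSuite));
    }
}
{code}

A test like {{BlobClientSslTest#startSSLServer}} could then call 
{{assumeCipherSuiteAvailable("TLS_DHE_RSA_WITH_AES_256_GCM_SHA384")}} before 
starting the server.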

> BlobClientSslTest does not work in all environments
> ---
>
> Key: FLINK-9424
> URL: https://issues.apache.org/jira/browse/FLINK-9424
> Project: Flink
>  Issue Type: Test
>  Components: Distributed Coordination, Tests
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Priority: Major
>
> It seems that the {{BlobClientSslTest}} assumes SSL algorithms that are not 
> present in every environment. Thus, they cause the Flink build to fail. It 
> also affects {{NettyClientServerSslTest}}.
> Environment:
> {code}
> Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 
> 2018-02-24T20:49:05+01:00)
> Maven home: /usr/local/Cellar/maven/3.5.3/libexec
> Java version: 1.8.0_102, vendor: Oracle Corporation
> Java home: 
> /Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "mac os x", version: "10.13.3", arch: "x86_64", family: "mac"
> {code}
> Exception:
> {code}
> java.lang.IllegalArgumentException: Cannot support 
> TLS_DHE_RSA_WITH_AES_256_GCM_SHA384 with currently installed providers
>   at sun.security.ssl.CipherSuiteList.<init>(CipherSuiteList.java:92)
>   at 
> sun.security.ssl.SSLServerSocketImpl.setEnabledCipherSuites(SSLServerSocketImpl.java:200)
>   at 
> org.apache.flink.runtime.net.SSLUtils.setSSLVerAndCipherSuites(SSLUtils.java:84)
>   at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:207)
>   at 
> org.apache.flink.runtime.blob.BlobClientSslTest.startSSLServer(BlobClientSslTest.java:65)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6062: [FLINK-9423][state] Implement efficient deletes fo...

2018-05-24 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r190517336
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
 ---
@@ -23,8 +23,8 @@
 /**
  * Internal interface for in-flight timers.
  *
- * @param <K> type of the timer key.
- * @param <N> type of the timer namespace.
+ * @param <N> Type of the keys to which timers are scoped.
+ * @param <K> type of the timer key.
--- End diff --

Does this comment mean `@param <K> Type of the keys to which timers are 
scoped.`?


---


[GitHub] flink pull request #6062: [FLINK-9423][state] Implement efficient deletes fo...

2018-05-24 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r190517094
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -458,24 +458,33 @@ private int globalKeyGroupToLocalIndex(int keyGroup) {
return keyGroup - keyGroupRange.getStartKeyGroup();
}
 
-   private void checkCapacity(int requested) {
+   private void growIfRequired(int requiredSize) {
int oldArraySize = queue.length;
 
-   if (requested >= oldArraySize) {
+   if (requiredSize >= oldArraySize) {
final int grow = (oldArraySize < 64) ? oldArraySize + 2 
: oldArraySize >> 1;
-   int newArraySize = oldArraySize + grow;
-   if (newArraySize - MAX_ARRAY_SIZE > 0) {
-   if (newArraySize < 0 || requested > 
MAX_ARRAY_SIZE) {
-   throw new OutOfMemoryError("Required 
timer heap exceeds maximum size!");
-   } else {
-   newArraySize = MAX_ARRAY_SIZE;
-   }
-   }
-   queue = Arrays.copyOf(queue, newArraySize);
+   resizeQueueArray(oldArraySize + grow);
}
// TODO implement shrinking as well?
}
 
+   private void resizeForBulkLoad(int maxTotalSize) {
+   if (maxTotalSize > queue.length) {
+   resizeQueueArray(maxTotalSize + (maxTotalSize / 8));
--- End diff --

`maxTotalSize / 8` -> `maxTotalSize >>> 3`


---


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488705#comment-16488705
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r190517094
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 ---
@@ -458,24 +458,33 @@ private int globalKeyGroupToLocalIndex(int keyGroup) {
return keyGroup - keyGroupRange.getStartKeyGroup();
}
 
-   private void checkCapacity(int requested) {
+   private void growIfRequired(int requiredSize) {
int oldArraySize = queue.length;
 
-   if (requested >= oldArraySize) {
+   if (requiredSize >= oldArraySize) {
final int grow = (oldArraySize < 64) ? oldArraySize + 2 
: oldArraySize >> 1;
-   int newArraySize = oldArraySize + grow;
-   if (newArraySize - MAX_ARRAY_SIZE > 0) {
-   if (newArraySize < 0 || requested > 
MAX_ARRAY_SIZE) {
-   throw new OutOfMemoryError("Required 
timer heap exceeds maximum size!");
-   } else {
-   newArraySize = MAX_ARRAY_SIZE;
-   }
-   }
-   queue = Arrays.copyOf(queue, newArraySize);
+   resizeQueueArray(oldArraySize + grow);
}
// TODO implement shrinking as well?
}
 
+   private void resizeForBulkLoad(int maxTotalSize) {
+   if (maxTotalSize > queue.length) {
+   resizeQueueArray(maxTotalSize + (maxTotalSize / 8));
--- End diff --

`maxTotalSize / 8` -> `maxTotalSize >>> 3`


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes; the complexity is currently O\(n\), where n 
> is the number of registered timers.
>  
> We can keep track of the timers' positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.
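
For illustration, the core idea in a minimal, non-Flink form: a binary min-heap 
that additionally tracks each element's array index, so that a delete costs 
O(log n) instead of an O(n) scan. Unique elements are assumed, which holds for 
timers.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexedMinHeap<T extends Comparable<T>> {

    private final List<T> heap = new ArrayList<>();
    // Maps each element to its current index in the heap array.
    private final Map<T, Integer> positions = new HashMap<>();

    public void insert(T element) {
        heap.add(element);
        positions.put(element, heap.size() - 1);
        siftUp(heap.size() - 1);
    }

    public boolean remove(T element) {
        Integer pos = positions.remove(element); // O(1) lookup instead of an O(n) scan
        if (pos == null) {
            return false;
        }
        T last = heap.remove(heap.size() - 1);
        if (pos < heap.size()) {
            // Move the last element into the hole and restore the heap property.
            heap.set(pos, last);
            positions.put(last, pos);
            siftUp(pos);
            siftDown(positions.get(last));
        }
        return true;
    }

    private void siftUp(int i) {
        while (i > 0 && heap.get(i).compareTo(heap.get((i - 1) / 2)) < 0) {
            swap(i, (i - 1) / 2);
            i = (i - 1) / 2;
        }
    }

    private void siftDown(int i) {
        while (true) {
            int smallest = i;
            for (int child = 2 * i + 1; child <= 2 * i + 2 && child < heap.size(); child++) {
                if (heap.get(child).compareTo(heap.get(smallest)) < 0) {
                    smallest = child;
                }
            }
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    private void swap(int i, int j) {
        T tmp = heap.get(i);
        heap.set(i, heap.get(j));
        heap.set(j, tmp);
        positions.put(heap.get(i), i);
        positions.put(heap.get(j), j);
    }
}
{code}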



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190518142
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -80,6 +83,9 @@
/** The queue of unassigned partitions that we need to assign to the 
Kafka consumer. */
private final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> 
unassignedPartitionsQueue;
 
+   /** The set of partitions to be removed from the Kafka consumer. */
+   private final Set<TopicPartition> partitionsToBeRemoved;
--- End diff --

From my understanding, for unassigned partitions we can use a Queue because 
it does not matter which consumer will take the new partitions.
But we cannot use a Queue for partitions to be removed, because we can only 
remove a partition from the consumer that is actually subscribed to it.
Does that make sense?


---


[jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488706#comment-16488706
 ] 

ASF GitHub Bot commented on FLINK-9423:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6062#discussion_r190517336
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
 ---
@@ -23,8 +23,8 @@
 /**
  * Internal interface for in-flight timers.
  *
- * @param <K> type of the timer key.
- * @param <N> type of the timer namespace.
+ * @param <N> Type of the keys to which timers are scoped.
+ * @param <K> type of the timer key.
--- End diff --

Does this comment mean `@param <K> Type of the keys to which timers are 
scoped.`?


> Implement efficient deletes for heap based timer service
> 
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> The current data structures in the `HeapInternalTimerService` are not able to 
> support efficient timer deletes; the complexity is currently O\(n\), where n 
> is the number of registered timers.
>  
> We can keep track of the timers' positions in the priority queue and (in 
> combination with the already existing set/map) have a more efficient 
> algorithm for deletes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488707#comment-16488707
 ] 

ASF GitHub Bot commented on FLINK-9303:
---

Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190518142
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -80,6 +83,9 @@
/** The queue of unassigned partitions that we need to assign to the 
Kafka consumer. */
private final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> 
unassignedPartitionsQueue;
 
+   /** The set of partitions to be removed from the Kafka consumer. */
+   private final Set<TopicPartition> partitionsToBeRemoved;
--- End diff --

From my understanding, for unassigned partitions we can use a Queue because 
it does not matter which consumer will take the new partitions.
But we cannot use a Queue for partitions to be removed, because we can only 
remove a partition from the consumer that is actually subscribed to it.
Does that make sense?


> Unassign partitions from Kafka client if partitions become unavailable
> --
>
> Key: FLINK-9303
> URL: https://issues.apache.org/jira/browse/FLINK-9303
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions 
> at the moment, so statically assigned partitions to the Kafka client is never 
> removed and is always continuously requested for records.
> This causes LOG noises as reported in the reported mail thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190519624
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
Pattern topicPattern,
KeyedDeserializationSchema<T> deserializer,
long discoveryIntervalMillis,
-   boolean useMetrics) {
+   boolean useMetrics,
+   boolean checkUnavailablePartitions) {
--- End diff --

I did it that way only because this is something new, so I thought you might 
want it to be configurable. But you are right, I cannot think of a case where 
we would prefer to keep the unavailable partitions.
I'll update the PR to make it the default behaviour if that's OK with you.


---


[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488710#comment-16488710
 ] 

ASF GitHub Bot commented on FLINK-9303:
---

Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190519624
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
Pattern topicPattern,
KeyedDeserializationSchema<T> deserializer,
long discoveryIntervalMillis,
-   boolean useMetrics) {
+   boolean useMetrics,
+   boolean checkUnavailablePartitions) {
--- End diff --

I did it that way only because this is something new, so I thought you might 
want it to be configurable. But you are right, I cannot think of a case where 
we would prefer to keep the unavailable partitions.
I'll update the PR to make it the default behaviour if that's OK with you.


> Unassign partitions from Kafka client if partitions become unavailable
> --
>
> Key: FLINK-9303
> URL: https://issues.apache.org/jira/browse/FLINK-9303
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions 
> at the moment, so statically assigned partitions to the Kafka client is never 
> removed and is always continuously requested for records.
> This causes LOG noises as reported in the reported mail thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6007: [FLINK-8518][Table API & SQL] Support DOW for EXTRACT

2018-05-24 Thread snuyanzin
Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6007
  
@twalthr thank you for your comment, but could you please clarify this:

> Can you add some documentation to sqlApi.md such that users know about 
the semantics?

1. Am I right that you mean `docs/dev/table/sql.md`?
2. Currently I see only the general explanation of EXTRACT and the reserved 
words section, where DOW is already specified.
From this point of view I do not see what could be updated right now. At 
the same time I propose to go the same way as Calcite does. 
[Here](https://calcite.apache.org/docs/reference.html) is a reference of their 
functions, including date/time ones. Besides EXTRACT they also have synonyms, e.g. 
>MONTH(date) | Equivalent to EXTRACT(MONTH FROM date). Returns an integer 
between 1 and 12.
>WEEK(date) | Equivalent to EXTRACT(WEEK FROM date). Returns an integer 
between 1 and 53.
>DAYOFYEAR(date) | Equivalent to EXTRACT(DOY FROM date). Returns an integer 
between 1 and 366.
etc.
So I suggest introducing the same synonyms in Flink (just by using what already 
exists in Calcite) and organizing the documentation for them in a similar way.
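
For illustration, the kind of equivalence such documentation could state 
(a hypothetical query; `tableEnv`, `Orders` and `ts` are placeholders for an 
existing TableEnvironment, a registered table and its timestamp column, and 
Flink-side support for each unit is exactly what the follow-up issues would cover):

```java
// Hypothetical once DOW and the DAYOFWEEK synonym are supported.
Table result = tableEnv.sqlQuery(
    "SELECT " +
    "  EXTRACT(DOW FROM ts) AS dow_explicit, " + // integer between 1 and 7
    "  DAYOFWEEK(ts)        AS dow_synonym  " +  // equivalent to EXTRACT(DOW FROM ts)
    "FROM Orders");
```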








---


[jira] [Commented] (FLINK-8518) Support DOW, EPOCH, DECADE for EXTRACT

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488712#comment-16488712
 ] 

ASF GitHub Bot commented on FLINK-8518:
---

Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6007
  
@twalthr thank you for your comment, but could you please clarify this:

> Can you add some documentation to sqlApi.md such that users know about 
the semantics?

1. Am I right that you mean `docs/dev/table/sql.md`?
2. Currently I see only the general explanation of EXTRACT and the reserved 
words section, where DOW is already specified.
From this point of view I do not see what could be updated right now. At 
the same time I propose to go the same way as Calcite does. 
[Here](https://calcite.apache.org/docs/reference.html) is a reference of their 
functions, including date/time ones. Besides EXTRACT they also have synonyms, e.g. 
>MONTH(date) | Equivalent to EXTRACT(MONTH FROM date). Returns an integer 
between 1 and 12.
>WEEK(date) | Equivalent to EXTRACT(WEEK FROM date). Returns an integer 
between 1 and 53.
>DAYOFYEAR(date) | Equivalent to EXTRACT(DOY FROM date). Returns an integer 
between 1 and 366.
etc.
So I suggest introducing the same synonyms in Flink (just by using what already 
exists in Calcite) and organizing the documentation for them in a similar way.








> Support DOW, EPOCH, DECADE for EXTRACT
> --
>
> Key: FLINK-8518
> URL: https://issues.apache.org/jira/browse/FLINK-8518
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> We upgraded Calcite to version 1.15 in FLINK-7934. The EXTRACT method 
> supports more conversion targets. The targets DOW, EPOCH, DECADE should be 
> implemented and tested for different datatypes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9384) KafkaAvroTableSource failed to work due to type mismatch

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488711#comment-16488711
 ] 

ASF GitHub Bot commented on FLINK-9384:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6026
  
@tragicjun this PR is hard to merge with 6 commits and a merge branch in 
the middle. Can you squash your commits into one commit that is rebased on the 
current master and do a force push?


> KafkaAvroTableSource failed to work due to type mismatch
> 
>
> Key: FLINK-9384
> URL: https://issues.apache.org/jira/browse/FLINK-9384
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: easyfix, patch
> Fix For: 1.6.0
>
> Attachments: flink-9384.patch
>
>
> An exception was thrown when using KafkaAvroTableSource as follows:
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> TableSource of type 
> org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSource returned 
> a DataStream of type GenericType that does not 
> match with the type Row(id: Integer, name: String, age: Integer, event: 
> GenericType) declared by the TableSource.getReturnType() 
> method. Please validate the implementation of the TableSource.
>  at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:100)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:279)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
>  at 
> org.apache.flink.quickstart.StreamingJobAvro.main(StreamingJobAvro.java:85)
>  
> It is caused by a discrepancy between the type returned by the TableSource 
> and the type returned by the DataStream. I've already fixed it; could someone 
> please review the patch and see if it can be merged?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6026: [FLINK-9384]KafkaAvroTableSource failed to work due to ty...

2018-05-24 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6026
  
@tragicjun this PR is hard to merge with 6 commits and a merge branch in 
the middle. Can you squash your commits into one commit that is rebased on the 
current master and do a force push?


---


[GitHub] flink issue #6031: [FLINK-9386] Embed netty router

2018-05-24 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/6031
  
Thanks :) I have squashed the commits and rebased the PR on the latest master.


---


[jira] [Commented] (FLINK-9386) Remove netty-router dependency

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488726#comment-16488726
 ] 

ASF GitHub Bot commented on FLINK-9386:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/6031
  
Thanks :) I have squashed the commits and rebased the PR on the latest master.


> Remove netty-router dependency
> --
>
> Key: FLINK-9386
> URL: https://issues.apache.org/jira/browse/FLINK-9386
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.6.0
>
>
> netty-router 1.10 blocks the upgrade to Netty 4.1, while netty-router 2.2.0 
> broke compatibility in a way that makes it unusable for us (it does not allow 
> sorting router paths as in https://issues.apache.org/jira/browse/FLINK-8000). 
> I propose to copy, simplify and modify the netty-router code to suit our needs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-05-24 Thread aitozi (JIRA)
aitozi created FLINK-9431:
-

 Summary: Introduce TimeEnd State to flink cep
 Key: FLINK-9431
 URL: https://issues.apache.org/jira/browse/FLINK-9431
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Affects Versions: 1.4.2
Reporter: aitozi
Assignee: aitozi


Currently Flink CEP has no support for reaching a Final State after some time 
has passed. If I use a pattern like 
{code:java}Pattern.begin("A").notFollowedBy("B"){code} and I want the A element 
to be emitted after 5 minutes, there is no way to do it.

I want to introduce a timeEnd State that works with notFollowedBy to cover this 
scenario.
It could be used like this: 
{code:java}Pattern.begin("A").notFollowedBy("B").timeEnd("end"){code}
[~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-05-24 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488729#comment-16488729
 ] 

Dawid Wysakowicz commented on FLINK-9431:
-

Have you considered using timed-out patterns? You could express your case with:
{code:java}
Pattern.begin("A").followedBy("B").within(Time.minutes(5)){code}
and look for the timed-out patterns rather than the matched ones. Will this 
work for you?
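
A minimal sketch of that approach ({{Event}} is a placeholder type and 
{{input}} a placeholder {{DataStream<Event>}}; the snippet is assumed to live 
inside a job's main method):

{code:java}
Pattern<Event, ?> pattern = Pattern.<Event>begin("A")
    .followedBy("B")
    .within(Time.minutes(5));

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

// Either.Left carries partial matches that timed out (an "A" with no "B"
// within 5 minutes), Either.Right carries complete matches.
DataStream<Either<Event, Event>> result = patternStream.select(
    new PatternTimeoutFunction<Event, Event>() {
        @Override
        public Event timeout(Map<String, List<Event>> partialMatch, long timeoutTimestamp) {
            return partialMatch.get("A").get(0); // the "A" to emit after the timeout
        }
    },
    new PatternSelectFunction<Event, Event>() {
        @Override
        public Event select(Map<String, List<Event>> match) {
            return match.get("A").get(0); // an "A" that was followed by a "B"
        }
    });
{code}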

> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> Currently Flink CEP has no support for reaching a Final State after some time 
> has passed. If I use a pattern like 
> {code:java}Pattern.begin("A").notFollowedBy("B"){code} and I want the A 
> element to be emitted after 5 minutes, there is no way to do it.
> I want to introduce a timeEnd State that works with notFollowedBy to cover 
> this scenario.
> It could be used like this: 
> {code:java}Pattern.begin("A").notFollowedBy("B").timeEnd("end"){code}
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3952) Bump Netty to 4.1

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488731#comment-16488731
 ] 

ASF GitHub Bot commented on FLINK-3952:
---

pnowojski opened a new pull request #41: [FLINK-3952] Bump Netty to 4.1.24 and 
drop netty-router
URL: https://github.com/apache/flink-shaded/pull/41
 
 
   This is a part of 
[FLINK-3952](https://issues.apache.org/jira/browse/FLINK-3952)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Bump Netty to 4.1
> -
>
> Key: FLINK-3952
> URL: https://issues.apache.org/jira/browse/FLINK-3952
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, Network
>Reporter: rektide de la fey
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: netty
>
> Netty 4.1 is about to reach its final release. This release has [a number of 
> significant 
> enhancements|http://netty.io/wiki/new-and-noteworthy-in-4.1.html], and in 
> particular I find HTTP/2 codecs to be incredibly desirable to have. 
> Additionally, hopefully, the [Hadoop patches for Netty 
> 4.1|https://issues.apache.org/jira/browse/HADOOP-11716] get some tests and 
> get merged, and I believe that if/when that happens it will be important for 
> Flink to also be using the new Netty minor version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-05-24 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488734#comment-16488734
 ] 

aitozi commented on FLINK-9431:
---

[~dawidwys] Two things:

# 1. If I use 
{code:java}Pattern.begin("A").notFollowedBy("B").within(Time.minutes(5)){code}, 
the pattern cannot end with notFollowedBy.
# 2. within just compares the time against the start state, not against any 
middle pattern you refer to. Maybe you want to express {code:java}A followedBy B 
notFollowedBy C and emit (A, B) when 5 minutes have passed{code}
# 3. It is not direct; I have to get the matched results from the timeout results.
I think the timeEnd state only needs to work with the notFollowedBy pattern 
because it has an explicit path to the Final state, and the timeEnd state can 
reach Final when the time has passed.


> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> Currently Flink CEP has no support for reaching a Final State after some time 
> has passed. If I use a pattern like 
> {code:java}Pattern.begin("A").notFollowedBy("B"){code} and I want the A 
> element to be emitted after 5 minutes, there is no way to do it.
> I want to introduce a timeEnd State that works with notFollowedBy to cover 
> this scenario.
> It could be used like this: 
> {code:java}Pattern.begin("A").notFollowedBy("B").timeEnd("end"){code}
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-05-24 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488734#comment-16488734
 ] 

aitozi edited comment on FLINK-9431 at 5/24/18 9:56 AM:


[~dawidwys] Three things:

# If I use 
{code:java}Pattern.begin("A").notFollowedBy("B").within(Time.minutes(5)){code}, 
the pattern cannot end with notFollowedBy.
# within just compares the time against the start state, not against any 
middle pattern you refer to. Maybe you want to express {code:java}A followedBy B 
notFollowedBy C and emit (A, B) when 5 minutes have passed{code}
# It is not direct; I have to get the matched results from the timeout results.
I think the timeEnd state only needs to work with the notFollowedBy pattern 
because it has an explicit path to the Final state, and the timeEnd state can 
reach Final when the time has passed.



was (Author: aitozi):
[~dawidwys] Three things:

# 1. If I use 
{code:java}Pattern.begin("A").notFollowedBy("B").within(Time.minutes(5)){code}, 
the pattern cannot end with notFollowedBy.
# 2. within just compares the time against the start state, not against any 
middle pattern you refer to. Maybe you want to express {code:java}A followedBy B 
notFollowedBy C and emit (A, B) when 5 minutes have passed{code}
# 3. It is not direct; I have to get the matched results from the timeout results.
I think the timeEnd state only needs to work with the notFollowedBy pattern 
because it has an explicit path to the Final state, and the timeEnd state can 
reach Final when the time has passed.


> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> Currently Flink CEP has no support for reaching a Final State after some time 
> has passed. If I use a pattern like 
> {code:java}Pattern.begin("A").notFollowedBy("B"){code} and I want the A 
> element to be emitted after 5 minutes, there is no way to do it.
> I want to introduce a timeEnd State that works with notFollowedBy to cover 
> this scenario.
> It could be used like this: 
> {code:java}Pattern.begin("A").notFollowedBy("B").timeEnd("end"){code}
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-05-24 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488734#comment-16488734
 ] 

aitozi edited comment on FLINK-9431 at 5/24/18 9:56 AM:


[~dawidwys] Three things:

# 1. If I use 
{code:java}Pattern.begin("A").notFollowedBy("B").within(Time.minutes(5)){code}, 
the pattern cannot end with notFollowedBy.
# 2. within just compares the time against the start state, not against any 
middle pattern you refer to. Maybe you want to express {code:java}A followedBy B 
notFollowedBy C and emit (A, B) when 5 minutes have passed{code}
# 3. It is not direct; I have to get the matched results from the timeout results.
I think the timeEnd state only needs to work with the notFollowedBy pattern 
because it has an explicit path to the Final state, and the timeEnd state can 
reach Final when the time has passed.



was (Author: aitozi):
[~dawidwys] Two things:

# 1. If I use 
{code:java}Pattern.begin("A").notFollowedBy("B").within(Time.minutes(5)){code}, 
the pattern cannot end with notFollowedBy.
# 2. within just compares the time against the start state, not against any 
middle pattern you refer to. Maybe you want to express {code:java}A followedBy B 
notFollowedBy C and emit (A, B) when 5 minutes have passed{code}
# 3. It is not direct; I have to get the matched results from the timeout results.
I think the timeEnd state only needs to work with the notFollowedBy pattern 
because it has an explicit path to the Final state, and the timeEnd state can 
reach Final when the time has passed.


> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> Currently Flink CEP has no support for reaching a Final State after some time 
> has passed. If I use a pattern like 
> {code:java}Pattern.begin("A").notFollowedBy("B"){code} and I want the A 
> element to be emitted after 5 minutes, there is no way to do it.
> I want to introduce a timeEnd State that works with notFollowedBy to cover 
> this scenario.
> It could be used like this: 
> {code:java}Pattern.begin("A").notFollowedBy("B").timeEnd("end"){code}
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190529981
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -374,8 +385,8 @@ void setOffsetsToCommit(
 * This method is exposed for testing purposes.
 */
@VisibleForTesting
-   void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> 
newPartitions) throws Exception {
-   if (newPartitions.size() == 0) {
+   void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> 
newPartitions, Set<TopicPartition> partitionsToBeRemoved) throws Exception {
--- End diff --

I thought about it, but my only concern is the case where we'd have both 
partitions to add and partitions to remove...  
`consumerCallBridge.assignPartitions()` takes the whole new list of 
partitions, so in that case we would need to wait for the first assignment 
(e.g. adding new partitions) before doing the second assignment (e.g. removing 
partitions) in order to keep the list of partitions consistent. 
I think we should try to have only one call to 
`consumerCallBridge.assignPartitions()`.

Maybe I could refactor the part where partitions are removed from the old 
partitions into a separate private method like `removeFromOldPartitions()`?

What do you think?
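
For the single-call variant, a rough sketch of what I mean (hedged: 
`currentAssignment` and the surrounding state are assumptions about the 
thread's internals, not actual fields):

```java
// Compute the consumer's next assignment in one pass so that
// consumerCallBridge.assignPartitions() is invoked exactly once.
List<TopicPartition> nextAssignment = new ArrayList<>();

// Keep the currently assigned partitions that were not removed.
for (TopicPartition partition : currentAssignment) {
    if (!partitionsToBeRemoved.contains(partition)) {
        nextAssignment.add(partition);
    }
}

// Add the newly discovered partitions.
for (KafkaTopicPartitionState<TopicPartition> newPartition : newPartitions) {
    nextAssignment.add(newPartition.getKafkaPartitionHandle());
}

consumerCallBridge.assignPartitions(consumer, nextAssignment);
```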


---


[jira] [Comment Edited] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-05-24 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488734#comment-16488734
 ] 

aitozi edited comment on FLINK-9431 at 5/24/18 9:57 AM:


[~dawidwys] Three things:

# If I use 
{code:java}Pattern.begin("A").notFollowedBy("B").within(Time.minutes(5)){code}, 
the pattern cannot end with notFollowedBy.
# within just compares the time against the start state, not against any 
middle pattern you refer to. Maybe you want to express {code:java}A followedBy B 
notFollowedBy C and emit (A, B) when 5 minutes have passed after the B elements 
you meet{code}
# It is not direct; I have to get the matched results from the timeout results.
I think the timeEnd state only needs to work with the notFollowedBy pattern 
because it has an explicit path to the Final state, and the timeEnd state can 
reach Final when the time has passed.



was (Author: aitozi):
[~dawidwys] Three things:

# If I use 
{code:java}Pattern.begin("A").notFollowedBy("B").within(Time.minutes(5)){code}, 
the pattern cannot end with notFollowedBy.
# within just compares the time against the start state, not against any 
middle pattern you refer to. Maybe you want to express {code:java}A followedBy B 
notFollowedBy C and emit (A, B) when 5 minutes have passed{code}
# It is not direct; I have to get the matched results from the timeout results.
I think the timeEnd state only needs to work with the notFollowedBy pattern 
because it has an explicit path to the Final state, and the timeEnd state can 
reach Final when the time has passed.


> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> Currently Flink CEP has no support for reaching a Final State after some time 
> has passed. If I use a pattern like 
> {code:java}Pattern.begin("A").notFollowedBy("B"){code} and I want the A 
> element to be emitted after 5 minutes, there is no way to do it.
> I want to introduce a timeEnd State that works with notFollowedBy to cover 
> this scenario.
> It could be used like this: 
> {code:java}Pattern.begin("A").notFollowedBy("B").timeEnd("end"){code}
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488735#comment-16488735
 ] 

ASF GitHub Bot commented on FLINK-9303:
---

Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190529981
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -374,8 +385,8 @@ void setOffsetsToCommit(
 * This method is exposed for testing purposes.
 */
@VisibleForTesting
-   void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> 
newPartitions) throws Exception {
-   if (newPartitions.size() == 0) {
+   void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> 
newPartitions, Set<TopicPartition> partitionsToBeRemoved) throws Exception {
--- End diff --

I thought about it, but my only concern is the case where we'd have both 
partitions to add and partitions to remove...  
`consumerCallBridge.assignPartitions()` takes the whole new list of 
partitions, so in that case we would need to wait for the first assignment 
(e.g. adding new partitions) before doing the second assignment (e.g. removing 
partitions) in order to keep the list of partitions consistent. 
I think we should try to have only one call to 
`consumerCallBridge.assignPartitions()`.

Maybe I could refactor the part where partitions are removed from the old 
partitions into a separate private method like `removeFromOldPartitions()`?

What do you think?


> Unassign partitions from Kafka client if partitions become unavailable
> --
>
> Key: FLINK-9303
> URL: https://issues.apache.org/jira/browse/FLINK-9303
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions 
> at the moment, so statically assigned partitions to the Kafka client is never 
> removed and is always continuously requested for records.
> This causes LOG noises as reported in the reported mail thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190530647
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -240,7 +249,9 @@ public void run() {
newPartitions = 
unassignedPartitionsQueue.getBatchBlocking();
}
if (newPartitions != null) {
-   
reassignPartitions(newPartitions);
+   
reassignPartitions(newPartitions, new HashSet<>());
--- End diff --

I just realized this should actually be
`reassignPartitions(newPartitions, partitionsToBeRemoved);`


---


[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488737#comment-16488737
 ] 

ASF GitHub Bot commented on FLINK-9303:
---

Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190530647
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -240,7 +249,9 @@ public void run() {
newPartitions = 
unassignedPartitionsQueue.getBatchBlocking();
}
if (newPartitions != null) {
-   
reassignPartitions(newPartitions);
+   
reassignPartitions(newPartitions, new HashSet<>());
--- End diff --

I just realized this should actually be
`reassignPartitions(newPartitions, partitionsToBeRemoved);`


> Unassign partitions from Kafka client if partitions become unavailable
> --
>
> Key: FLINK-9303
> URL: https://issues.apache.org/jira/browse/FLINK-9303
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions 
> at the moment, so statically assigned partitions to the Kafka client is never 
> removed and is always continuously requested for records.
> This causes LOG noises as reported in the reported mail thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9431) Introduce TimeEnd State to flink cep

2018-05-24 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488734#comment-16488734
 ] 

aitozi edited comment on FLINK-9431 at 5/24/18 10:00 AM:
-

[~dawidwys] Three things:

# If I use 
{code:java}Pattern.begin("A").notFollowedBy("B").within(Time.minutes(5)){code}, 
the pattern cannot end with notFollowedBy.
# within just compares the time against the start state, not against any 
middle pattern you refer to. Maybe you want to express {code:java}A followedBy B 
notFollowedBy C and emit (A, B) when 5 minutes have passed after the B elements 
you meet{code}
# It is not direct; I have to get the matched results from the timeout results.
I think the timeEnd state only needs to work with the notFollowedBy pattern 
because it doesn't have an explicit path to the Final state, and the timeEnd 
state can reach Final when the time has passed.



was (Author: aitozi):
[~dawidwys] Three things:

# If I use 
{code:java}Pattern.begin("A").notFollowedBy("B").within(Time.minutes(5)){code}, 
the pattern cannot end with notFollowedBy.
# within just compares the time against the start state, not against any 
middle pattern you refer to. Maybe you want to express {code:java}A followedBy B 
notFollowedBy C and emit (A, B) when 5 minutes have passed after the B elements 
you meet{code}
# It is not direct; I have to get the matched results from the timeout results.
I think the timeEnd state only needs to work with the notFollowedBy pattern 
because it has an explicit path to the Final state, and the timeEnd state can 
reach Final when the time has passed.


> Introduce TimeEnd State to flink cep
> 
>
> Key: FLINK-9431
> URL: https://issues.apache.org/jira/browse/FLINK-9431
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.4.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> Currently Flink CEP has no support for reaching a Final State after some time 
> has passed. If I use a pattern like 
> {code:java}Pattern.begin("A").notFollowedBy("B"){code} and I want the A 
> element to be emitted after 5 minutes, there is no way to do it.
> I want to introduce a timeEnd State that works with notFollowedBy to cover 
> this scenario.
> It could be used like this: 
> {code:java}Pattern.begin("A").notFollowedBy("B").timeEnd("end"){code}
> [~dawidwys] [~kkl0u] Is this meaningful?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6070: [FLINK-9409] [formats] Remove flink-avro and flink...

2018-05-24 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6070

[FLINK-9409] [formats] Remove flink-avro and flink-json from /opt

## What is the purpose of the change

Both flink-json and flink-avro were added to the /opt directory in order to 
use them as jars for the SQL Client. However, they are not required anymore 
because we added dedicated SQL jars later that are distributed through Maven 
central (see FLINK-8831).

`mvn clean install -pl flink-dist -am` is working again

## Brief change log

Remove all formats from /opt

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9409

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6070.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6070


commit c567b48e67a7c1cde74e9cea74b36ebdd52751da
Author: Timo Walther 
Date:   2018-05-24T10:09:54Z

[FLINK-9409] [formats] Remove flink-avro and flink-json from /opt




---


[jira] [Commented] (FLINK-9409) Remove flink-avro and flink-json from /opt

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488751#comment-16488751
 ] 

ASF GitHub Bot commented on FLINK-9409:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6070

[FLINK-9409] [formats] Remove flink-avro and flink-json from /opt

## What is the purpose of the change

Both flink-json and flink-avro were added to the /opt directory in order to 
use them as jars for the SQL Client. However, they are not required anymore 
because we added dedicated SQL jars later that are distributed through Maven 
central (see FLINK-8831).

`mvn clean install -pl flink-dist -am` is working again

## Brief change log

Remove all formats from /opt

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9409

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6070.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6070


commit c567b48e67a7c1cde74e9cea74b36ebdd52751da
Author: Timo Walther 
Date:   2018-05-24T10:09:54Z

[FLINK-9409] [formats] Remove flink-avro and flink-json from /opt




> Remove flink-avro and flink-json from /opt
> --
>
> Key: FLINK-9409
> URL: https://issues.apache.org/jira/browse/FLINK-9409
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Timo Walther
>Priority: Major
>
> Building Flink via {{mvn clean install -pl flink-dist -am}} currently fails 
> because {{flink-json}} is not defined as a dependency in 
> {{flink-dist/pom.xml}} although it is used in the assembly.
> The same is true for {{flink-avro}} but this seems to be built due to some 
> indirect dependency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6070: [FLINK-9409] [formats] Remove flink-avro and flink-json f...

2018-05-24 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6070
  
@NicoK can you verify the change?


---


[jira] [Commented] (FLINK-9409) Remove flink-avro and flink-json from /opt

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488753#comment-16488753
 ] 

ASF GitHub Bot commented on FLINK-9409:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6070
  
@NicoK can you verify the change?


> Remove flink-avro and flink-json from /opt
> --
>
> Key: FLINK-9409
> URL: https://issues.apache.org/jira/browse/FLINK-9409
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Timo Walther
>Priority: Major
>
> Building Flink via {{mvn clean install -pl flink-dist -am}} currently fails 
> because {{flink-json}} is not defined as a dependency in 
> {{flink-dist/pom.xml}} although it is used in the assembly.
> The same is true for {{flink-avro}} but this seems to be built due to some 
> indirect dependency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8518) Support DOW, EPOCH, DECADE for EXTRACT

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488768#comment-16488768
 ] 

ASF GitHub Bot commented on FLINK-8518:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6007
  
Hi @snuyanzin,
yes, I meant `docs/dev/table/sql.md`. Right now it is not documented which 
units are supported by `EXTRACT`. Just because `DOW` is in the reserved 
keywords section doesn't mean that we support it. I'm fine with having the same 
schema as Calcite. We can create one or more issues for the docs and the
additional synonyms. Issues that need a newer Calcite version can be converted 
to a subtask of FLINK-9134.


> Support DOW, EPOCH, DECADE for EXTRACT
> --
>
> Key: FLINK-8518
> URL: https://issues.apache.org/jira/browse/FLINK-8518
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> We upgraded Calcite to version 1.15 in FLINK-7934. The EXTRACT method 
> supports more conversion targets. The targets DOW, EPOCH, DECADE should be 
> implemented and tested for different datatypes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6007: [FLINK-8518][Table API & SQL] Support DOW for EXTRACT

2018-05-24 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6007
  
Hi @snuyanzin,
yes, I meant `docs/dev/table/sql.md`. Right now it is not documented which 
units are supported by `EXTRACT`. Just because `DOW` is in the reserved 
keywords section doesn't mean that we support it. I'm fine with having the same 
schema as Calcite. We can create one or more issues for the docs and the
additional synonyms. Issues that need a newer Calcite version can be converted 
to a subtask of FLINK-9134.


---


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-24 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488779#comment-16488779
 ] 

yanxiaobin commented on FLINK-8500:
---

I prefer to expose Kafka's ConsumerRecord directly (in the 
DeserializationSchema), thus ensuring the flexibility of the interface; 
otherwise there will always be various limitations.
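
A hedged sketch of what that could look like (the interface name is made up; 
nothing like this exists in Flink yet):

{code:java}
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.io.Serializable;

/** Hypothetical schema that receives the whole ConsumerRecord, so the
 * timestamp, topic, partition and offset are all accessible. */
public interface ConsumerRecordDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    T deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException;
}
{code}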

 

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message 
> timestamp' parameter (from the ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6066: [FLINK-9428] [checkpointing] Allow operators to fl...

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190543815
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import 
org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.BroadcastingOutputCollector;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingOutput;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class tests the {@link OperatorChain}.
+ *
+ * It takes a different (simpler) approach to testing the operator chain than
+ * {@link StreamOperatorChainingTest}.
+ */
+public class OperatorChainTest {
+
+   @Test
+   public void testPrepareCheckpointPreBarrier() throws Exception {
+   final AtomicInteger intRef = new AtomicInteger();
+
+   final OneInputStreamOperator one = new 
ValidatingOperator(intRef, 0);
+   final OneInputStreamOperator two = new 
ValidatingOperator(intRef, 1);
+   final OneInputStreamOperator three = new 
ValidatingOperator(intRef, 2);
+
+   final OperatorChain chain = setupOperatorChain(one, two, 
three);
+   
chain.prepareSnapshotPreBarrier(ValidatingOperator.CHECKPOINT_ID);
+
+   assertEquals(3, intRef.get());
+   }
+
+   // 

+   //  Operator Chain Setup Utils
+   // 

+
+   @SafeVarargs
+   private static > OperatorChain 
setupOperatorChain(
--- End diff --

This is a lot of mocking, but the alternative approach ties itself not only 
to the internals of the `OperatorChain`, but also to the stream config 
specifics. In that sense, I would like to keep this, because it at least ties 
itself to the details of one component, rather than two.

This hints that OperatorChain could really use some refactoring.


---


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488794#comment-16488794
 ] 

ASF GitHub Bot commented on FLINK-9428:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190543815
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import 
org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.BroadcastingOutputCollector;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingOutput;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class tests the {@link OperatorChain}.
+ *
+ * It takes a different (simpler) approach to testing the operator chain than
+ * {@link StreamOperatorChainingTest}.
+ */
+public class OperatorChainTest {
+
+   @Test
+   public void testPrepareCheckpointPreBarrier() throws Exception {
+   final AtomicInteger intRef = new AtomicInteger();
+
+   final OneInputStreamOperator one = new 
ValidatingOperator(intRef, 0);
+   final OneInputStreamOperator two = new 
ValidatingOperator(intRef, 1);
+   final OneInputStreamOperator three = new 
ValidatingOperator(intRef, 2);
+
+   final OperatorChain chain = setupOperatorChain(one, two, 
three);
+   
chain.prepareSnapshotPreBarrier(ValidatingOperator.CHECKPOINT_ID);
+
+   assertEquals(3, intRef.get());
+   }
+
+   // 

+   //  Operator Chain Setup Utils
+   // 

+
+   @SafeVarargs
+   private static > OperatorChain 
setupOperatorChain(
--- End diff --

This is a lot of mocking, but the alternative approach ties itself not only 
to the internals of the `OperatorChain`, but also to the stream config 
specifics. In that sense, I would like to keep this, because it at least ties 
itself to the details of one component, rather than two.

This hints that OperatorChain could really use some refactoring.


> Allow operators to flush data on checkpoint pre-barrier
> ---
>
> Key: FLINK-9428
> URL: https://issues.apache.org/jira/browse/FLINK-9428
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Some operators maintain some small transient state that may be inefficient to 
> checkpoint, especially when it would also need to be checkpointed in a 
> re-scalable way.
> An example is opportunistic pre-aggregation operators, which have small 
> pre-aggregation state that is frequently flushed downstream.
> Rather than persisting that state in a checkpoint, it can make sense to flush 
> the data downstream upon a checkpoint, to let it be part of the downstream 
> operator's state.
> This feature is sensitive, because flushing state has a direct impact on 
> the downstream operator's checkpoint alignment. However, used with care, and 
> with the new back-pressure-based checkpoint alignment, this feature can be 
> very useful.
> Because it is sensitive, I suggest making this an internal feature only 
> (accessible to operators) and NOT exposing it in the public API at this point.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (FLINK-8511) Remove legacy code for the TableType annotation

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488801#comment-16488801
 ] 

ASF GitHub Bot commented on FLINK-8511:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6055
  
Thank you @kisimple. The PR looks good.

We can also remove the `jcl-over-slf4j` dependency and update the 
`dependencyManagement` section. But I will do this and merge after testing.


> Remove legacy code for the TableType annotation
> ---
>
> Key: FLINK-8511
> URL: https://issues.apache.org/jira/browse/FLINK-8511
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Timo Walther
>Assignee: blues zheng
>Priority: Critical
>
> We introduced the very generic TableSource factories that unify the 
> definition of table sources and are specified using Java service loaders. For 
> backwards compatibility, the old code paths are still supported but should be 
> dropped in future Flink versions.
> This will touch:
> {code}
> org.apache.flink.table.annotation.TableType
> org.apache.flink.table.catalog.ExternalCatalogTable
> org.apache.flink.table.api.NoMatchedTableSourceConverterException
> org.apache.flink.table.api.AmbiguousTableSourceConverterException
> org.apache.flink.table.catalog.TableSourceConverter
> org.apache.flink.table.catalog.ExternalTableSourceUtil
> {code}
> We can also drop the {{org.reflections}} and {{commons-configuration}} (and 
> maybe more?) dependencies.
> See also FLINK-8240



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6055: [FLINK-8511][table] Remove legacy code for the TableType ...

2018-05-24 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6055
  
Thank you @kisimple. The PR looks good.

We can also remove the `jcl-over-slf4j` dependency and update the 
`dependencyManagement` section. But I will do this and merge after testing.


---


[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6068#discussion_r190548607
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 ---
@@ -271,6 +279,80 @@ public void 
testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
 
+   /**
+* Tests that the {@link RunningJobsRegistry} entries are cleared after 
the
+* job reached a terminal state.
+*/
+   @Test
+   public void testRunningJobsRegistryCleanup() throws Exception {
+   submitJob();
+
+   runningJobsRegistry.setJobRunning(jobId);
+   assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+   resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+   // wait for the clearing
+   runningJobsRegistry.getClearedFuture().get();
+
+   assertThat(runningJobsRegistry.contains(jobId), is(false));
+   }
+
+   private static final class TestingRunningJobsRegistry implements 
RunningJobsRegistry {
--- End diff --

Maybe `SingleRunningJobRegistry`.


---


[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6068#discussion_r190500268
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 ---
@@ -271,6 +279,80 @@ public void 
testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
 
+   /**
+* Tests that the {@link RunningJobsRegistry} entries are cleared after 
the
+* job reached a terminal state.
+*/
+   @Test
+   public void testRunningJobsRegistryCleanup() throws Exception {
+   submitJob();
+
+   runningJobsRegistry.setJobRunning(jobId);
+   assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+   resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+   // wait for the clearing
+   runningJobsRegistry.getClearedFuture().get();
+
+   assertThat(runningJobsRegistry.contains(jobId), is(false));
+   }
+
+   private static final class TestingRunningJobsRegistry implements 
RunningJobsRegistry {
+
+   private final JobID jobId;
+
+   private final CompletableFuture clearedFuture = new 
CompletableFuture<>();
+
+   private JobSchedulingStatus jobSchedulingStatus = 
JobSchedulingStatus.PENDING;
+
+   private boolean containsJob = false;
+
+   private TestingRunningJobsRegistry(JobID jobId) {
+   this.jobId = jobId;
+   }
+
+   public CompletableFuture getClearedFuture() {
+   return clearedFuture;
+   }
+
+   @Override
+   public void setJobRunning(JobID jobID) throws IOException {
+   checkJobId(jobID);
+   containsJob = true;
+   jobSchedulingStatus = JobSchedulingStatus.RUNNING;
+   }
+
+   private void checkJobId(JobID jobID) {
+   Preconditions.checkArgument(jobId.equals(jobID));
--- End diff --

Not the best variable names here: `jobId` vs. `jobID`


---


[GitHub] flink pull request #6068: [FLINK-9421] Remove job from RunningJobsRegistry w...

2018-05-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6068#discussion_r190548385
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 ---
@@ -271,6 +279,80 @@ public void 
testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
 
+   /**
+* Tests that the {@link RunningJobsRegistry} entries are cleared after 
the
+* job reached a terminal state.
+*/
+   @Test
+   public void testRunningJobsRegistryCleanup() throws Exception {
+   submitJob();
+
+   runningJobsRegistry.setJobRunning(jobId);
+   assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+   resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+   // wait for the clearing
+   runningJobsRegistry.getClearedFuture().get();
+
+   assertThat(runningJobsRegistry.contains(jobId), is(false));
+   }
+
+   private static final class TestingRunningJobsRegistry implements 
RunningJobsRegistry {
+
+   private final JobID jobId;
+
+   private final CompletableFuture clearedFuture = new 
CompletableFuture<>();
--- End diff --

For simplicity I would use a `CountDownLatch` here.
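
That is, roughly (a sketch of the suggestion, not the committed test code):

{code:java}
import java.util.concurrent.CountDownLatch;

// Sketch of the suggested simplification: signal "registry cleared" with a
// CountDownLatch instead of completing a CompletableFuture<Void>.
final class ClearedSignal {
    private final CountDownLatch cleared = new CountDownLatch(1);

    // would be invoked from the registry's clearJob(JobID) override
    void markCleared() {
        cleared.countDown();
    }

    // the test blocks here instead of calling getClearedFuture().get()
    void awaitCleared() throws InterruptedException {
        cleared.await();
    }
}
{code}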


---


[jira] [Commented] (FLINK-9421) RunningJobsRegistry entries are not cleaned up after job termination

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488809#comment-16488809
 ] 

ASF GitHub Bot commented on FLINK-9421:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6068#discussion_r190548607
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 ---
@@ -271,6 +279,80 @@ public void 
testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
 
+   /**
+* Tests that the {@link RunningJobsRegistry} entries are cleared after 
the
+* job reached a terminal state.
+*/
+   @Test
+   public void testRunningJobsRegistryCleanup() throws Exception {
+   submitJob();
+
+   runningJobsRegistry.setJobRunning(jobId);
+   assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+   resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+   // wait for the clearing
+   runningJobsRegistry.getClearedFuture().get();
+
+   assertThat(runningJobsRegistry.contains(jobId), is(false));
+   }
+
+   private static final class TestingRunningJobsRegistry implements 
RunningJobsRegistry {
--- End diff --

Maybe `SingleRunningJobRegistry`.


> RunningJobsRegistry entries are not cleaned up after job termination
> 
>
> Key: FLINK-9421
> URL: https://issues.apache.org/jira/browse/FLINK-9421
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>
> Currently, the {{Dispatcher}} does not clean up the {{RunningJobsRegistry}} 
> after the job has finished. The consequence is that a ZNode with the JobID 
> and a state num per job remains in ZooKeeper.
> We should clean up these ZNodes to avoid a resource leak.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9421) RunningJobsRegistry entries are not cleaned up after job termination

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488810#comment-16488810
 ] 

ASF GitHub Bot commented on FLINK-9421:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6068#discussion_r190500268
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 ---
@@ -271,6 +279,80 @@ public void 
testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
 
+   /**
+* Tests that the {@link RunningJobsRegistry} entries are cleared after 
the
+* job reached a terminal state.
+*/
+   @Test
+   public void testRunningJobsRegistryCleanup() throws Exception {
+   submitJob();
+
+   runningJobsRegistry.setJobRunning(jobId);
+   assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+   resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+   // wait for the clearing
+   runningJobsRegistry.getClearedFuture().get();
+
+   assertThat(runningJobsRegistry.contains(jobId), is(false));
+   }
+
+   private static final class TestingRunningJobsRegistry implements 
RunningJobsRegistry {
+
+   private final JobID jobId;
+
+   private final CompletableFuture clearedFuture = new 
CompletableFuture<>();
+
+   private JobSchedulingStatus jobSchedulingStatus = 
JobSchedulingStatus.PENDING;
+
+   private boolean containsJob = false;
+
+   private TestingRunningJobsRegistry(JobID jobId) {
+   this.jobId = jobId;
+   }
+
+   public CompletableFuture getClearedFuture() {
+   return clearedFuture;
+   }
+
+   @Override
+   public void setJobRunning(JobID jobID) throws IOException {
+   checkJobId(jobID);
+   containsJob = true;
+   jobSchedulingStatus = JobSchedulingStatus.RUNNING;
+   }
+
+   private void checkJobId(JobID jobID) {
+   Preconditions.checkArgument(jobId.equals(jobID));
--- End diff --

Not the best variable names here: `jobId` vs. `jobID`


> RunningJobsRegistry entries are not cleaned up after job termination
> 
>
> Key: FLINK-9421
> URL: https://issues.apache.org/jira/browse/FLINK-9421
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>
> Currently, the {{Dispatcher}} does not clean up the {{RunningJobsRegistry}} 
> after the job has finished. The consequence is that a ZNode with the JobID 
> and a state num per job remains in ZooKeeper.
> We should clean up these ZNodes to avoid a resource leak.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9421) RunningJobsRegistry entries are not cleaned up after job termination

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488808#comment-16488808
 ] 

ASF GitHub Bot commented on FLINK-9421:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6068#discussion_r190548385
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 ---
@@ -271,6 +279,80 @@ public void 
testBlobServerCleanupWhenClosingDispatcher() throws Exception {
assertThat(deleteAllFuture.isDone(), is(false));
}
 
+   /**
+* Tests that the {@link RunningJobsRegistry} entries are cleared after 
the
+* job reached a terminal state.
+*/
+   @Test
+   public void testRunningJobsRegistryCleanup() throws Exception {
+   submitJob();
+
+   runningJobsRegistry.setJobRunning(jobId);
+   assertThat(runningJobsRegistry.contains(jobId), is(true));
+
+   resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(jobId).build());
+
+   // wait for the clearing
+   runningJobsRegistry.getClearedFuture().get();
+
+   assertThat(runningJobsRegistry.contains(jobId), is(false));
+   }
+
+   private static final class TestingRunningJobsRegistry implements 
RunningJobsRegistry {
+
+   private final JobID jobId;
+
+   private final CompletableFuture clearedFuture = new 
CompletableFuture<>();
--- End diff --

For simplicity I would use a `CountDownLatch` here.


> RunningJobsRegistry entries are not cleaned up after job termination
> 
>
> Key: FLINK-9421
> URL: https://issues.apache.org/jira/browse/FLINK-9421
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>
> Currently, the {{Dispatcher}} does not clean up the {{RunningJobsRegistry}} 
> after the job has finished. The consequence is that a ZNode with the JobID 
> and a state num per job remains in ZooKeeper.
> We should clean up these ZNodes to avoid a resource leak.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9432) Support extract epoch, decade, millisecond, microsecond

2018-05-24 Thread Sergey Nuyanzin (JIRA)
Sergey Nuyanzin created FLINK-9432:
--

 Summary: Support extract epoch, decade, millisecond, microsecond
 Key: FLINK-9432
 URL: https://issues.apache.org/jira/browse/FLINK-9432
 Project: Flink
  Issue Type: Sub-task
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


The task is to separate the activity that depends on 
https://issues.apache.org/jira/browse/CALCITE-2303 from everything that can be 
done without upgrading Avatica/Calcite.

For now, the implementations of the following functions are blocked:
{code:sql}
extract(decade from ...)
extract(epoch from ...)
extract(millisecond from ...)
extract(microsecond from ...)
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6026: [FLINK-9384]KafkaAvroTableSource failed to work due to ty...

2018-05-24 Thread tragicjun
Github user tragicjun commented on the issue:

https://github.com/apache/flink/pull/6026
  
@twalthr sure, just a second, I never did this before, let me try


---


[jira] [Commented] (FLINK-9384) KafkaAvroTableSource failed to work due to type mismatch

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488817#comment-16488817
 ] 

ASF GitHub Bot commented on FLINK-9384:
---

Github user tragicjun commented on the issue:

https://github.com/apache/flink/pull/6026
  
@twalthr sure, just a second, I never did this before, let me try


> KafkaAvroTableSource failed to work due to type mismatch
> 
>
> Key: FLINK-9384
> URL: https://issues.apache.org/jira/browse/FLINK-9384
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: easyfix, patch
> Fix For: 1.6.0
>
> Attachments: flink-9384.patch
>
>
> An exception was thrown when using KafkaAvroTableSource as follows:
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> TableSource of type 
> org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSource returned 
> a DataStream of type GenericType that does not 
> match with the type Row(id: Integer, name: String, age: Integer, event: 
> GenericType) declared by the TableSource.getReturnType() 
> method. Please validate the implementation of the TableSource.
>  at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:100)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:279)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
>  at 
> org.apache.flink.quickstart.StreamingJobAvro.main(StreamingJobAvro.java:85)
>  
> It is caused by a discrepancy between the type returned by the TableSource 
> and the type returned by the DataStream. I've already fixed it, would someone 
> please review the patch and see if it could be merged.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6071: [FLINK-3952][runtime] Upgrade to Netty 4.1

2018-05-24 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/6071

[FLINK-3952][runtime] Upgrade to Netty 4.1

This PR adjusts our code to work with Netty 4.1. It also includes a possible 
bug fix to the file upload cleanup in FileUploadHandler and HttpRequestHandler. 
For more information see:

https://github.com/netty/netty/issues/7611

The first commit is only there to keep Travis green and will be dropped once 
the new `flink-shaded-netty` is released.

## Verifying this change

This change is covered by a variety of pre-existing tests. Furthermore, I have 
manually verified that the issue mentioned by @uce in the commit message here: 
https://github.com/apache/flink/commit/d92e422ec7089376583a8f57043274d236c340a4
does not happen:
- I have reproduced this issue on a test cluster with Flink 1.0-XXX
- I have verified that the same job passes without any problems after 
upgrading to Netty 4.1

I have also run our network benchmark suite and verified that there are no 
performance changes after this change.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes** / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (**yes** / 
no / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f3952

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6071.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6071


commit 7b3be7c9ebac7392136ed85bca4664559710552e
Author: Piotr Nowojski 
Date:   2018-05-14T11:46:08Z

[hotfix][tests] Report failure with error level instead of debug

commit afaf1d5181c7133a040bf3881723e240145a4b0a
Author: Piotr Nowojski 
Date:   2018-05-16T19:26:36Z

[FLINK-9386] Embed netty router

This commit replaces netty-router dependency with our own version of it, 
which is
simplified and adds guarantees about order of matching router patterns.

This is a prerequisite for FLINK-3952. netty-router 1.10 is incompatible with
Netty 4.1, while netty-router 2.2.0 breaks compatibility in a way that made it
unusable for us.

commit 26bc92db0863bf53e60164ab5f6b92ac3b424506
Author: Piotr Nowojski 
Date:   2018-05-14T10:30:31Z

Embed flink-shaded-netty-4

commit 94a4cc2237b5dac0c004ec192eb4d7f1b782e5f2
Author: Piotr Nowojski 
Date:   2018-05-16T19:27:22Z

[FLINK-3952][runtime] Upgrade to Netty 4.1

This commit includes a possible bug fix to the file upload cleanup in 
FileUploadHandler and HttpRequestHandler. For more information see:

https://github.com/netty/netty/issues/7611




---


[jira] [Commented] (FLINK-3952) Bump Netty to 4.1

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488819#comment-16488819
 ] 

ASF GitHub Bot commented on FLINK-3952:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/6071

[FLINK-3952][runtime] Upgrade to Netty 4.1

This PR adjusts our code to work with Netty 4.1. It also includes a possible 
bug fix to the file upload cleanup in FileUploadHandler and HttpRequestHandler. 
For more information see:

https://github.com/netty/netty/issues/7611

The first commit is only there to keep Travis green and will be dropped once 
the new `flink-shaded-netty` is released.

## Verifying this change

This change is covered by a variety of pre-existing tests. Furthermore, I have 
manually verified that the issue mentioned by @uce in the commit message here: 
https://github.com/apache/flink/commit/d92e422ec7089376583a8f57043274d236c340a4
does not happen:
- I have reproduced this issue on a test cluster with Flink 1.0-XXX
- I have verified that the same job passes without any problems after 
upgrading to Netty 4.1

I have also run our network benchmark suite and verified that there are no 
performance changes after this change.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes** / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (**yes** / 
no / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f3952

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6071.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6071


commit 7b3be7c9ebac7392136ed85bca4664559710552e
Author: Piotr Nowojski 
Date:   2018-05-14T11:46:08Z

[hotfix][tests] Report failure with error level instead of debug

commit afaf1d5181c7133a040bf3881723e240145a4b0a
Author: Piotr Nowojski 
Date:   2018-05-16T19:26:36Z

[FLINK-9386] Embed netty router

This commit replaces netty-router dependency with our own version of it, 
which is
simplified and adds guarantees about order of matching router patterns.

This is a prerequisite for FLINK-3952. netty-router 1.10 is incompatible with
Netty 4.1, while netty-router 2.2.0 breaks compatibility in a way that made it
unusable for us.

commit 26bc92db0863bf53e60164ab5f6b92ac3b424506
Author: Piotr Nowojski 
Date:   2018-05-14T10:30:31Z

Embed flink-shaded-netty-4

commit 94a4cc2237b5dac0c004ec192eb4d7f1b782e5f2
Author: Piotr Nowojski 
Date:   2018-05-16T19:27:22Z

[FLINK-3952][runtime] Upgrade to Netty 4.1

This commit includes a possible bug fix to the file upload cleanup in 
FileUploadHandler and HttpRequestHandler. For more information see:

https://github.com/netty/netty/issues/7611




> Bump Netty to 4.1
> -
>
> Key: FLINK-3952
> URL: https://issues.apache.org/jira/browse/FLINK-3952
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, Network
>Reporter: rektide de la fey
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: netty
>
> Netty 4.1 is about to release its final version. This release has [a number of 
> significant 
> enhancements|http://netty.io/wiki/new-and-noteworthy-in-4.1.html], and in 
> particular I find HTTP/2 codecs to be incredibly desirable to have. 
> Additionally, hopefully, the [Hadoop patches for Netty 
> 4.1|https://issues.apache.org/jira/browse/HADOOP-11716] get some tests and 
> get merged, & I believe if/when that happens it'll be important for Flink to 
> also be using the new Netty minor version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6071: [FLINK-3952][runtime] Upgrade to Netty 4.1

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r190554149
  
--- Diff: pom.xml ---
@@ -308,7 +308,7 @@ under the License.
errors.
 
[1] https://github.com/netty/netty/issues/3704 
-->
--- End diff --

Looks like you can remove this comment now.


---


[GitHub] flink pull request #5983: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-24 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5983#discussion_r190553816
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction extends 
AsyncFunction {
+
+   /**
+* asyncInvoke timeout occurred.
+* Here you can complete the result future exceptionally with timeout 
exception,
+* or complete with empty result. You can also retry to complete with 
the right results.
+*
+* @param input element coming from an upstream task
+* @param resultFuture to be completed with the result data
+* @exception Exception in case of a user code error. An exception will 
make the task fail and
+* trigger fail-over process.
+*/
+   void timeout(IN input, ResultFuture resultFuture) throws Exception;
--- End diff --

Can we not add a default method here (a Java 8 feature)?
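
For illustration, the default-method variant could look roughly like this (a 
sketch only; the interface name and the choice of default behaviour are 
assumptions, not the actual API):

{code:java}
import java.io.Serializable;
import java.util.concurrent.TimeoutException;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Sketch: fold the timeout hook into the async function itself via a Java 8
// default method instead of introducing a separate sub-interface.
interface AsyncFunctionWithTimeout<IN, OUT> extends Function, Serializable {

    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    // the default preserves today's behaviour: a timeout fails the job
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
            new TimeoutException("Async function call has timed out."));
    }
}
{code}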


---


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488826#comment-16488826
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5983#discussion_r190553816
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction extends 
AsyncFunction {
+
+   /**
+* asyncInvoke timeout occurred.
+* Here you can complete the result future exceptionally with timeout 
exception,
+* or complete with empty result. You can also retry to complete with 
the right results.
+*
+* @param input element coming from an upstream task
+* @param resultFuture to be completed with the result data
+* @exception Exception in case of a user code error. An exception will 
make the task fail and
+* trigger fail-over process.
+*/
+   void timeout(IN input, ResultFuture resultFuture) throws Exception;
--- End diff --

Can we not add a default method here (a Java 8 feature)?


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle 
> timeouts. When a request times out, an exception is thrown and the job is 
> restarted. It would be good to pass an AsyncIOTimeoutHandler, which can be 
> implemented by the user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9432) Support extract epoch, decade, millisecond, microsecond

2018-05-24 Thread Sergey Nuyanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-9432:
---
Description: 
The task is to separate the activity that depends on 
https://issues.apache.org/jira/browse/CALCITE-2303 from everything in 
https://issues.apache.org/jira/browse/FLINK-8518 that can be done without 
upgrading Avatica/Calcite.

For now, the implementations of the following functions are blocked:
{code:sql}
extract(decade from ...)
extract(epoch from ...)
extract(millisecond from ...)
extract(microsecond from ...)
{code}


  was:
The task is to separate activity depending on 
https://issues.apache.org/jira/browse/CALCITE-2303 from all others that could 
be done without upgrade avatica/calcite

Now the implementations of next functions are blocked
{code:sql}
extract(decade from ...)
extract(epoch from ...)
extract(millisecond from ...)
extract(microsecond from ...)
{code}



> Support extract epoch, decade, millisecond, microsecond
> ---
>
> Key: FLINK-9432
> URL: https://issues.apache.org/jira/browse/FLINK-9432
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> The task is to separate the activity that depends on 
> https://issues.apache.org/jira/browse/CALCITE-2303 from everything in 
> https://issues.apache.org/jira/browse/FLINK-8518 that can be done without 
> upgrading Avatica/Calcite.
> For now, the implementations of the following functions are blocked:
> {code:sql}
> extract(decade from ...)
> extract(epoch from ...)
> extract(millisecond from ...)
> extract(microsecond from ...)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6071: [FLINK-3952][runtime] Upgrade to Netty 4.1

2018-05-24 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r190554721
  
--- Diff: pom.xml ---
@@ -308,7 +308,7 @@ under the License.
errors.
 
[1] https://github.com/netty/netty/issues/3704 
-->
--- End diff --

Ops, dropped.


---


[jira] [Commented] (FLINK-3952) Bump Netty to 4.1

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488833#comment-16488833
 ] 

ASF GitHub Bot commented on FLINK-3952:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r190554721
  
--- Diff: pom.xml ---
@@ -308,7 +308,7 @@ under the License.
errors.
 
[1] https://github.com/netty/netty/issues/3704 
-->
--- End diff --

Ops, dropped.


> Bump Netty to 4.1
> -
>
> Key: FLINK-3952
> URL: https://issues.apache.org/jira/browse/FLINK-3952
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, Network
>Reporter: rektide de la fey
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: netty
>
> Netty 4.1 is about to release its final version. This release has [a number of 
> significant 
> enhancements|http://netty.io/wiki/new-and-noteworthy-in-4.1.html], and in 
> particular I find HTTP/2 codecs to be incredibly desirable to have. 
> Additionally, hopefully, the [Hadoop patches for Netty 
> 4.1|https://issues.apache.org/jira/browse/HADOOP-11716] get some tests and 
> get merged, & I believe if/when that happens it'll be important for Flink to 
> also be using the new Netty minor version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9234) Commons Logging is missing from shaded Flink Table library

2018-05-24 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-9234.
-
Resolution: Fixed

After an offline discussion with [~till.rohrmann], we stick to the current 
dependency configuration, as the error only occurs when building with Maven 
3.3+. The related legacy code will be removed in 1.6 anyway.

> Commons Logging is missing from shaded Flink Table library
> --
>
> Key: FLINK-9234
> URL: https://issues.apache.org/jira/browse/FLINK-9234
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.2
> Environment: jdk1.8.0_172
> flink 1.4.2
> Mac High Sierra
>Reporter: Eron Wright 
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
> Attachments: repro.scala
>
>
> The flink-table shaded library seems to be missing some classes from 
> {{org.apache.commons.logging}} that are required by 
> {{org.apache.commons.configuration}}.  Ran into the problem while using the 
> external catalog support, on Flink 1.4.2.
> See attached a repro, which produces:
> {code}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/table/shaded/org/apache/commons/logging/Log
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.parseScanPackagesFromConfigFile(ExternalTableSourceUtil.scala:153)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala:55)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala)
>   at 
> org.apache.flink.table.catalog.ExternalCatalogSchema.getTable(ExternalCatalogSchema.scala:78)
>   at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:82)
>   at 
> org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:256)
>   at 
> org.apache.calcite.jdbc.CalciteSchema$SchemaPlusImpl.getTable(CalciteSchema.java:561)
>   at 
> org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:497)
>   at 
> org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485)
>   at Repro$.main(repro.scala:17)
>   at Repro.main(repro.scala)
> {code}
> Dependencies:
> {code}
> compile 'org.slf4j:slf4j-api:1.7.25'
> compile 'org.slf4j:slf4j-log4j12:1.7.25'
> runtime 'log4j:log4j:1.2.17'
> compile 'org.apache.flink:flink-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-streaming-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-clients_2.11:1.4.2'
> compile 'org.apache.flink:flink-table_2.11:1.4.2'
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6066: [FLINK-9428] [checkpointing] Allow operators to flush dat...

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6066
  
Thanks for the review.

Addressing the comments and merging this...


---


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488847#comment-16488847
 ] 

ASF GitHub Bot commented on FLINK-9428:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6066
  
Thanks for the review.

Addressing the comments and merging this...


> Allow operators to flush data on checkpoint pre-barrier
> ---
>
> Key: FLINK-9428
> URL: https://issues.apache.org/jira/browse/FLINK-9428
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Some operators maintain some small transient state that may be inefficient to 
> checkpoint, especially when it would also need to be checkpointed in a 
> re-scalable way.
> An example is opportunistic pre-aggregation operators, which have small 
> pre-aggregation state that is frequently flushed downstream.
> Rather than persisting that state in a checkpoint, it can make sense to flush 
> the data downstream upon a checkpoint, to let it be part of the downstream 
> operator's state.
> This feature is sensitive, because flushing state has a direct impact on 
> the downstream operator's checkpoint alignment. However, used with care, and 
> with the new back-pressure-based checkpoint alignment, this feature can be 
> very useful.
> Because it is sensitive, I suggest making this an internal feature only 
> (accessible to operators) and NOT exposing it in the public API at this point.
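
A minimal sketch of how an operator could use the new pre-barrier hook for the 
pre-aggregation case described above (the operator and its buffer logic are 
illustrative; only the prepareSnapshotPreBarrier hook itself comes from this 
issue):

{code:java}
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Illustrative pre-aggregating operator: instead of checkpointing its small
// transient sum, it flushes the sum downstream right before the barrier is
// forwarded, making the value part of the downstream operator's state.
class FlushingPreAggOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long> {

    private transient long sum;  // small transient state, never checkpointed

    @Override
    public void processElement(StreamRecord<Long> element) throws Exception {
        sum += element.getValue();
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        output.collect(new StreamRecord<>(sum));  // flush before the barrier
        sum = 0L;
    }
}
{code}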



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5983: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-24 Thread kisimple
Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/5983#discussion_r190560531
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction extends 
AsyncFunction {
+
+   /**
+* asyncInvoke timeout occurred.
+* Here you can complete the result future exceptionally with timeout 
exception,
+* or complete with empty result. You can also retry to complete with 
the right results.
+*
+* @param input element coming from an upstream task
+* @param resultFuture to be completed with the result data
+* @exception Exception in case of a user code error. An exception will 
make the task fail and
+* trigger fail-over process.
+*/
+   void timeout(IN input, ResultFuture resultFuture) throws Exception;
--- End diff --

Sorry that I did not pay much attention to the new features. I will try it, 
and if it works I will close this PR and open a new one. Thanks :)


---


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488865#comment-16488865
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/5983#discussion_r190560531
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction extends 
AsyncFunction {
+
+   /**
+* asyncInvoke timeout occurred.
+* Here you can complete the result future exceptionally with timeout 
exception,
+* or complete with empty result. You can also retry to complete with 
the right results.
+*
+* @param input element coming from an upstream task
+* @param resultFuture to be completed with the result data
+* @exception Exception in case of a user code error. An exception will 
make the task fail and
+* trigger fail-over process.
+*/
+   void timeout(IN input, ResultFuture resultFuture) throws Exception;
--- End diff --

Sorry that I did not pay much attention to the new features. I will try it, 
and if it works I will close this PR and open a new one. Thanks :)


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle 
> timeouts. When a request times out, an exception is thrown and the job is 
> restarted. It would be good to pass an AsyncIOTimeoutHandler, which can be 
> implemented by the user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6066: [FLINK-9428] [checkpointing] Allow operators to flush dat...

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6066
  
Closed in e1d1234477c731fe3f398c7f3f12123f73764242


---


[GitHub] flink pull request #6066: [FLINK-9428] [checkpointing] Allow operators to fl...

2018-05-24 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/6066


---


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488871#comment-16488871
 ] 

ASF GitHub Bot commented on FLINK-9428:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/6066


> Allow operators to flush data on checkpoint pre-barrier
> ---
>
> Key: FLINK-9428
> URL: https://issues.apache.org/jira/browse/FLINK-9428
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Some operators maintain some small transient state that may be inefficient to 
> checkpoint, especially when it would also need to be checkpointed in a 
> re-scalable way.
> An example is opportunistic pre-aggregation operators, which have small 
> pre-aggregation state that is frequently flushed downstream.
> Rather than persisting that state in a checkpoint, it can make sense to flush 
> the data downstream upon a checkpoint, to let it be part of the downstream 
> operator's state.
> This feature is sensitive, because flushing state has a direct impact on 
> the downstream operator's checkpoint alignment. However, used with care, and 
> with the new back-pressure-based checkpoint alignment, this feature can be 
> very useful.
> Because it is sensitive, I suggest making this an internal feature only 
> (accessible to operators) and NOT exposing it in the public API at this point.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488870#comment-16488870
 ] 

ASF GitHub Bot commented on FLINK-9428:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6066
  
Closed in e1d1234477c731fe3f398c7f3f12123f73764242


> Allow operators to flush data on checkpoint pre-barrier
> ---
>
> Key: FLINK-9428
> URL: https://issues.apache.org/jira/browse/FLINK-9428
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Some operators maintain some small transient state that may be inefficient to 
> checkpoint, especially when it would also need to be checkpointed in a 
> re-scalable way.
> An example is opportunistic pre-aggregation operators, which have small 
> pre-aggregation state that is frequently flushed downstream.
> Rather than persisting that state in a checkpoint, it can make sense to flush 
> the data downstream upon a checkpoint, to let it be part of the downstream 
> operator's state.
> This feature is sensitive, because flushing state has a direct impact on 
> the downstream operator's checkpoint alignment. However, used with care, and 
> with the new back-pressure-based checkpoint alignment, this feature can be 
> very useful.
> Because it is sensitive, I suggest making this an internal feature only 
> (accessible to operators) and NOT exposing it in the public API at this point.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-9428.
---

> Allow operators to flush data on checkpoint pre-barrier
> ---
>
> Key: FLINK-9428
> URL: https://issues.apache.org/jira/browse/FLINK-9428
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Some operators maintain some small transient state that may be inefficient to 
> checkpoint, especially when it would also need to be checkpointed in a 
> re-scalable way.
> An example is opportunistic pre-aggregation operators, which have small 
> pre-aggregation state that is frequently flushed downstream.
> Rather than persisting that state in a checkpoint, it can make sense to flush 
> the data downstream upon a checkpoint, to let it be part of the downstream 
> operator's state.
> This feature is sensitive, because flushing state has a direct impact on 
> the downstream operator's checkpoint alignment. However, used with care, and 
> with the new back-pressure-based checkpoint alignment, this feature can be 
> very useful.
> Because it is sensitive, I suggest making this an internal feature only 
> (accessible to operators) and NOT exposing it in the public API at this point.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9428) Allow operators to flush data on checkpoint pre-barrier

2018-05-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-9428.
-
Resolution: Implemented

Implemented in e1d1234477c731fe3f398c7f3f12123f73764242

> Allow operators to flush data on checkpoint pre-barrier
> ---
>
> Key: FLINK-9428
> URL: https://issues.apache.org/jira/browse/FLINK-9428
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Some operators maintain small transient state that may be inefficient to 
> checkpoint, especially when it would also need to be checkpointed in a 
> re-scalable way.
> Examples are opportunistic pre-aggregation operators, which have small 
> pre-aggregation state that is frequently flushed downstream.
> Rather than persisting that state in a checkpoint, it can make sense to flush 
> the data downstream upon a checkpoint, letting it become part of the 
> downstream operator's state.
> This feature is sensitive, because flushing state has a clear implication for 
> the downstream operator's checkpoint alignment. However, used with care, and 
> with the new back-pressure-based checkpoint alignment, this feature can be 
> very useful.
> Because it is sensitive, I suggest making this an internal-only feature 
> (accessible to operators) and NOT exposing it in the public API at this point.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9349) KafkaConnector Exception while fetching from multiple kafka topics

2018-05-24 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-9349:
--
Affects Version/s: (was: 1.5.0)
Fix Version/s: (was: 1.5.1)
   (was: 1.6.0)
   1.5.0

> KafkaConnector Exception  while fetching from multiple kafka topics
> ---
>
> Key: FLINK-9349
> URL: https://issues.apache.org/jira/browse/FLINK-9349
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Vishal Santoshi
>Assignee: Sergey Nuyanzin
>Priority: Critical
> Fix For: 1.5.0, 1.4.3
>
> Attachments: Flink9349Test.java
>
>
> ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>  
> It seems that the List subscribedPartitionStates was being modified while 
> runFetchLoop was iterating over it.
> This can happen if, e.g., FlinkKafkaConsumer concurrently runs the following 
> code:
> kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>  
> {code:java}
>  java.util.ConcurrentModificationException
>   at 
> java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
>   at java.util.LinkedList$ListItr.next(LinkedList.java:888)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
> {code}
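For readers unfamiliar with this failure mode, here is a standalone sketch 
(plain JDK, not Flink code) of the race described above, together with one 
common remedy:

{code:java}
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ComodificationDemo {

    public static void main(String[] args) throws InterruptedException {
        List<Integer> partitionStates = new LinkedList<>();
        for (int i = 0; i < 1_000_000; i++) {
            partitionStates.add(i);
        }

        // stands in for the partition-discovery thread calling
        // addDiscoveredPartitions() while the fetch loop is running
        Thread discovery = new Thread(() -> partitionStates.add(-1));
        discovery.start();

        long sum = 0;
        // stands in for runFetchLoop(); the LinkedList iterator checks its
        // modCount and, if the add above lands mid-iteration, throws
        // ConcurrentModificationException (being a race, it may also pass)
        for (int p : partitionStates) {
            sum += p;
        }
        discovery.join();

        // one common remedy: a CopyOnWriteArrayList, whose iterators work
        // on an immutable snapshot and never throw CME
        List<Integer> safe = new CopyOnWriteArrayList<>(partitionStates);
        for (int p : safe) {
            sum += p;
        }
        System.out.println(sum);
    }
}
{code}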



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9349) KafkaConnector Exception while fetching from multiple kafka topics

2018-05-24 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488878#comment-16488878
 ] 

Piotr Nowojski commented on FLINK-9349:
---

Because of the cancelled 1.5.0 RC5, this fix will make it into 1.5.0 RC6. FYI 
[~tzulitai], you might want to run some additional tests against 1.5.0 RC6 to 
make sure that everything works as expected: this bug fix was not covered by 
the normal release testing, and many voters will probably just carry their 
votes over to the new RC without re-testing it.

> KafkaConnector Exception  while fetching from multiple kafka topics
> ---
>
> Key: FLINK-9349
> URL: https://issues.apache.org/jira/browse/FLINK-9349
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Vishal Santoshi
>Assignee: Sergey Nuyanzin
>Priority: Critical
> Fix For: 1.5.0, 1.4.3
>
> Attachments: Flink9349Test.java
>
>
> ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>  
> It seems that the List subscribedPartitionStates was being modified while 
> runFetchLoop was iterating over it.
> This can happen if, e.g., FlinkKafkaConsumer concurrently runs the following 
> code:
> kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>  
> {code:java}
>  java.util.ConcurrentModificationException
>   at 
> java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
>   at java.util.LinkedList$ListItr.next(LinkedList.java:888)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5983: [FLINK-7789][DataStream API] Add handler for Async...

2018-05-24 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5983#discussion_r190567872
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction<IN, OUT> extends AsyncFunction<IN, OUT> {
+
+   /**
+    * Invoked when an {@code asyncInvoke} call times out.
+    * Here you can complete the result future exceptionally with a timeout
+    * exception, or complete it with an empty result. You can also retry to
+    * complete it with the right results.
+    *
+    * @param input element coming from an upstream task
+    * @param resultFuture to be completed with the result data
+    * @exception Exception in case of a user code error. An exception will
+    *            make the task fail and trigger the fail-over process.
+    */
+   void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
--- End diff --

No problem :) Please CC me if you open the next PR for this issue.
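For context, here is a hypothetical usage sketch of the interface proposed in 
this diff; whether the final API kept this exact shape was still open at this 
point, and LookupWithFallback and externalLookup are made-up names:

{code:java}
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;

public class LookupWithFallback implements TimeoutAwareAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // hand the blocking lookup to another thread and complete the
        // result future when it finishes
        CompletableFuture
                .supplyAsync(() -> externalLookup(key))
                .thenAccept(value -> resultFuture.complete(Collections.singleton(value)));
    }

    @Override
    public void timeout(String key, ResultFuture<String> resultFuture) {
        // instead of failing the task (the default behavior this PR makes
        // overridable), emit a placeholder result
        resultFuture.complete(Collections.singleton("UNKNOWN"));
    }

    // stand-in for a real client call
    private String externalLookup(String key) {
        return key.toUpperCase();
    }
}
{code}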


---


[jira] [Commented] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488897#comment-16488897
 ] 

ASF GitHub Bot commented on FLINK-7789:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5983#discussion_r190567872
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/TimeoutAwareAsyncFunction.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enhanced {@link AsyncFunction} which can handle timeouts.
+ */
+@PublicEvolving
+public interface TimeoutAwareAsyncFunction<IN, OUT> extends AsyncFunction<IN, OUT> {
+
+   /**
+    * Invoked when an {@code asyncInvoke} call times out.
+    * Here you can complete the result future exceptionally with a timeout
+    * exception, or complete it with an empty result. You can also retry to
+    * complete it with the right results.
+    *
+    * @param input element coming from an upstream task
+    * @param resultFuture to be completed with the result data
+    * @exception Exception in case of a user code error. An exception will
+    *            make the task fail and trigger the fail-over process.
+    */
+   void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
--- End diff --

No problem :) Please CC me if you open the next PR for this issue.


> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle 
> timeouts. When a request times out, an exception is thrown and the job is 
> restarted. It would be good to have an AsyncIOTimeoutHandler that can be 
> implemented by the user and passed in the constructor.
> Here is the discussion from the Apache Flink users mailing list: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9430) Support Casting of Object to Primitive types for Flink SQL UDF

2018-05-24 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488902#comment-16488902
 ] 

Timo Walther commented on FLINK-9430:
-

Wouldn't it be easier to just return String in your example and cast to the 
desired type? The use of {{Object}} should be discouraged.
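A minimal sketch of that suggestion, as a Java scalar function. It assumes a 
Jayway-style JSON-path library on the classpath; the library choice and the 
class name JsonPathStringUDF are illustrative, not part of the ticket:

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

public class JsonPathStringUDF extends ScalarFunction {

    // always returns String; the SQL-side CAST converts to the target type
    public String eval(String json, String path) {
        // assumption: com.jayway.jsonpath is available; any JSON-path
        // implementation returning Object would work the same way
        Object value = com.jayway.jsonpath.JsonPath.read(json, path);
        return value == null ? null : value.toString();
    }
}
{code}

Registered as {{jsonpath}}, this keeps the query from the ticket unchanged, 
e.g. SELECT CAST(jsonpath(json, '$.store.book.title') AS VARCHAR(32)) AS 
bookTitle FROM table1, but the cast then operates on VARCHAR instead of a 
GenericTypeInfo.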

> Support Casting of Object to Primitive types for Flink SQL UDF
> --
>
> Key: FLINK-9430
> URL: https://issues.apache.org/jira/browse/FLINK-9430
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> We want to add a SQL UDF to access a specific element in a JSON string using 
> a JSON path. However, the JSON element can be of different types, e.g. Int, 
> Float, Double, String, Boolean, etc. Since the return type is not part of the 
> method signature, we cannot use overloading. So we would end up writing one 
> UDF per type, e.g. GetFloatFromJSON, GetIntFromJSON and so on, which leads to 
> a lot of duplication. 
> One way to unify all these UDFs is to implement a single UDF that returns 
> java.lang.Object and, in the SQL statement, use CAST AS to cast the returned 
> Object into the correct type. Below is an example:
>  
> {code:java}
> object JsonPathUDF extends ScalarFunction {
>   def eval(jsonStr: String, path: String): Object = {
>     JSONParser.parse(jsonStr).read(path)
>   }
> }{code}
> {code:java}
> SELECT CAST(jsonpath(json, "$.store.book.title") AS VARCHAR(32)) as 
> bookTitle FROM table1{code}
> The current Flink SQL cast implementation does not support casting from 
> GenericTypeInfo to another type. I already have a local 
> branch to fix this. Please comment if there are alternatives to the problem 
> above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9430) Support Casting of Object to Primitive types for Flink SQL UDF

2018-05-24 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488902#comment-16488902
 ] 

Timo Walther edited comment on FLINK-9430 at 5/24/18 12:36 PM:
---

Wouldn't it be easier to just return String in your example and cast to the 
desired type? The use of {{Object}} should be discouraged.


was (Author: twalthr):
Wouldn't it be easier to just return String in your exmaple and cast to the 
desired type? The use of {{Object}} should be discouraged.

> Support Casting of Object to Primitive types for Flink SQL UDF
> --
>
> Key: FLINK-9430
> URL: https://issues.apache.org/jira/browse/FLINK-9430
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> We want to add a SQL UDF to access a specific element in a JSON string using 
> a JSON path. However, the JSON element can be of different types, e.g. Int, 
> Float, Double, String, Boolean, etc. Since the return type is not part of the 
> method signature, we cannot use overloading. So we would end up writing one 
> UDF per type, e.g. GetFloatFromJSON, GetIntFromJSON and so on, which leads to 
> a lot of duplication. 
> One way to unify all these UDFs is to implement a single UDF that returns 
> java.lang.Object and, in the SQL statement, use CAST AS to cast the returned 
> Object into the correct type. Below is an example:
>  
> {code:java}
> object JsonPathUDF extends ScalarFunction {
>   def eval(jsonStr: String, path: String): Object = {
>     JSONParser.parse(jsonStr).read(path)
>   }
> }{code}
> {code:java}
> SELECT CAST(jsonpath(json, "$.store.book.title") AS VARCHAR(32)) as 
> bookTitle FROM table1{code}
> The current Flink SQL cast implementation does not support casting from 
> GenericTypeInfo to another type. I already have a local 
> branch to fix this. Please comment if there are alternatives to the problem 
> above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9433) SystemProcessingTimeService does not work properly

2018-05-24 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-9433:
-

 Summary: SystemProcessingTimeService does not work properly
 Key: FLINK-9433
 URL: https://issues.apache.org/jira/browse/FLINK-9433
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Ruidong Li
Assignee: Ruidong Li


If a WindowOperator is chained to an AsyncWaitOperator, and the queue of the 
AsyncWaitOperator is full when the WindowOperator's time trigger fires and 
calls collect(), that call blocks until the queue has free capacity again. 
While it blocks, the AsyncWaitOperator's own time trigger cannot fire, because 
the SystemProcessingTimeService has only a single timer thread.
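A standalone sketch (plain JDK, not Flink code) of the scheduling problem 
described above: with a single-threaded timer service, one blocking timer 
callback delays every other timer past its deadline.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SingleTimerThreadDemo {

    public static void main(String[] args) throws Exception {
        // like SystemProcessingTimeService, only one timer thread
        ScheduledExecutorService timerService =
                Executors.newSingleThreadScheduledExecutor();

        // "WindowOperator" trigger: blocks while the downstream queue is full
        timerService.schedule(() -> {
            try {
                Thread.sleep(10_000); // simulates collect() blocking on a full queue
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }, 100, TimeUnit.MILLISECONDS);

        // "AsyncWaitOperator" timeout trigger: its deadline passes after
        // 200 ms, but it cannot fire until the sleeping task releases the
        // single timer thread ten seconds later
        timerService.schedule(
                () -> System.out.println("async timeout fired"),
                200, TimeUnit.MILLISECONDS);

        timerService.shutdown();
        timerService.awaitTermination(15, TimeUnit.SECONDS);
    }
}
{code}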



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

