[jira] [Created] (FLINK-7011) Instable Kafka testStartFromKafkaCommitOffsets failures on Travis

2017-06-27 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-7011:
--

 Summary: Instable Kafka testStartFromKafkaCommitOffsets failures 
on Travis
 Key: FLINK-7011
 URL: https://issues.apache.org/jira/browse/FLINK-7011
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Tests
Affects Versions: 1.3.1, 1.4.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


Example:
https://s3.amazonaws.com/archive.travis-ci.org/jobs/246703474/log.txt?X-Amz-Expires=30&X-Amz-Date=20170627T065647Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170627/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=dbfc90cfc386fef0990325b54ff74ee4d441944687e7fdaa73ce7b0c2b2ec0ea

In general, the {{testStartFromKafkaCommitOffsets}} test implementation is a 
bit of overkill. Before continuing with the actual test, it writes some records 
just for the sake of committing offsets to Kafka and then waits for those offsets 
to be committed (which leads to the instability), whereas we could simply set the 
committed offsets using the test base's {{OffsetHandler}}.
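
A rough sketch of the simplification ({{OffsetCommitter}} below is a stand-in 
for the test base's {{OffsetHandler}}; its real method name may differ):

{code}
// Rough sketch only: OffsetCommitter is a stand-in for the test base's
// OffsetHandler, and its method name is an assumption for illustration.
trait OffsetCommitter {
  def setCommittedOffset(topic: String, partition: Int, offset: Long): Unit
}

// Seed the "committed in Kafka" offsets directly, instead of producing records
// and waiting for the consumer under test to commit them.
def seedCommittedOffsets(handler: OffsetCommitter,
                         topic: String,
                         partitions: Int,
                         offset: Long): Unit = {
  for (partition <- 0 until partitions) {
    handler.setCommittedOffset(topic, partition, offset)
  }
}
{code}

This removes the produce-and-wait step that currently causes the flakiness.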



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] Backwards compatibility policy.

2017-06-27 Thread Stefan Richter
For many parts of the code, I would agree with Aljoscha. However, I can also 
see notable exceptions, such as maintaining support for the legacy state from 
Flink <=1.1. For example, I think dropping support for this can simplify new 
developments such as fast local recovery or state replication quite a bit, 
because it is a special case that runs through a lot of code from the backends to 
the JM. So besides this general discussion about a backwards compatibility policy, do 
you think it could make sense to start another concrete discussion about whether we 
still must or want to keep backwards compatibility with Flink 1.1 in Flink 1.4?
 
> Am 29.05.2017 um 12:08 schrieb Aljoscha Krettek :
> 
> Normally, I’m the first one to suggest removing everything that is not 
> absolutely necessary in order to have a clean code base. On this issue, 
> though, I think we should support restoring from old Savepoints as far back 
> as possible if it does not make the code completely unmaintainable. Some 
> users might jump versions, and always forcing them to go through every version 
> from their old version to the current version doesn’t seem feasible and might 
> put off some users.
> 
> So far, I think the burden of supporting restore from 1.1 is still small 
> enough and with each new version the changes between versions become less and 
> less. The changes from 1.2 to the upcoming 1.3 are quite minimal, I think.
> 
> Best,
> Aljoscha
>> On 24. May 2017, at 17:58, Ted Yu  wrote:
>> 
>> bq. about having LTS versions once a year
>> 
>> +1 to the above.
>> 
>> There may be various reasons users don't want to upgrade (after new
>> releases come out). We should give such users enough flexibility on the
>> upgrade path.
>> 
>> Cheers
>> 
>> On Wed, May 24, 2017 at 8:39 AM, Kostas Kloudas >> wrote:
>> 
>>> Hi all,
>>> 
>>> For the proposal of having a third party tool, I agree with Ted.
>>> Maintaining
>>> it is a big and far from trivial effort.
>>> 
>>> Now for the window of backwards compatibility, I would argue that even if
>>> for some users 4 months (1 release) is not enough to bump their Flink
>>> version,
>>> the proposed policy guarantees that there will always be a path from any
>>> old
>>> version to any subsequent one.
>>> 
>>> Finally, for the proposal about having LTS versions once a year, I am not
>>> sure if this will reduce or create more overhead. If I understand the plan
>>> correctly, this would mean that the community will have to maintain
>>> 2 or 3 LTS versions and the last two major ones, right?
>>> 
 On May 22, 2017, at 7:31 PM, Ted Yu  wrote:
 
 For #2, it is difficult to achieve:
 
 a. maintaining savepoint migration is non-trivial and should be reviewed
>>> by
 domain experts
 b. how to certify such third-party tool
 
 Cheers
 
 On Mon, May 22, 2017 at 3:04 AM, 施晓罡  wrote:
 
> Hi all,
> 
> Currently, we spend a lot of work on the maintenance of compatibility.
> There is a lot of code in the runtime to support the migration of savepoints
> (most of which is deprecated), making it hard to focus on the current
> implementation.
> When more versions are released, much more effort will be needed if we
> try to keep these released versions compatible.
> 
> I agree with Tzu-Li that we should provide a way to let users upgrade
> Flink at a reasonable pace.
> But I am against the proposal that we only offer backwards compatibility
> for one previous version.
> According to our time-based release model, a major version is released
> every four months.
> That means users have to upgrade their versions every 8 months. Otherwise
> they will have difficulties in the migration of existing savepoints.
> 
> My suggestions include:
> 
> (1) We can release Long-Term Support (LTS) versions, a practice widely
> adopted in other open-source projects.
> LTS versions should be stable and free of known bugs. Savepoints in
> LTS versions are guaranteed to be backwards compatible so that users can
> easily upgrade to newer LTS versions.
> 
> LTS versions would be released less often than major versions (maybe
> once a year, determined by users’ upgrade frequency).
> Each LTS version will be supported for a period of time, and typically there
> are no more than three active LTS versions.
> By encouraging users to use LTS versions, we can ease the maintenance of
> released versions (bug fixes, backwards compatibility, and critical
> performance improvements).
> 
> (2) We can provide a third-party tool to do the migration of
> old-versioned savepoints.
> When users upgrade their versions, they can use the provided tool to
> migrate existing savepoints.
> This would help move the code for savepoint migration out of the actual
> codebase, keeping the code focused on the current implementation.
> 
> What do you think?
> 
> Regards,
> Xi

[jira] [Created] (FLINK-7012) remove user-JAR upload when disposing a savepoint the old way

2017-06-27 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-7012:
--

 Summary: remove user-JAR upload when disposing a savepoint the old 
way
 Key: FLINK-7012
 URL: https://issues.apache.org/jira/browse/FLINK-7012
 Project: Flink
  Issue Type: Bug
  Components: Client, State Backends, Checkpointing
Affects Versions: 1.4.0
Reporter: Nico Kruber
Assignee: Nico Kruber
Priority: Minor


Inside {{CliFrontend#disposeSavepoint()}}, user JAR files are being uploaded to 
the {{BlobServer}}, but they are actually not used (and also not cleaned up) in 
the job manager's handling of the {{DisposeSavepoint}} message.

Since removing new savepoints is as simple as deleting files, and old savepoints 
have always worked without these user JARs, we should remove the upload so that 
the JAR file upload can be made jobId-dependent for FLINK-6916.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink-shaded pull request #3: [FLINK-7007] Add README.md

2017-06-27 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink-shaded/pull/3

[FLINK-7007] Add README.md



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink-shaded 7007

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink-shaded/pull/3.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3


commit e3e2588cd3f714dba16cfe328eb22f018bf1e1b4
Author: zentol 
Date:   2017-06-27T10:44:23Z

[FLINK-7007] Add README.md




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7013) Add shaded netty dependency

2017-06-27 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-7013:
---

 Summary: Add shaded netty dependency
 Key: FLINK-7013
 URL: https://issues.apache.org/jira/browse/FLINK-7013
 Project: Flink
  Issue Type: Sub-task
  Components: Network
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7014) Expose isDeterministic interface to ScalarFunction and TableFunction

2017-06-27 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7014:
-

 Summary: Expose isDeterministic interface to ScalarFunction and 
TableFunction
 Key: FLINK-7014
 URL: https://issues.apache.org/jira/browse/FLINK-7014
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Ruidong Li


Currently, the `isDeterministic` method of implementations of `SqlFunction` 
always returns true, which causes inappropriate optimizations in Calcite, such 
as treating a user's stateful UDF as a pure function.
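
A sketch of how the proposed hook might be used by a stateful UDF (the method 
name mirrors Calcite's {{SqlFunction#isDeterministic}} and is an assumption 
until the interface is actually exposed):

{code}
import org.apache.flink.table.functions.ScalarFunction

// Sketch only: a non-deterministic scalar UDF opts out of optimizations such as
// expression reduction by declaring itself non-deterministic.
class RandomTag extends ScalarFunction {
  def eval(prefix: String): String = prefix + "-" + scala.util.Random.nextInt(100)

  // Proposed hook (assumed name); returning false tells the planner/Calcite not
  // to treat this UDF as a pure function. Only compiles once the hook exists.
  override def isDeterministic: Boolean = false
}
{code}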



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7015) Separate OperatorConfig from StreamConfig

2017-06-27 Thread Xu Pingyong (JIRA)
Xu Pingyong created FLINK-7015:
--

 Summary: Separate OperatorConfig from StreamConfig
 Key: FLINK-7015
 URL: https://issues.apache.org/jira/browse/FLINK-7015
 Project: Flink
  Issue Type: Improvement
Reporter: Xu Pingyong
Assignee: Xu Pingyong


Currently the StreamConfig contains not only the configs that the task needs, but 
also the configs that the operator needs, so the task can see the operator's 
configs and the operator can see the task's configs.

We need to separate an OperatorConfig from the StreamConfig, so that each side can 
only see its own configs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7016) Move inputFormat to InputFormatVertex from TaskConfig

2017-06-27 Thread Xu Pingyong (JIRA)
Xu Pingyong created FLINK-7016:
--

 Summary: Move inputFormat to InputFormatVertex from TaskConfig
 Key: FLINK-7016
 URL: https://issues.apache.org/jira/browse/FLINK-7016
 Project: Flink
  Issue Type: Improvement
Reporter: Xu Pingyong
Assignee: Xu Pingyong


In the batch case, the InputFormat is put into the TaskConfig; the batch task 
reads it from there to read data, and the job manager reads it from the TaskConfig 
to generate input splits. In the streaming case, all configs are put into the 
StreamConfig, but the InputFormat is still put into the TaskConfig.

We can put the InputFormat into the InputFormatVertex, while the batch task still 
gets the InputFormat from the TaskConfig. That will be clearer.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7017) Remove netty usages in flink-tests

2017-06-27 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-7017:
---

 Summary: Remove netty usages in flink-tests
 Key: FLINK-7017
 URL: https://issues.apache.org/jira/browse/FLINK-7017
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.4.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.4.0






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7018) Reconstruct streamgraph to clear interface

2017-06-27 Thread Xu Pingyong (JIRA)
Xu Pingyong created FLINK-7018:
--

 Summary:  Reconstruct streamgraph to clear interface
 Key: FLINK-7018
 URL: https://issues.apache.org/jira/browse/FLINK-7018
 Project: Flink
  Issue Type: Improvement
Reporter: Xu Pingyong
Assignee: Xu Pingyong


The StreamGraph contains not only streamNodes and streamEdges, but also 
virtual nodes which have nothing to do with the stream graph itself.
Virtual nodes should be converted to streamNodes in StreamGraphGenerator.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink-shaded pull request #4: Add flink-shaded-netty module

2017-06-27 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink-shaded/pull/4

Add flink-shaded-netty module



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink-shaded 7013

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink-shaded/pull/4.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4


commit 85465f2909e40952432ac853e24ba73fa90ff73c
Author: zentol 
Date:   2017-06-27T12:35:52Z

Add flink-shaded-netty module




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7019) Rework parallelism in Gelly algorithms and examples

2017-06-27 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-7019:
-

 Summary: Rework parallelism in Gelly algorithms and examples
 Key: FLINK-7019
 URL: https://issues.apache.org/jira/browse/FLINK-7019
 Project: Flink
  Issue Type: Sub-task
  Components: Gelly
Affects Versions: 1.4.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor
 Fix For: 1.4.0


Flink job parallelism is set with {{ExecutionConfig#setParallelism}} or with 
{{-p}} on the command line. The Gelly algorithms {{JaccardIndex}}, 
{{AdamicAdar}}, {{TriangleListing}}, and {{ClusteringCoefficient}} have 
intermediate operators which generate output quadratic in the size of the input. 
These algorithms may need to be run with a high parallelism, but doing so for 
all operations is wasteful; hence the notion of "little parallelism" was introduced.

This can be simplified by moving the parallelism parameter to the new common 
base class, with the rule of thumb that the algorithm parallelism is used for all 
normal (small output) operators. The asymptotically large operators will 
default to the job parallelism, as will the default algorithm parallelism.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] Backwards compatibility policy.

2017-06-27 Thread Stephan Ewen
I think that this discussion is probably motivated especially by the
"legacy state" handling of Flink 1.1.
The biggest gain in codebase and productivity would be won only by dropping
1.1 compatibility in Flink 1.4.

My gut feeling is that this is reasonable. We support two versions back,
which means that users can skip one upgrade, but not two.

From what I can tell, users are usually eager to upgrade. They don't do it
immediately, but as soon as the new release is a bit battle tested.

I would expect skipping two entire versions to be rare enough to be okay
with a solution which is a bit more effort for the user:
you can upgrade from Flink 1.1 to 1.4 by loading the 1.1 savepoint into
Flink 1.2, taking a savepoint (1.2 format), and resuming that in Flink 1.4.

Greetings,
Stephan


On Tue, Jun 27, 2017 at 12:01 PM, Stefan Richter <
s.rich...@data-artisans.com> wrote:

> For many parts of the code, I would agree with Aljoscha. However, I can
> also see notable exceptions, such as maintaining support for the legacy
> state from Flink <=1.1. For example, I think dropping support for this can
> simplify new developments such as fast local recovery or state replication
> quiet a bit because this is a special case that runs through a lot of code
> from backend to JM. So besides this general discussion about a backwards
> compatible policy, do you think it could make sense to start another
> concrete discussion about if we still must or want backwards compatibility
> to Flink 1.1 in Flink 1.4?
>
> > Am 29.05.2017 um 12:08 schrieb Aljoscha Krettek :
> >
> > Normally, I’m the first one to suggest removing everything that is not
> absolutely necessary in order to have a clean code base. On this issue,
> though, I think we should support restoring from old Savepoints as far back
> as possible if it does not make the code completely unmaintainable. Some
> users might jump versions and always forcing them to go though every
> version from their old version to the current version doesn’t seem feasible
> and might put off some users.
> >
> > So far, I think the burden of supporting restore from 1.1 is still small
> enough and with each new version the changes between versions become less
> and less. The changes from 1.2 to the upcoming 1.3 are quite minimal, I
> think.
> >
> > Best,
> > Aljoscha
> >> On 24. May 2017, at 17:58, Ted Yu  wrote:
> >>
> >> bq. about having LTS versions once a year
> >>
> >> +1 to the above.
> >>
> >> There may be various reasons users don't want to upgrade (after new
> >> releases come out). We should give such users enough flexibility on the
> >> upgrade path.
> >>
> >> Cheers
> >>
> >> On Wed, May 24, 2017 at 8:39 AM, Kostas Kloudas <
> k.klou...@data-artisans.com
> >>> wrote:
> >>
> >>> Hi all,
> >>>
> >>> For the proposal of having a third party tool, I agree with Ted.
> >>> Maintaining
> >>> it is a big and far from trivial effort.
> >>>
> >>> Now for the window of backwards compatibility, I would argue that even
> if
> >>> for some users 4 months (1 release) is not enough to bump their Flink
> >>> version,
> >>> the proposed policy guarantees that there will always be a path from
> any
> >>> old
> >>> version to any subsequent one.
> >>>
> >>> Finally, for the proposal about having LTS versions once a year, I am
> not
> >>> sure if this will reduce or create more overhead. If I understand the
> plan
> >>> correctly, this would mean that the community will have to maintain
> >>> 2 or 3 LTS versions and the last two major ones, right?
> >>>
>  On May 22, 2017, at 7:31 PM, Ted Yu  wrote:
> 
>  For #2, it is difficult to achieve:
> 
>  a. maintaining savepoint migration is non-trivial and should be
> reviewed
> >>> by
>  domain experts
>  b. how to certify such third-party tool
> 
>  Cheers
> 
>  On Mon, May 22, 2017 at 3:04 AM, 施晓罡  wrote:
> 
> > Hi all,
> >
> > Currently, we work a lot in the maintenance of compatibility.
> > There exist much code in runtime to support the migration of
> savepoints
> > (most of which are deprecated), making it hard to focus on the
> current
> > implementation.
> > When more versions are released, much more efforts will be needed if
> we
> > try to make these released versions compatible.
> >
> > I agree with Tzu-Li that we should provide a method to let users
> upgrade
> > Flink in a reasonable pace.
> > But i am against the proposal that we only offer backwards
> compatibility
> > for one previous version.
> > According our time-based release model, a major version is released
> >>> every
> > four month.
> > That means, users have to upgrade their versions every 8 months.
> >>> Otherwise
> > they will have difficulties in the migration of existing savepoints.
> >
> > My suggestions include
> >
> > (1) We can release Long-Term Support (LTS) versions which are widely
> > adopted in other open-source projects.
> > LTS vers

[jira] [Created] (FLINK-7020) Upgrade calcite to calcite 1.13

2017-06-27 Thread sunjincheng (JIRA)
sunjincheng created FLINK-7020:
--

 Summary: Upgrade calcite to calcite 1.13
 Key: FLINK-7020
 URL: https://issues.apache.org/jira/browse/FLINK-7020
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: sunjincheng
Assignee: sunjincheng


Calcite 1.13 has been released, so I'll try to upgrade the dependency.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink-shaded issue #4: Add flink-shaded-netty module

2017-06-27 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink-shaded/pull/4
  
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink-shaded pull request #3: [FLINK-7007] Add README.md

2017-06-27 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink-shaded/pull/3#discussion_r124335176
  
--- Diff: README.md ---
@@ -0,0 +1,28 @@
+
+
+# Apache Flink Shaded Dependencies
+
+This repository contains a number of shaded dependencies for the [Apache 
Flink](https://flink.apache.org/) project.
+
+The purpose of these dependencies is to provide a single instance of a 
shaded dependency in the Flink distribution, instead of each individual module 
shading the dependency.
+
+The shaded dependencies contained here do not expose any transitive 
dependencies. They may or may not be self-contained.
+
+When using these dependencies it is recommended to work directly against 
the shaded namespaces.
--- End diff --

Could you add the following
```
About

Apache Flink is an open source project of The Apache Software Foundation 
(ASF).
``` 
and link to the ASF?

Other than that +1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

2017-06-27 Thread Stephan Ewen
Hi all!

I would like to propose the following FLIP:

FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982

The FLIP is motivated by the fact that many users run into an unnecessary
kind of performance problem caused by an old design artifact.

The required change should be reasonably small, and would help many users
and Flink's general standing.

Happy to hear thoughts!

Stephan

==

FLIP text is below. Pictures with illustrations are only in the Wiki, not
supported on the mailing list.
-

Motivation

The default behavior of the streaming runtime is to copy every element
between chained operators.

That operation was introduced for “safety” reasons, to reduce the number of
cases where users can create incorrect programs by reusing mutable objects
(a discouraged pattern, but possible). For example, when using state
backends that keep the state as objects on heap, reusing mutable objects
can theoretically create cases where the same object is used in multiple
state mappings.

The effect is that many people who try Flink get much lower performance
than they could possibly get. From empirical evidence, almost all users
that I (Stephan) have been in touch with eventually run into this issue.

There are multiple observations about that design:


   - Object copies are extremely costly. While some simple types copy
     virtually for free (types reliably detected as immutable are not copied
     at all), many real pipelines use types like Avro, Thrift, JSON, etc.,
     which are very expensive to copy.

   - Keyed operations currently only occur after shuffles. They are hence the
     first in a pipeline and will never receive a reused object anyway. That
     means this precaution is unnecessary for the most critical operations.

   - The mode is inconsistent with the contract of the DataSet API, which
     does not copy at each step.

   - To prevent these copies, users can select {{enableObjectReuse()}}, which
     is misleading, since it does not really reuse mutable objects but merely
     avoids the additional copies.

Proposal

Summary

I propose to change the default behavior of the DataStream runtime to be
the same as the DataSet runtime. That means that new objects are chosen on
every deserialization, and no copies are made as the objects are passed on
along the pipelines.

Details

I propose to drop the execution config flag {{objectReuse}} and instead
introduce an {{ObjectReuseMode}} enumeration with better control of what
should happen. There will be three different types:


   - DEFAULT
      - This is the default in the DataSet API.
      - This will become the default in the DataStream API.
      - This is what happens in the DataStream API when {{enableObjectReuse()}}
        is activated.

   - COPY_PER_OPERATOR
      - The current default in the DataStream API.

   - FULL_REUSE
      - This is what happens in the DataSet API when {{enableObjectReuse()}} is
        chosen.


An illustration of the three modes (DEFAULT, COPY_PER_OPERATOR, FULL_REUSE) is
available on the FLIP-21 Wiki page linked above; the images are not reproduced
on the mailing list.
New or Changed Public Interfaces

Interfaces changed

The {{ExecutionConfig}} interface gains the method
{{setObjectReuseMode(ObjectReuseMode)}} and deprecates the methods
{{enableObjectReuse()}} and {{disableObjectReuse()}}.
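
A minimal sketch (not existing Flink code) of the proposed enumeration and how
it would be selected on the {{ExecutionConfig}}:

{code}
// Minimal sketch of the proposed ObjectReuseMode enumeration (not Flink code).
sealed trait ObjectReuseMode

object ObjectReuseMode {
  /** New objects on every deserialization, no copies between chained operators
    * (proposed DataStream default; matches today's DataSet default). */
  case object DEFAULT extends ObjectReuseMode

  /** Copy every element between chained operators (today's DataStream default). */
  case object COPY_PER_OPERATOR extends ObjectReuseMode

  /** Reuse deserialization objects (what enableObjectReuse() means in DataSet). */
  case object FULL_REUSE extends ObjectReuseMode
}

// Proposed usage, replacing enableObjectReuse()/disableObjectReuse():
//   env.getConfig.setObjectReuseMode(ObjectReuseMode.DEFAULT)
{code}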


Behavior changed

The default object passing behavior changes, meaning that it can affect the
correctness of prior DataStream programs that assume the original
“COPY_PER_OPERATOR” behavior.

Migration Plan and Compatibility

Interfaces

No interface migration path is needed, because the interfaces are not
broken, merely some methods get deprecated.

Behavior change

Variant 1:

   - Change the behavior, make it explicit in the release notes that we did
     that and what cases are affected.

   - This may actually be feasible, because the cases that are affected are
     quite pathological corner cases that only very bad implementations should
     encounter (se

Re: [DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

2017-06-27 Thread Greg Hogan
Hi Stephan,

Would this be an appropriate time to discuss allowing reuse to be a 
per-operator configuration? Object reuse for chained operators has led to 
considerable surprise for some users of the DataSet API. This came up during 
the rework of the object reuse documentation for the DataSet API. With 
annotations, a Function could mark whether input/iterator or output/collected 
objects should be copied or reused.

My distant observation is that it is safer to locally assert reuse at the 
operator level than to assume or guarantee the safety of object reuse across an 
entire program. It could also be handy to mix operators receiving copyable 
objects with operators not requiring copyable objects.
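
A purely hypothetical sketch of what such annotations could look like (nothing
like this exists in Flink today):

{code}
import scala.annotation.StaticAnnotation
import org.apache.flink.api.common.functions.MapFunction

/** The function tolerates reused (non-copied) input objects. */
class AcceptsReusedInput extends StaticAnnotation

/** The function keeps no reference to emitted objects, so the runtime may
  * forward them downstream without a defensive copy. */
class EmitsFreshObjects extends StaticAnnotation

// Example: a mapper that keeps no reference to its input can opt in to reuse
// locally, independent of the rest of the program.
@AcceptsReusedInput
class Upper extends MapFunction[String, String] {
  override def map(value: String): String = value.toUpperCase
}
{code}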

Greg


> On Jun 27, 2017, at 1:21 PM, Stephan Ewen  wrote:
> 
> Hi all!
> 
> I would like to propose the following FLIP:
> 
> FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
> 
> The FLIP is motivated by the fact that many users run into an unnecessary
> kind of performance problem caused by an old design artifact.
> 
> The required change should be reasonably small, and would help many users
> and Flink's general standing.
> 
> Happy to hear thoughts!
> 
> Stephan
> 
> ==
> 
> FLIP text is below. Pictures with illustrations are only in the Wiki, not
> supported on the mailing list.
> -
> 
> Motivation
> 
> The default behavior of the streaming runtime is to copy every element
> between chained operators.
> 
> That operation was introduced for “safety” reasons, to avoid the number of
> cases where users can create incorrect programs by reusing mutable objects
> (a discouraged pattern, but possible). For example when using state
> backends that keep the state as objects on heap, reusing mutable objects
> can theoretically create cases where the same object is used in multiple
> state mappings.
> 
> The effect is that many people that try Flink get much lower performance
> than they could possibly get. From empirical evidence, almost all users
> that I (Stephan) have been in touch with eventually run into this issue
> eventually.
> 
> There are multiple observations about that design:
> 
> 
>   -
> 
>   Object copies are extremely costly. While some simple copy virtually for
>   free (types reliably detected as immutable are not copied at all), many
>   real pipelines use types like Avro, Thrift, JSON, etc, which are very
>   expensive to copy.
> 
> 
> 
>   -
> 
>   Keyed operations currently only occur after shuffles. The operations are
>   hence the first in a pipeline and will never have a reused object anyways.
>   That means for the most critical operation, this pre-caution is unnecessary.
> 
> 
> 
>   -
> 
>   The mode is inconsistent with the contract of the DataSet API, which
>   does not copy at each step
> 
> 
> 
>   -
> 
>   To prevent these copies, users can select {{enableObjectReuse()}}, which
>   is misleading, since it does not really reuse mutable objects, but avoids
>   additional copies.
> 
> 
> Proposal
> 
> Summary
> 
> I propose to change the default behavior of the DataStream runtime to be
> the same as the DataSet runtime. That means that new objects are chosen on
> every deserialization, and no copies are made as the objects are passed on
> along the pipelines.
> 
> Details
> 
> I propose to drop the execution config flag {{objectReuse}} and instead
> introduce an {{ObjectReuseMode}} enumeration with better control of what
> should happen. There will be three different types:
> 
> 
>   -
> 
>   DEFAULT
>   -
> 
>  This is the default in the DataSet API
>  -
> 
>  This will become the default in the DataStream API
>  -
> 
>  This happens in the DataStream API when {{enableObjectReuse()}} is
>  activated.
> 
> 
> 
>   -
> 
>   COPY_PER_OPERATOR
>   -
> 
>  The current default in the DataStream API
> 
> 
> 
>   -
> 
>   FULL_REUSE
>   -
> 
>  This happens in the DataSet API when {{enableObjectReuse()}} is
>  chosen.
> 
> 
> An illustration of the modes is as follows:
> 
> DEFAULT
> 
> 
> See here:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r
> 
> COPY_PER_OPERATOR
> 
> 
> See here:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com%2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTKuQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks
> 
> 
> FULL_REUSE
> 
> 
> See here:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2FFdOzuuaioooEIOh

Re: [DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime

2017-06-27 Thread Zhenzhong Xu
Stephan,

Fully supporting this FLIP. We originally encountered pretty big surprises 
observing the object copy behavior causing significant performance degradation 
for our massively parallel use case. 

In our case (and I think this should be the assumption for all streaming use 
cases), we assume object immutability for all records throughout the processing 
pipeline. As a result, I don't see a need to distinguish different object reuse 
behaviors for different (chained) operators (or, to the very extreme, even a need 
to support COPY_PER_OPERATOR beyond backward compatibility). I am also a fan of 
failing fast if a user asserts incorrect assumptions.

One piece of feedback on FLIP-21 itself: I am not very clear on the difference 
between the DEFAULT and FULL_REUSE enumeration values; aren't they exactly the 
same thing in the new proposal? However, the mode figures seem to indicate they 
are slightly different. Can you elaborate a bit more?

Z. 


On 2017-06-27 11:14 (-0700), Greg Hogan  wrote: 
> Hi Stephan,
> 
> Would this be an appropriate time to discuss allowing reuse to be a 
> per-operator configuration? Object reuse for chained operators has lead to 
> considerable surprise for some users of the DataSet API. This came up during 
> the rework of the object reuse documentation for the DataSet API. With 
> annotations a Function could mark whether input/iterator or output/collected 
> objects should be copied or reused.
> 
> My distant observation is that is is safer to locally assert reuse at the 
> operator level than to assume or guarantee the safety of object reuse across 
> an entire program. It could also be handy to mix operators receiving copyable 
> objects with operators not requiring copyable objects.
> 
> Greg
> 
> 
> > On Jun 27, 2017, at 1:21 PM, Stephan Ewen  wrote:
> > 
> > Hi all!
> > 
> > I would like to propose the following FLIP:
> > 
> > FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
> > 
> > The FLIP is motivated by the fact that many users run into an unnecessary
> > kind of performance problem caused by an old design artifact.
> > 
> > The required change should be reasonably small, and would help many users
> > and Flink's general standing.
> > 
> > Happy to hear thoughts!
> > 
> > Stephan
> > 
> > ==
> > 
> > FLIP text is below. Pictures with illustrations are only in the Wiki, not
> > supported on the mailing list.
> > -
> > 
> > Motivation
> > 
> > The default behavior of the streaming runtime is to copy every element
> > between chained operators.
> > 
> > That operation was introduced for “safety” reasons, to avoid the number 
> > of
> > cases where users can create incorrect programs by reusing mutable objects
> > (a discouraged pattern, but possible). For example when using state
> > backends that keep the state as objects on heap, reusing mutable objects
> > can theoretically create cases where the same object is used in multiple
> > state mappings.
> > 
> > The effect is that many people that try Flink get much lower performance
> > than they could possibly get. From empirical evidence, almost all users
> > that I (Stephan) have been in touch with eventually run into this issue
> > eventually.
> > 
> > There are multiple observations about that design:
> > 
> > 
> >   -
> > 
> >   Object copies are extremely costly. While some simple copy virtually for
> >   free (types reliably detected as immutable are not copied at all), many
> >   real pipelines use types like Avro, Thrift, JSON, etc, which are very
> >   expensive to copy.
> > 
> > 
> > 
> >   -
> > 
> >   Keyed operations currently only occur after shuffles. The operations are
> >   hence the first in a pipeline and will never have a reused object anyways.
> >   That means for the most critical operation, this pre-caution is 
> > unnecessary.
> > 
> > 
> > 
> >   -
> > 
> >   The mode is inconsistent with the contract of the DataSet API, which
> >   does not copy at each step
> > 
> > 
> > 
> >   -
> > 
> >   To prevent these copies, users can select {{enableObjectReuse()}}, which
> >   is misleading, since it does not really reuse mutable objects, but avoids
> >   additional copies.
> > 
> > 
> > Proposal
> > 
> > Summary
> > 
> > I propose to change the default behavior of the DataStream runtime to be
> > the same as the DataSet runtime. That means that new objects are chosen on
> > every deserialization, and no copies are made as the objects are passed on
> > along the pipelines.
> > 
> > Details
> > 
> > I propose to drop the execution config flag {{objectReuse}} and instead
> > introduce an {{ObjectReuseMode}} enumeration with better control of what
> > should happen. There will be three different types:
> > 
> > 
> >   -
> > 
> >   DEFAULT
> >

[jira] [Created] (FLINK-7021) Flink Task Manager hangs on startup if one Zookeeper node is unresolvable

2017-06-27 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-7021:
---

 Summary: Flink Task Manager hangs on startup if one Zookeeper node 
is unresolvable
 Key: FLINK-7021
 URL: https://issues.apache.org/jira/browse/FLINK-7021
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.2.1, 1.3.0, 1.2.0
 Environment: Kubernetes cluster running:
* Flink 1.3.0 Job Manager & Task Manager on Java 8u131
* Zookeeper 3.4.10 cluster with 3 nodes
Reporter: Scott Kidder


h2. Problem
Flink Task Manager will hang during startup if one of the Zookeeper nodes in 
the Zookeeper connection string is unresolvable.

h2. Expected Behavior
Flink should retry name resolution & connection to Zookeeper nodes with 
exponential back-off.
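
A minimal sketch (not Flink code) of the suggested retry behavior:

{code}
import java.net.InetAddress
import scala.util.{Failure, Success, Try}

// Minimal sketch (not Flink code) of the suggested behavior: retry hostname
// resolution of a ZooKeeper node with exponential back-off instead of failing
// or hanging on the first unresolvable host.
def resolveWithBackoff(host: String,
                       maxAttempts: Int = 6,
                       initialDelayMs: Long = 500): Option[InetAddress] = {
  var delay = initialDelayMs
  for (_ <- 1 to maxAttempts) {
    Try(InetAddress.getByName(host)) match {
      case Success(address) => return Some(address)
      case Failure(_) =>
        Thread.sleep(delay)
        delay = math.min(delay * 2, 30000) // cap the back-off at 30 seconds
    }
  }
  None
}
{code}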

h2. Environment Details
We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in a 
configuration that automatically detects and applies operating system updates. 
We have a Zookeeper node running on the same CoreOS instance as Flink. It's 
possible that the Zookeeper node will not yet be started when the Flink 
components are started. This could cause hostname resolution of the Zookeeper 
nodes to fail.

h3. Flink Task Manager Logs
{noformat}
2017-06-27 15:38:51,713 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Using configured hostname/address for TaskManager: 10.2.45.11
2017-06-27 15:38:51,714 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager
2017-06-27 15:38:51,714 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager actor system at 10.2.45.11:6122.
2017-06-27 15:38:52,950 INFO  akka.event.slf4j.Slf4jLogger  
- Slf4jLogger started
2017-06-27 15:38:53,079 INFO  Remoting  
- Starting remoting
2017-06-27 15:38:53,573 INFO  Remoting  
- Remoting started; listening on addresses 
:[akka.tcp://flink@10.2.45.11:6122]
2017-06-27 15:38:53,576 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Starting TaskManager actor
2017-06-27 15:38:53,660 INFO  
org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig 
[server address: /10.2.45.11, server port: 6121, ssl enabled: false, memory 
segment size (bytes): 32768, transport type: NIO, number of server threads: 2 
(manual), number of client threads: 2 (manual), server connect backlog: 0 (use 
Netty's default), client connect timeout (sec): 120, send/receive buffer size 
(bytes): 0 (use Netty's default)]
2017-06-27 15:38:53,682 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have 
a max timeout of 1 ms
2017-06-27 15:38:53,688 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file 
directory '/tmp': total 49 GB, usable 42 GB (85.71% usable)
2017-06-27 15:38:54,071 INFO  
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 96 MB 
for network buffer pool (number of memory segments: 3095, bytes per segment: 
32768).
2017-06-27 15:38:54,564 INFO  
org.apache.flink.runtime.io.network.NetworkEnvironment- Starting the 
network environment and its components.
2017-06-27 15:38:54,576 INFO  
org.apache.flink.runtime.io.network.netty.NettyClient - Successful 
initialization (took 4 ms).
2017-06-27 15:38:54,677 INFO  
org.apache.flink.runtime.io.network.netty.NettyServer - Successful 
initialization (took 101 ms). Listening on SocketAddress /10.2.45.11:6121.
2017-06-27 15:38:54,981 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting 
managed memory to 0.7 of the currently free heap space (612 MB), memory will be 
allocated lazily.
2017-06-27 15:38:55,050 INFO  
org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
uses directory /tmp/flink-io-ca01554d-f25e-4c17-a828-96d82b43d4a7 for spill 
files.
2017-06-27 15:38:55,061 INFO  org.apache.flink.runtime.metrics.MetricRegistry   
- Configuring StatsDReporter with {interval=10 SECONDS, port=8125, 
host=localhost, class=org.apache.flink.metrics.statsd.StatsDReporter}.
2017-06-27 15:38:55,065 INFO  org.apache.flink.metrics.statsd.StatsDReporter
- Configured StatsDReporter with {host:localhost, port:8125}
2017-06-27 15:38:55,065 INFO  org.apache.flink.runtime.metrics.MetricRegistry   
- Periodically reporting metrics in intervals of 10 SECONDS for 
reporter statsd of type org.apache.flink.metrics.statsd.StatsDReporter.
2017-06-27 15:38:55,175 INFO  org.apache.flink.runtime.filecache.FileCache  
- User file cache uses directory 
/tmp/flink-dist-cache-e4c5bcc5-7513-40d9-a665-0d33c80a36ba
2017-06-27 15:38:55,187 INFO  org.apache.flink.runtime.filecache.FileCache  
- User file cache uses director

[jira] [Created] (FLINK-7022) Flink Job Manager Scheduler & Web Frontend out of sync when Zookeeper is unavailable on startup

2017-06-27 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-7022:
---

 Summary: Flink Job Manager Scheduler & Web Frontend out of sync 
when Zookeeper is unavailable on startup
 Key: FLINK-7022
 URL: https://issues.apache.org/jira/browse/FLINK-7022
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.2.1, 1.3.0, 1.2.0
 Environment: Kubernetes cluster running:
* Flink 1.3.0 Job Manager & Task Manager on Java 8u131
* Zookeeper 3.4.10 cluster with 3 nodes
Reporter: Scott Kidder


h2. Problem
Flink Job Manager web frontend is permanently unavailable if one or more 
Zookeeper nodes are unresolvable during startup. The job scheduler eventually 
recovers and assigns jobs to task managers, but the web frontend continues to 
respond with an HTTP 503 and the following message:
{noformat}Service temporarily unavailable due to an ongoing leader election. 
Please refresh.{noformat}

h2. Expected Behavior
Once Flink is able to interact with Zookeeper successfully, all aspects of the 
Job Manager (job scheduling & the web frontend) should be available.

h2. Environment Details
We're running Flink and Zookeeper in Kubernetes on CoreOS. CoreOS can run in a 
configuration that automatically detects and applies operating system updates. 
We have a Zookeeper node running on the same CoreOS instance as Flink. It's 
possible that the Zookeeper node will not yet be started when the Flink 
components are started. This could cause hostname resolution of the Zookeeper 
nodes to fail.

h3. Flink Job Manager Logs
{noformat}
2017-06-27 15:38:47,161 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.host, localhost
2017-06-27 15:38:47,161 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.port, 8125
2017-06-27 15:38:47,162 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: metrics.reporter.statsd.interval, 10 SECONDS
2017-06-27 15:38:47,254 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.backend, filesystem
2017-06-27 15:38:47,254 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.backend.fs.checkpointdir, 
hdfs://hdfs:8020/flink/checkpoints
2017-06-27 15:38:47,255 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: state.savepoints.dir, hdfs://hdfs:8020/flink/savepoints
2017-06-27 15:38:47,255 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.mode, zookeeper
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.zookeeper.quorum, 
zookeeper-0.zookeeper:2181,zookeeper-1.zookeeper:2181,zookeeper-2.zookeeper:2181
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.zookeeper.storageDir, 
hdfs://hdfs:8020/flink/recovery
2017-06-27 15:38:47,256 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: recovery.jobmanager.port, 6123
2017-06-27 15:38:47,257 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: blob.server.port, 41479
2017-06-27 15:38:47,357 WARN  org.apache.flink.configuration.Configuration  
- Config uses deprecated configuration key 'recovery.mode' instead 
of proper key 'high-availability'
2017-06-27 15:38:47,366 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Starting JobManager with high-availability
2017-06-27 15:38:47,366 WARN  org.apache.flink.configuration.Configuration  
- Config uses deprecated configuration key 
'recovery.jobmanager.port' instead of proper key 
'high-availability.jobmanager.port'
2017-06-27 15:38:47,452 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Starting JobManager on flink:6123 with execution mode CLUSTER
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.address, flink
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.port, 6123
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.heap.mb, 1024
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.heap.mb, 1024
2017-06-27 15:38:47,549 INFO  
org.apache.flink.configuration.GlobalConfiguration  

[jira] [Created] (FLINK-7023) Remaining types for Gelly ValueArrays

2017-06-27 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-7023:
-

 Summary: Remaining types for Gelly ValueArrays
 Key: FLINK-7023
 URL: https://issues.apache.org/jira/browse/FLINK-7023
 Project: Flink
  Issue Type: Sub-task
  Components: Gelly
Affects Versions: 1.4.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Trivial
 Fix For: 1.4.0


Add implementations of Byte/Char/Double/Float/ShortValueArray. Along with the 
existing implementations of Int/Long/Null/StringValueArray this covers all 10 
CopyableValue types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7024) Add supported for selecting window proctime/rowtime on row-based Tumble/Slide window

2017-06-27 Thread sunjincheng (JIRA)
sunjincheng created FLINK-7024:
--

 Summary: Add supported for selecting window proctime/rowtime  on 
row-based Tumble/Slide window 
 Key: FLINK-7024
 URL: https://issues.apache.org/jira/browse/FLINK-7024
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: sunjincheng
Assignee: sunjincheng


We get a validation exception when selecting window.proctime/rowtime on a 
row-based group window.
{code}
 table
  .window(Tumble over 2.rows on 'proctime as 'w)
  .groupBy('w, 'string)
  .select('string, countFun('string) as 'cnt, 'w.rowtime as 'proctime)
  .window(Over partitionBy 'string orderBy 'proctime preceding 
UNBOUNDED_RANGE following CURRENT_RANGE as 'w2)
  .select('string, 'cnt.sum over 'w2 as 'cnt)
{code}
Exception:
{code}
org.apache.flink.table.api.ValidationException: Window start and Window end 
cannot be selected for a row-count Tumbling window.

at 
org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
at 
org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:660)
{code}
We should add a window.proctime/rowtime check in the `validate` method.
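
For comparison, a sketch of the same property selection on a time-based window 
(using the same {{table}} and {{countFun}} as above), which is the pattern that 
is supported today:

{code}
 table
  .window(Tumble over 10.minutes on 'rowtime as 'w)
  .groupBy('w, 'string)
  .select('string, countFun('string) as 'cnt, 'w.rowtime as 'rowtime)
{code}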



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7025) Using NullByteKeySelector for Unbounded ProcTime NonPartitioned Over

2017-06-27 Thread sunjincheng (JIRA)
sunjincheng created FLINK-7025:
--

 Summary: Using NullByteKeySelector for Unbounded ProcTime 
NonPartitioned Over
 Key: FLINK-7025
 URL: https://issues.apache.org/jira/browse/FLINK-7025
 Project: Flink
  Issue Type: Bug
Reporter: sunjincheng
Assignee: sunjincheng


We recently added the `Cleanup State` feature, but it does not work well if state 
cleaning is enabled on the Unbounded ProcTime NonPartitioned Over window, because 
`ProcessFunctionWithCleanupState` uses keyed state.

So, in this JIRA I'll change the `Unbounded ProcTime NonPartitioned Over` to a 
`partitioned Over` by using a NullByteKeySelector, OR create a 
`NonKeyedProcessFunctionWithCleanupState`. I think the first way is 
simpler. What do you think? [~fhueske]
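
For illustration, a minimal sketch of the first option: key every record to the 
same constant key so that keyed state (and the cleanup timers) can be used. The 
selector below is an illustrative equivalent of a NullByteKeySelector, and the 
usage comment is hypothetical:

{code}
import org.apache.flink.api.java.functions.KeySelector

// Illustration only: every record maps to the same key, turning the
// non-partitioned stream into a single-key keyed stream so that
// ProcessFunctionWithCleanupState's keyed state can be used.
class ConstantByteKeySelector[T] extends KeySelector[T, java.lang.Byte] {
  override def getKey(value: T): java.lang.Byte = java.lang.Byte.valueOf(0.toByte)
}

// Hypothetical usage when translating the unbounded proc-time OVER window:
//   inputStream.keyBy(new ConstantByteKeySelector[Row]())
//              .process(new UnboundedProcTimeOverProcessFunction(...))
{code}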



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)