[jira] [Created] (FLINK-12308) Support python language in Flink Table API

2019-04-23 Thread sunjincheng (JIRA)
sunjincheng created FLINK-12308:
---

 Summary: Support python language in Flink Table API
 Key: FLINK-12308
 URL: https://issues.apache.org/jira/browse/FLINK-12308
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.9.0
Reporter: sunjincheng
Assignee: sunjincheng


At the Flink API level, we have the DataStream API / DataSet API / Table API, and the 
Table API will become a first-class citizen. The Table API is declarative and 
can be automatically optimized, as mentioned in the Flink mid-term 
roadmap by Stephan. So we first consider supporting Python at the Table API 
level to cater to the current large number of analytics users. Flink's goals for 
the Python Table API are as follows:
 * Users can write Flink Table API jobs in Python, and the API should mirror the Java / 
Scala Table API
 * Users can submit Python Table API jobs in the following ways:
 ** Submit a job with a Python script, integrated with `flink run`
 ** Submit a job with a Python script via the REST service
 ** Submit a job in an interactive way, similar to `scala-shell`
 ** Local debugging in an IDE.
 * Users can write custom functions (UDF, UDTF, UDAF)
 * Pandas functions can be used in the Flink Python Table API

A more detailed description can be found in FLIP-38 (will be done soon).

At the API level, we have the following plan:
 * The short term:
 We may initially go with a simple approach that maps the Python Table API to the 
Java Table API via Py4J (a minimal sketch of this bridging idea follows below).

 * The long term:
 We may need to create a Python API that follows the same structure as Flink's 
Table API and produces the language-independent DAG. (As Stephan already 
mentioned on the [mailing 
thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096])
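
A minimal sketch of the JVM side of this Py4J bridging idea (the gateway class, entry point, and exposed method are illustrative assumptions, not part of FLIP-38 or the Flink codebase):

{code:java}
import py4j.GatewayServer;

/** Hypothetical JVM-side entry point; names and methods are illustrative only. */
public class TableApiGatewayExample {

    /** Methods on this object become callable from the Python process. */
    public static class TableEntryPoint {
        public String apiVersion() {
            return "sketch-only";
        }
    }

    public static void main(String[] args) {
        // A Python client connects to this gateway, obtains the entry point,
        // and forwards Python Table API calls to the underlying Java objects.
        GatewayServer server = new GatewayServer(new TableEntryPoint());
        server.start();  // listens on Py4J's default port 25333
    }
}
{code}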



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12307) Support translation from StreamExecWindowJoin to StreamTransformation.

2019-04-23 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-12307:
--

 Summary: Support translation from StreamExecWindowJoin to 
StreamTransformation.
 Key: FLINK-12307
 URL: https://issues.apache.org/jira/browse/FLINK-12307
 Project: Flink
  Issue Type: Task
  Components: Table SQL / API
Reporter: Jing Zhang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] Proposal of Flink Driver - Making Flink More Interactive

2019-04-23 Thread Shuiqiang Chen
Hi Till,

Thanks for your insightful and valuable comments!

The introduction of the driver dispatcher is a functionality extension of the
currently existing dispatchers and requires only modest changes to the runtime.
It mainly manages the lifecycle of the ephemeral session cluster when a user
submits an application in detached mode, which might consist of one or more
jobs, without providing a cluster id.

In terms of failover, we currently transition the whole application into a
globally terminal state if the driver job faults. As @becket@gmail.com
mentioned in the doc, there are two possible options for failover recovery:
one relies on state, the other uses event logs. Unfortunately, neither of them
is powerful enough to support driver failover at this point, so we decided to
postpone this to future work.

Best,
Shuiqiang

Till Rohrmann wrote on Tue, Apr 23, 2019 at 8:03 PM:

> Thanks for proposing this design document Shuiqiang. It is a very
> interesting idea how to solve the problem of running multiple Flink jobs as
> part of a single application. I like the idea since it does not require
> many runtime changes apart from a session concept on the Dispatcher and it
> would work in all environments.
>
> One thing which is not fully clear to me is the failover behavior in case
> of driver job faults. Would it be possible to recover the driver job or
> would a driver job fault directly transition the application into a
> globally terminal state? Apart from that, I left some minor comments in the
> design document.
>
> Cheers,
> Till
>
> On Tue, Apr 23, 2019 at 10:04 AM Shuiqiang Chen 
> wrote:
>
> > Hi All,
> >
> > We would like to start a discussion thread about a new feature called
> Flink
> > Driver. A brief summary is following.
> >
> > As mentioned in the discussion of Interactive Programming, user
> > applications might consist of multiple jobs and take long to finish.
> > Currently, when Flink runs applications with multiple jobs, the
> application
> > will run in a local process which is responsible for submitting the jobs.
> > That local process will not exit until the whole application has
> finished.
> > Users have to keep eyes on the local process in case it is killed due to
> > connection lost, session timeout, local operating system problem, etc.
> >
> > To solve the problem, we would like to introduce the Flink Driver. Users
> > can submit applications using driver mode. A Flink driver job will be
> > submitted to take care of the job submissions in the user application.
> >
> > For more details about flink driver, please refer to the doc:
> >
> >
> https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY
> >
> > Any comments and suggestions will be highly appreciated.
> >
> > Best Regards,
> > Shuiqiang
> >
>


[jira] [Created] (FLINK-12306) Change the name of variable "log" to Upper case "LOG"

2019-04-23 Thread Kejian Li (JIRA)
Kejian Li created FLINK-12306:
-

 Summary: Change the name of variable "log" to Upper case "LOG"
 Key: FLINK-12306
 URL: https://issues.apache.org/jira/browse/FLINK-12306
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.8.0, 1.7.2
Reporter: Kejian Li
 Fix For: 1.9.0, 1.8.1


Rename the variable "log" from lower case to upper case "LOG" to be consistent 
with other classes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12305) Table API Clarification

2019-04-23 Thread Alex Barnes (JIRA)
Alex Barnes created FLINK-12305:
---

 Summary: Table API Clarification
 Key: FLINK-12305
 URL: https://issues.apache.org/jira/browse/FLINK-12305
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Affects Versions: 1.8.0
Reporter: Alex Barnes


It is not clear from the documentation whether late arriving data is correctly 
handled in the Flink Table/SQL APIs. The documentation makes passing reference 
to recognizing late arriving data, but does not go into depth about what kind 
of triggering/processing can be performed on it:

[https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time]

This old email thread on the apache-flink-users mailing list tells a different 
story - specifically, that late arriving data is not supported and the DataStream 
APIs need to be used instead:

[http://osdir.com/apache-flink-users/msg08110.html]

Has support been added since that email correspondence? Please consider 
reducing the ambiguity in the documentation and updating it to better reflect 
the current/planned state of support for late arriving data in the Table API.

Thanks,
Alex



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] FLIP-38 Support python language in flink TableAPI

2019-04-23 Thread Stephan Ewen
Hi all!

Below are my notes on the discussion last week on how to collaborate
between Beam and Flink.
The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan,
Jincheng, and me.

This represents my understanding of the discussion, please augment this
where I missed something or where your conclusion was different.

Best,
Stephan

===

*Beam's Python and Portability Framework*

  - Portability is core to Beam
  - Language-independent dataflow DAG that is defined via ProtoBuf
  - DAG can be generated from various languages (Java, Python, Go)
  - The DAG describes the pipelines and contains additional parameters to
describe each operator, and contains artifacts that need to be deployed /
executed as part of an operator execution.
  - Operators execute in language-specific containers, data is exchanged
between the language-specific container and the runner container (JVM) via
gRPC.

*Flink's desiderata for Python API*

  - Python API should mirror Java / Scala Table API
  - All relational expressions that correspond to built-in functions should
be translated to corresponding expressions in the Table API. That way the
planner generated Java code for the data types and built-in expressions,
meaning no Python code is necessary during execution
  - UDFs should be supported and run similarly as in Beam's approach
  - Python programs should be similarly created and submitted/deployed as
Java / Scala programs (CLI, web, containerized, etc.)

*Consensus to share inter-process communication code*

  - Crucial code for robust setup and high performance data exchange across
processes
  - The code for the SDK harness, the artifact bootstrapping, and the data
exchange makes sense to share.
  - Ongoing discussion whether this can be a dedicated module with slim
dependencies in Beam

*Potential Long Term Perspective: Share language-independent DAG
representation*

  - Beam's language independent DAG could become a standard representation
used in both projects
  - Flink would need a way to receive that DAG, map it to the Table API, and
execute it from there
  - The DAG would need to have a standardized representation of functions
and expressions that then get mapped to Table API expressions to let the
planner optimize those and generate Java code for those
  - Similar to how UDFs are supported in the Table API, there would be
additional "external UDFs" that would go through the above-mentioned
inter-process communication layer

  - *Advantages:*
=> Flink and Beam could share more language bindings
=> Flink would execute Beam portability programs fast, without
intermediate abstraction and directly in the JVM for many operators.
 Abstraction is necessary around UDFs and to bridge between
serializers / coders, etc.

  - *Open issues:*
=> Biggest question is whether the language-independent DAG is
expressive enough to capture all the expressions that we want to map
directly to Table API expressions. Currently much is hidden in opaque UDFs.
Kenn mentioned the structure should be flexible enough to capture more
expressions transparently.

=> If the DAG is generic enough to capture the additional information,
we probably still need some standardization, so that all the different
language APIs represent their expressions the same way
=> Similarly, it makes sense to standardize the type system (and type
inference) as far as built-in expressions and their interaction with UDFs
are concerned. The Flink Table API and Blink teams found this to be
essential for a consistent API behavior. This would not prevent all-UDF
programs from still using purely binary/opaque types.

 =>  We need to create a Python API that follows the same structure as
Flink's Table API and produces the language-independent DAG

*Short-term approach in Flink*

  - Goal is to not block Flink's Python effort on the long term approach
and the necessary design and evolution of the language-independent DAG.
  - Depending on the outcome of the above investigation, Flink may
initially go with a simple approach to map the Python Table API to the
Java Table API via Py4J, as outlined in FLIP-38:
https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8



On Tue, Apr 23, 2019 at 4:14 AM jincheng sun 
wrote:

> Hi everyone,
>
> Thank you for all of your feedback and comments in google doc!
>
> I have updated the google doc and added the UDFs part. For a short summary:
>
>   - Python Table API - Flink introduces a set of Python Table API interfaces
> which align with the Flink Java Table API. It uses the Py4J framework to
> communicate between the Python VM and the Java VM.
>   - Python user-defined functions - IMO, for the communication framework of
> UDFs, we will try to reuse the existing achievements of Beam as much as
> possible, and do our best for this. The first step is
>   to solve the above interface definition problem, which turns `
> WindowedValue` into `T` 

[DISCUSS] Temporarily remove support for job rescaling via CLI action "modify"

2019-04-23 Thread Gary Yao
Hi all,

As the subject states, I am proposing to temporarily remove support for
changing the parallelism of a job via the following syntax [1]:

./bin/flink modify [job-id] -p [new-parallelism]

This is an experimental feature that we introduced with the first rollout of
FLIP-6 (Flink 1.5). However, this feature comes with a few caveats:

* Rescaling does not work with HA enabled [2]
* New parallelism is not persisted, i.e., after a JobManager restart, the job
  will be recovered with the initial parallelism

Due to the above-mentioned issues, I believe that currently nobody uses
"modify -p" to rescale their jobs in production. Moreover, the rescaling
feature stands in the way of our current efforts to rework Flink's
scheduling [3]. I therefore propose to remove the rescaling code for the time
being. Note that it will still be possible to change the parallelism by taking
a savepoint and restoring the job with a different parallelism [4].

Any comments and suggestions will be highly appreciated.

Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
[2] https://issues.apache.org/jira/browse/FLINK-8902
[3] https://issues.apache.org/jira/browse/FLINK-10429
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html#what-happens-when-i-change-the-parallelism-of-my-program-when-restoring


[jira] [Created] (FLINK-12304) AvroInputFormat should support schema evolution

2019-04-23 Thread John (JIRA)
John created FLINK-12304:


 Summary: AvroInputFormat should support schema evolution
 Key: FLINK-12304
 URL: https://issues.apache.org/jira/browse/FLINK-12304
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.8.0
Reporter: John


From the Avro spec:

_A reader of Avro data, whether from an RPC or a file, can always parse that 
data because its schema is provided. But that schema may not be exactly the 
schema that was expected. For example, if the data was written with a different 
version of the software than it is read, then records may have had fields added 
or removed._

The AvroInputFormat should allow the application to supply a reader's schema to 
support cases where data was written with an old version of a schema and needs 
to be read with a newer version. The reader's schema can have additional fields 
with defaults so that the old schema can be adapted to the new one. The 
underlying Avro Java library supports schema resolution, so adding support in 
AvroInputFormat should be straightforward.
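
For reference, a minimal sketch of the schema resolution the underlying Avro Java library already provides (plain Avro rather than Flink's AvroInputFormat; the schema and data file names are made up):

{code:java}
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;

public class AvroSchemaEvolutionSketch {

    public static void main(String[] args) throws IOException {
        // Reader schema: the old writer schema plus new fields that have defaults.
        Schema readerSchema = new Schema.Parser().parse(new File("record_v2.avsc"));

        // The writer schema is taken from the data file itself; setting the
        // expected (reader) schema makes Avro perform schema resolution.
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        datumReader.setExpected(readerSchema);

        try (DataFileReader<GenericRecord> fileReader =
                     new DataFileReader<>(new File("records_v1.avro"), datumReader)) {
            for (GenericRecord record : fileReader) {
                // Fields missing in the old data are filled from the reader schema's defaults.
                System.out.println(record);
            }
        }
    }
}
{code}

An AvroInputFormat constructor accepting such a reader schema could delegate to exactly this mechanism.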



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: HA lock nodes, Checkpoints, and JobGraphs after failure

2019-04-23 Thread Till Rohrmann
It would be awesome to get the DEBUG logs for JobMaster,
ZooKeeper, ZooKeeperCompletedCheckpointStore,
ZooKeeperStateHandleStore, CheckpointCoordinator.

Cheers,
Till

On Tue, Apr 23, 2019 at 2:37 PM Dyana Rose  wrote:

> may take me a bit to get the logs as we're not always in a situation where
> we've got enough hands free to run through the scenarios for a day.
>
> Is that DEBUG JobManager, DEBUG ZooKeeper, or both you'd be interested in?
>
> Thanks,
> Dyana
>
> On Tue, 23 Apr 2019 at 13:23, Till Rohrmann  wrote:
>
> > Hi Dyana,
> >
> > your analysis is almost correct. The only part which is missing is that
> the
> > lock nodes are created as ephemeral nodes. This should ensure that if a
> JM
> > process dies that the lock nodes will get removed by ZooKeeper. It
> depends
> > a bit on ZooKeeper's configuration how long it takes until Zk detects a
> > client connection as lost and then removes the ephemeral nodes. If the
> job
> > should terminate within this time interval, then it could happen that you
> > cannot remove the checkpoint/JobGraph. However, usually the Zookeeper
> > session timeout should be configured to be a couple of seconds.
> >
> > I would actually be interested in better understanding your problem to
> see
> > whether this is still a bug in Flink. Could you maybe share the
> respective
> > logs on DEBUG log level with me? Maybe it would also be possible to run
> the
> > latest version of Flink (1.7.2) to include all possible bug fixes.
> >
> > FYI: The community is currently discussing to reimplement the ZooKeeper
> > based high availability services [1]. One idea is to get rid of the lock
> > nodes by replacing them with transactions on the leader node. This could
> > prevent these kind of bugs in the future.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-10333
> >
> > Cheers,
> > Till
> >
> > On Thu, Apr 18, 2019 at 3:12 PM dyana.rose 
> > wrote:
> >
> > > Flink v1.7.1
> > >
> > > After a Flink reboot we've been seeing some unexpected issues with
> excess
> > > retained checkpoints not being able to be removed from ZooKeeper after
> a
> > > new checkpoint is created.
> > >
> > > I believe I've got my head around the role of ZK and lockNodes in
> > > Checkpointing after going through the code. Could you check my logic on
> > > this and add any insight, especially if I've got it wrong?
> > >
> > > The situation:
> > > 1) Say we run JM1 and JM2 and retain 10 checkpoints and are running in
> HA
> > > with S3 as the backing store.
> > >
> > > 2) JM1 and JM2 start up and each instance of ZooKeeperStateHandleStore
> > has
> > > its own lockNode UUID. JM1 is elected leader.
> > >
> > > 3) We submit a job, that JobGraph lockNode is added to ZK using JM1's
> > > JobGraph lockNode.
> > >
> > > 4) Checkpoints start rolling in, latest 10 are retained in ZK using
> JM1's
> > > checkpoint lockNode. We continue running, and checkpoints are
> > successfully
> > > being created and excess checkpoints removed.
> > >
> > > 5) Both JM1 and JM2 now are rebooted.
> > >
> > > 6) The JobGraph is recovered by the leader, the job restarts from the
> > > latest checkpoint.
> > >
> > > Now after every new checkpoint we see in the ZooKeeper logs:
> > > INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@653] - Got
> > > user-level KeeperException when processing sessionid:0x1047715000d
> > > type:delete cxid:0x210 zxid:0x71091 txntype:-1 reqpath:n/a Error
> > >
> >
> Path:/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/0057813
> > > Error:KeeperErrorCode = Directory not empty for
> > >
> >
> /flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/005781
> > > with an increasing checkpoint id on each subsequent call.
> > >
> > > When JM1 and JM2 were rebooted the lockNode UUIDs would have rolled,
> > > right? As the old checkpoints were created under the old UUID, the new
> > JMs
> > > will never be able to remove the old retained checkpoints from
> ZooKeeper.
> > >
> > > Is that correct?
> > >
> > > If so, would this also happen with JobGraphs in the following situation
> > > (we saw this just recently where we had a JobGraph for a cancelled job
> > > still in ZK):
> > >
> > > Steps 1 through 3 above, then:
> > > 4) JM1 fails over to JM2, the job keeps running uninterrupted. JM1
> > > restarts.
> > >
> > > 5) some time later while JM2 is still leader we hard cancel the job and
> > > restart the JMs
> > >
> > > In this case JM2 would successfully remove the job from s3, but because
> > > its lockNode is different from JM1 it cannot delete the lock file in
> the
> > > jobgraph folder and so can’t remove the jobgraph. Then Flink restarts
> and
> > > tries to process the JobGraph it has found, but the S3 files have been
> > > deleted.
> > >
> > > Possible related closed issues (fixes went in v1.7.0):
> > > https://issues.apache.org/jira/browse/FLINK-10184 and
> > > https://issues.apache.org/jira/browse/FLINK-10255
> > >
> > > Thanks for any insight,
> 

Apache Flink survey by Ververica

2019-04-23 Thread Robert Metzger
Hi everyone!

Ververica is running a brief survey to understand Apache Flink usage
and the needs of the community. We are hoping that this survey will help
identify common usage patterns, as well as pinpoint what are the most
needed features for Flink.

We'll share a report with a summary of findings at the conclusion of the
survey with the community. All of the responses will remain confidential,
and only aggregate statistics will be shared.

I expect the survey to take about 10 minutes, and all questions are
optional--we appreciate any feedback that you're willing to provide.

As a thank you, respondents will be entering a draw to win a trip
to Flink Forward.

The survey is available here:
https://docs.google.com/forms/d/e/1FAIpQLSdbNS1O-l07aORzKFx6zr3OV13lOyCx79ZRgiC1jYGb57C_hg/viewform

Looking forward to hearing back from you!

Best,
Robert


Re: HA lock nodes, Checkpoints, and JobGraphs after failure

2019-04-23 Thread Dyana Rose
may take me a bit to get the logs as we're not always in a situation where
we've got enough hands free to run through the scenarios for a day.

Is that DEBUG JobManager, DEBUG ZooKeeper, or both you'd be interested in?

Thanks,
Dyana

On Tue, 23 Apr 2019 at 13:23, Till Rohrmann  wrote:

> Hi Dyana,
>
> your analysis is almost correct. The only part which is missing is that the
> lock nodes are created as ephemeral nodes. This should ensure that if a JM
> process dies that the lock nodes will get removed by ZooKeeper. It depends
> a bit on ZooKeeper's configuration how long it takes until Zk detects a
> client connection as lost and then removes the ephemeral nodes. If the job
> should terminate within this time interval, then it could happen that you
> cannot remove the checkpoint/JobGraph. However, usually the Zookeeper
> session timeout should be configured to be a couple of seconds.
>
> I would actually be interested in better understanding your problem to see
> whether this is still a bug in Flink. Could you maybe share the respective
> logs on DEBUG log level with me? Maybe it would also be possible to run the
> latest version of Flink (1.7.2) to include all possible bug fixes.
>
> FYI: The community is currently discussing to reimplement the ZooKeeper
> based high availability services [1]. One idea is to get rid of the lock
> nodes by replacing them with transactions on the leader node. This could
> prevent these kind of bugs in the future.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10333
>
> Cheers,
> Till
>
> On Thu, Apr 18, 2019 at 3:12 PM dyana.rose 
> wrote:
>
> > Flink v1.7.1
> >
> > After a Flink reboot we've been seeing some unexpected issues with excess
> > retained checkpoints not being able to be removed from ZooKeeper after a
> > new checkpoint is created.
> >
> > I believe I've got my head around the role of ZK and lockNodes in
> > Checkpointing after going through the code. Could you check my logic on
> > this and add any insight, especially if I've got it wrong?
> >
> > The situation:
> > 1) Say we run JM1 and JM2 and retain 10 checkpoints and are running in HA
> > with S3 as the backing store.
> >
> > 2) JM1 and JM2 start up and each instance of ZooKeeperStateHandleStore
> has
> > its own lockNode UUID. JM1 is elected leader.
> >
> > 3) We submit a job, that JobGraph lockNode is added to ZK using JM1's
> > JobGraph lockNode.
> >
> > 4) Checkpoints start rolling in, latest 10 are retained in ZK using JM1's
> > checkpoint lockNode. We continue running, and checkpoints are
> successfully
> > being created and excess checkpoints removed.
> >
> > 5) Both JM1 and JM2 now are rebooted.
> >
> > 6) The JobGraph is recovered by the leader, the job restarts from the
> > latest checkpoint.
> >
> > Now after every new checkpoint we see in the ZooKeeper logs:
> > INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@653] - Got
> > user-level KeeperException when processing sessionid:0x1047715000d
> > type:delete cxid:0x210 zxid:0x71091 txntype:-1 reqpath:n/a Error
> >
> Path:/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/0057813
> > Error:KeeperErrorCode = Directory not empty for
> >
> /flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/005781
> > with an increasing checkpoint id on each subsequent call.
> >
> > When JM1 and JM2 were rebooted the lockNode UUIDs would have rolled,
> > right? As the old checkpoints were created under the old UUID, the new
> JMs
> > will never be able to remove the old retained checkpoints from ZooKeeper.
> >
> > Is that correct?
> >
> > If so, would this also happen with JobGraphs in the following situation
> > (we saw this just recently where we had a JobGraph for a cancelled job
> > still in ZK):
> >
> > Steps 1 through 3 above, then:
> > 4) JM1 fails over to JM2, the job keeps running uninterrupted. JM1
> > restarts.
> >
> > 5) some time later while JM2 is still leader we hard cancel the job and
> > restart the JMs
> >
> > In this case JM2 would successfully remove the job from s3, but because
> > its lockNode is different from JM1 it cannot delete the lock file in the
> > jobgraph folder and so can’t remove the jobgraph. Then Flink restarts and
> > tries to process the JobGraph it has found, but the S3 files have been
> > deleted.
> >
> > Possible related closed issues (fixes went in v1.7.0):
> > https://issues.apache.org/jira/browse/FLINK-10184 and
> > https://issues.apache.org/jira/browse/FLINK-10255
> >
> > Thanks for any insight,
> > Dyana
> >
>


-- 

Dyana Rose
Software Engineer


W: www.salecycle.com 



Re: HA lock nodes, Checkpoints, and JobGraphs after failure

2019-04-23 Thread Till Rohrmann
Hi Dyana,

your analysis is almost correct. The only part which is missing is that the
lock nodes are created as ephemeral nodes. This should ensure that if a JM
process dies, the lock nodes will get removed by ZooKeeper. It depends
a bit on ZooKeeper's configuration how long it takes until Zk detects a
client connection as lost and then removes the ephemeral nodes. If the job
should terminate within this time interval, then it could happen that you
cannot remove the checkpoint/JobGraph. However, usually the Zookeeper
session timeout should be configured to be a couple of seconds.
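
To make the ephemeral-node behavior concrete, here is a minimal sketch against
the plain ZooKeeper client (the connect string, timeout, and path are made-up
values, not Flink's actual lock node handling):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class EphemeralLockNodeSketch {

    public static void main(String[] args) throws Exception {
        // Ephemeral nodes live exactly as long as the session that created them,
        // so a dead JM's lock nodes disappear once its session times out.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, event -> { });

        String lockNode = zk.create(
                "/example-lock-",
                new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Created " + lockNode);

        // If the process is killed instead of closing cleanly, the node only
        // vanishes after the session timeout elapses - the window described above.
        zk.close();
    }
}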

I would actually be interested in better understanding your problem to see
whether this is still a bug in Flink. Could you maybe share the respective
logs on DEBUG log level with me? Maybe it would also be possible to run the
latest version of Flink (1.7.2) to include all possible bug fixes.

FYI: The community is currently discussing to reimplement the ZooKeeper
based high availability services [1]. One idea is to get rid of the lock
nodes by replacing them with transactions on the leader node. This could
prevent these kind of bugs in the future.

[1] https://issues.apache.org/jira/browse/FLINK-10333

Cheers,
Till

On Thu, Apr 18, 2019 at 3:12 PM dyana.rose  wrote:

> Flink v1.7.1
>
> After a Flink reboot we've been seeing some unexpected issues with excess
> retained checkpoints not being able to be removed from ZooKeeper after a
> new checkpoint is created.
>
> I believe I've got my head around the role of ZK and lockNodes in
> Checkpointing after going through the code. Could you check my logic on
> this and add any insight, especially if I've got it wrong?
>
> The situation:
> 1) Say we run JM1 and JM2 and retain 10 checkpoints and are running in HA
> with S3 as the backing store.
>
> 2) JM1 and JM2 start up and each instance of ZooKeeperStateHandleStore has
> its own lockNode UUID. JM1 is elected leader.
>
> 3) We submit a job, that JobGraph lockNode is added to ZK using JM1's
> JobGraph lockNode.
>
> 4) Checkpoints start rolling in, latest 10 are retained in ZK using JM1's
> checkpoint lockNode. We continue running, and checkpoints are successfully
> being created and excess checkpoints removed.
>
> 5) Both JM1 and JM2 now are rebooted.
>
> 6) The JobGraph is recovered by the leader, the job restarts from the
> latest checkpoint.
>
> Now after every new checkpoint we see in the ZooKeeper logs:
> INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@653] - Got
> user-level KeeperException when processing sessionid:0x1047715000d
> type:delete cxid:0x210 zxid:0x71091 txntype:-1 reqpath:n/a Error
> Path:/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/0057813
> Error:KeeperErrorCode = Directory not empty for
> /flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/005781
> with an increasing checkpoint id on each subsequent call.
>
> When JM1 and JM2 were rebooted the lockNode UUIDs would have rolled,
> right? As the old checkpoints were created under the old UUID, the new JMs
> will never be able to remove the old retained checkpoints from ZooKeeper.
>
> Is that correct?
>
> If so, would this also happen with JobGraphs in the following situation
> (we saw this just recently where we had a JobGraph for a cancelled job
> still in ZK):
>
> Steps 1 through 3 above, then:
> 4) JM1 fails over to JM2, the job keeps running uninterrupted. JM1
> restarts.
>
> 5) some time later while JM2 is still leader we hard cancel the job and
> restart the JMs
>
> In this case JM2 would successfully remove the job from s3, but because
> its lockNode is different from JM1 it cannot delete the lock file in the
> jobgraph folder and so can’t remove the jobgraph. Then Flink restarts and
> tries to process the JobGraph it has found, but the S3 files have been
> deleted.
>
> Possible related closed issues (fixes went in v1.7.0):
> https://issues.apache.org/jira/browse/FLINK-10184 and
> https://issues.apache.org/jira/browse/FLINK-10255
>
> Thanks for any insight,
> Dyana
>


Re: [DISCUSS] Proposal of Flink Driver - Making Flink More Interactive

2019-04-23 Thread Till Rohrmann
Thanks for proposing this design document Shuiqiang. It is a very
interesting idea how to solve the problem of running multiple Flink jobs as
part of a single application. I like the idea since it does not require
many runtime changes apart from a session concept on the Dispatcher and it
would work in all environments.

One thing which is not fully clear to me is the failover behavior in case
of driver job faults. Would it be possible to recover the driver job or
would a driver job fault directly transition the application into a
globally terminal state? Apart from that, I left some minor comments in the
design document.

Cheers,
Till

On Tue, Apr 23, 2019 at 10:04 AM Shuiqiang Chen  wrote:

> Hi All,
>
> We would like to start a discussion thread about a new feature called Flink
> Driver. A brief summary is following.
>
> As mentioned in the discussion of Interactive Programming, user
> applications might consist of multiple jobs and take long to finish.
> Currently, when Flink runs applications with multiple jobs, the application
> will run in a local process which is responsible for submitting the jobs.
> That local process will not exit until the whole application has finished.
> Users have to keep eyes on the local process in case it is killed due to
> connection lost, session timeout, local operating system problem, etc.
>
> To solve the problem, we would like to introduce the Flink Driver. Users
> can submit applications using driver mode. A Flink driver job will be
> submitted to take care of the job submissions in the user application.
>
> For more details about flink driver, please refer to the doc:
>
> https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY
>
> Any comments and suggestions will be highly appreciated.
>
> Best Regards,
> Shuiqiang
>


[jira] [Created] (FLINK-12303) Scala 2.12 lambdas does not work in event classes inside streams.

2019-04-23 Thread JIRA
Matěj Novotný created FLINK-12303:
-

 Summary: Scala 2.12 lambdas does not work in event classes inside 
streams.
 Key: FLINK-12303
 URL: https://issues.apache.org/jira/browse/FLINK-12303
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.7.2
 Environment: Scala 2.11/2.12, Oracle Java 1.8.0_172
Reporter: Matěj Novotný


When you use lambdas inside event classes used in streams, it works in Scala 
2.11. It stopped working in Scala 2.12. It does compile, but it does not process 
any data and does not throw any exception. I don't think that Flink should 
support lambdas inside event classes processed in streams, but I think it 
should not behave like this. I would expect it not to compile in case I have 
used an unsupported field in an event class.

 

For more details, please check my demonstration repo: 
[https://github.com/matej-novotny/flink-lambda-bug]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12302) fix the finalStatus of application when job not finished

2019-04-23 Thread lamber-ken (JIRA)
lamber-ken created FLINK-12302:
--

 Summary: fix the finalStatus of application  when job not finished
 Key: FLINK-12302
 URL: https://issues.apache.org/jira/browse/FLINK-12302
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.8.0
Reporter: lamber-ken
Assignee: lamber-ken
 Fix For: 1.9.0
 Attachments: image-2019-04-23-19-39-48-501.png, 
image-2019-04-23-19-44-49-403.png, image-2019-04-23-19-45-11-168.png, 
image-2019-04-23-19-45-23-896.png, image-2019-04-23-19-45-47-617.png

A Flink job (Flink 1.6.3) failed in per-job YARN cluster mode, and the YARN 
ResourceManager reran the job.

When the job failed again, the application finished, but the finalStatus is 
UNDEFINED

!image-2019-04-23-19-45-47-617.png!

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12301) Scala value classes cannot be serialized anymore in case classes in Flink 1.8.0

2019-04-23 Thread Michael (JIRA)
Michael created FLINK-12301:
---

 Summary: Scala value classes cannot be serialized anymore in case 
classes in Flink 1.8.0
 Key: FLINK-12301
 URL: https://issues.apache.org/jira/browse/FLINK-12301
 Project: Flink
  Issue Type: Bug
  Components: API / Scala
Affects Versions: 1.8.0
Reporter: Michael



There is a regression in Flink 1.8.0 compared to 1.7.2: Scala [value 
classes|https://docs.scala-lang.org/overviews/core/value-classes.html] cannot 
be serialized anymore as a case class attribute.

Some short example code:

{code:scala}
package com.example.valueclassissue

import org.apache.flink.streaming.api.scala._

object ValueClassExample extends App {
  val env: StreamExecutionEnvironment =
    StreamExecutionEnvironment.getExecutionEnvironment

  val measurements =
    env.fromCollection(Seq(Measurement(1, new DegreeCelsius(32.5f))))
  measurements.print()

  env.execute()
}

class DegreeCelsius(val value: Float) extends AnyVal {
  override def toString: String = s"$value °C"
}

case class Measurement(i: Int, temperature: DegreeCelsius)
{code}

While with Flink 1.7.2 the program outputs _{{3> Measurement(1,32.5 °C)}}_ as 
expected, in Flink 1.8.0 an exception is thrown:

{noformat}
java.io.IOException: Failed to deserialize an element from the source. If you 
are using user-defined serialization (Value and Writable types), check the 
serialization functions.
Serializer is 
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer@466b6f83
at 
org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:158)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
{noformat}

Full log:

{noformat}
2019-04-23T09:33:48.485Z [main] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - class 
com.example.valueclassissue.DegreeCelsius does not contain a setter for field 
value
2019-04-23T09:33:48.487Z [main] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - Class class 
com.example.valueclassissue.DegreeCelsius cannot be used as a POJO type because 
not all fields are valid POJO fields, and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" for details 
of the effect on performance.
2019-04-23T09:33:49.594Z [main] INFO 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment - Running job 
on local embedded Flink mini cluster
2019-04-23T09:33:49.616Z [main] INFO 
org.apache.flink.runtime.minicluster.MiniCluster - Starting Flink Mini Cluster
2019-04-23T09:33:49.618Z [main] INFO 
org.apache.flink.runtime.minicluster.MiniCluster - Starting Metrics Registry
2019-04-23T09:33:49.665Z [main] INFO 
org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics reporter 
configured, no metrics will be exposed/reported.
2019-04-23T09:33:49.665Z [main] INFO 
org.apache.flink.runtime.minicluster.MiniCluster - Starting RPC Service(s)
2019-04-23T09:33:49.896Z [flink-akka.actor.default-dispatcher-3] INFO 
akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2019-04-23T09:33:49.913Z [main] INFO 
org.apache.flink.runtime.minicluster.MiniCluster - Trying to start actor system 
at :0
2019-04-23T09:33:49.952Z [flink-metrics-2] INFO akka.event.slf4j.Slf4jLogger - 
Slf4jLogger started
2019-04-23T09:33:50.001Z [flink-metrics-2] INFO akka.remote.Remoting - Starting 
remoting
2019-04-23T09:33:50.139Z [flink-metrics-2] INFO akka.remote.Remoting - Remoting 
started; listening on addresses :[akka.tcp://flink-metrics@127.0.1.1:36651]
2019-04-23T09:33:50.144Z [main] INFO 
org.apache.flink.runtime.minicluster.MiniCluster - Actor system started at 
akka.tcp://flink-metrics@127.0.1.1:36651
2019-04-23T09:33:50.146Z [main] INFO 
org.apache.flink.runtime.minicluster.MiniCluster - Starting high-availability 
services
2019-04-23T09:33:50.155Z [main] INFO org.apache.flink.runtime.blob.BlobServer - 
Created BLOB server storage directory 
/tmp/blobStore-0e1c3305-1e6e-47a0-885a-8c82bb5ae09f
2019-04-23T09:33:50.158Z [main] INFO org.apache.flink.runtime.blob.BlobServer - 
Started BLOB server at 0.0.0.0:40817 - max concurrent requests: 50 - max 
backlog: 1000
2019-04-23T09:33:50.161Z [main] INFO 
org.apache.flink.runtime.blob.PermanentBlobCache - Created BLOB cache storage 
directory /tmp/blobStore-e3046023-c398-4a42-88f4-20949f7876ce
2019-04-23T09:33:50.162Z [main] INFO 
org.apache.flink.runtime.blob.TransientBlobCache - Created BLOB cache storage 
directory 

[jira] [Created] (FLINK-12300) Unused Import warning with DataStream[Seq[...]]

2019-04-23 Thread Michael (JIRA)
Michael created FLINK-12300:
---

 Summary: Unused Import warning with DataStream[Seq[...]]
 Key: FLINK-12300
 URL: https://issues.apache.org/jira/browse/FLINK-12300
 Project: Flink
  Issue Type: Bug
  Components: API / Scala
Affects Versions: 1.8.0
Reporter: Michael


There is a regression in Flink 1.8.0 compared to 1.7.2 using Scala...

When enabling unused import warnings in *build.sbt*:
{code:java}
scalacOptions ++= Seq("-Xfatal-warnings", "-Ywarn-unused-import")
{code}
...and compiling a file with this content:
{code:java}
package com.example.unusedimportissue

import org.apache.flink.streaming.api.scala._

object ExampleSeqDataStream {
  def doubleElement(numbers: DataStream[Int]): DataStream[Seq[Int]] = {
numbers.map(x => Seq(x, x))
  }
}
{code}
Then with Flink 1.8.0 this compile issue happens (while with Flink 1.7.2 
everything compiles fine):
{noformat}
[error] /path/to/com/example/unusedimportissue/example.scala:1:0: Unused import
[error] package com.example.unusedimportissue
[error] ^
{noformat}
If the result type {{DataStream[Seq[Int]]}} is replaced by {{DataStream[Int]}}, 
then this issue does not occur, so it is related to the {{Seq[...]}} type.

Maybe a Flink macro is generating an import that is not used, resulting in this 
error?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12299) ExecutionConfig#setAutoWatermarkInterval should check param(interval should not less than zero)

2019-04-23 Thread shiwuliang (JIRA)
shiwuliang created FLINK-12299:
--

 Summary: ExecutionConfig#setAutoWatermarkInterval should check 
param(interval should not less than zero)
 Key: FLINK-12299
 URL: https://issues.apache.org/jira/browse/FLINK-12299
 Project: Flink
  Issue Type: Improvement
Reporter: shiwuliang


In any scenario, `autoWatermarkInterval` should not be less than or equal to 
zero.

First of all, such a value does not correspond to the meaning of 
`autoWatermarkInterval`.

Second, in the case where `autoWatermarkInterval` is less than 0, we will not 
be able to register ourselves in `TimestampsAndPeriodicWatermarksOperator#open`, 
which will result in the watermark of this stream being kept at the lowest 
value.
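
A minimal sketch of the kind of precondition this asks for (the class and field below are stand-ins, not the actual ExecutionConfig implementation):

{code:java}
/** Hypothetical sketch of the proposed check; not Flink's actual ExecutionConfig code. */
public class AutoWatermarkIntervalCheck {

    private long autoWatermarkInterval = 200L;

    public AutoWatermarkIntervalCheck setAutoWatermarkInterval(long interval) {
        // Reject non-positive values instead of silently never registering
        // the periodic watermark timer.
        if (interval <= 0) {
            throw new IllegalArgumentException(
                    "autoWatermarkInterval must be positive, but was " + interval);
        }
        this.autoWatermarkInterval = interval;
        return this;
    }
}
{code}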



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-23 Thread Till Rohrmann
I think we should not expose the ClusterClient configuration via the
ExecutionEnvironment (env.getClusterClient().addJobListener) because this
is effectively the same as exposing the JobListener interface directly on
the ExecutionEnvironment. Instead I think it could be possible to provide a
ClusterClient factory which is picked up from the Configuration or some
other mechanism for example. That way it would not need to be exposed via
the ExecutionEnvironment at all.

Cheers,
Till

On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:

> >>>  The ExecutionEnvironment is usually used by the user who writes the
> code and this person (I assume) would not be really interested in these
> callbacks.
>
> Usually the ExecutionEnvironment is used by the user who writes the code, but
> it doesn't need to be created and configured by this person. e.g. in a
> Zeppelin notebook, the ExecutionEnvironment is created by Zeppelin, and users
> just use the ExecutionEnvironment to write Flink programs.  You are right that
> the end user would not be interested in these callbacks, but the third party
> library that integrates with Zeppelin would be interested in these callbacks.
>
> >>> In your case, it could be sufficient to offer some hooks for the
> ClusterClient or being able to provide a custom ClusterClient.
>
> Actually in my initial PR (https://github.com/apache/flink/pull/8190), I
> do pass the JobListener to the ClusterClient and invoke it there.
> But IMHO, the ClusterClient is not supposed to be a public API for users.
> Instead, JobClient is the public API that users should use to control jobs. So
> adding hooks to the ClusterClient directly and providing a custom ClusterClient
> doesn't make sense to me. IIUC, you are suggesting the following approach:
>  env.getClusterClient().addJobListener(jobListener)
> but I don't see its benefit compared to this:
>  env.addJobListener(jobListener)
>
> Overall, I think adding hooks is orthogonal to fine-grained job control.
> And I agree that we should refactor the Flink client component, but I don't
> think it would affect the JobListener interface. What do you think?
>
>
>
>
> Till Rohrmann wrote on Thu, Apr 18, 2019 at 8:57 PM:
>
>> Thanks for starting this discussion Jeff. I can see the need for
>> additional hooks for third party integrations.
>>
>> The thing I'm wondering is whether we really need/want to expose a
>> JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
>> usually used by the user who writes the code and this person (I assume)
>> would not be really interested in these callbacks. If he would, then one
>> should rather think about a better programmatic job control where the
>> `ExecutionEnvironment#execute` call returns a `JobClient` instance.
>> Moreover, we would effectively make this part of the public API and every
>> implementation would need to offer it.
>>
>> In your case, it could be sufficient to offer some hooks for the
>> ClusterClient or being able to provide a custom ClusterClient. The
>> ClusterClient is the component responsible for the job submission and
>> retrieval of the job result and, hence, would be able to signal when a job
>> has been submitted or completed.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 18, 2019 at 8:57 AM vino yang  wrote:
>>
>>> Hi Jeff,
>>>
>>> I personally like this proposal. From the perspective of
>>> programmability, the JobListener can make the third program more
>>> appreciable.
>>>
>>> The scene where I need the listener is the Flink cube engine for Apache
>>> Kylin. In the case, the Flink job program is embedded into the Kylin's
>>> executable context.
>>>
>>> If we could have this listener, it would be easier to integrate with
>>> Kylin.
>>>
>>> Best,
>>> Vino
>>>
>>> Jeff Zhang wrote on Thu, Apr 18, 2019 at 1:30 PM:
>>>

 Hi All,

 I created FLINK-12214
  for adding
 JobListener (hook) in flink job lifecycle. Since this is a new public api
 for flink, so I'd like to discuss it more widely in community to get more
 feedback.

 The background and motivation is that I am integrating Flink into Apache
 Zeppelin (which is a notebook, in case you don't know). I'd like to capture
 some job context (like the jobId) in the lifecycle of a Flink job (submission,
 execution, cancellation) so that I can manipulate the job with more
 fine-grained control (e.g. I can capture the jobId when the job is submitted,
 then associate it with one paragraph, and when the user clicks the cancel
 button, I can call the Flink cancel API to cancel this job).

 I believe other projects which integrate Flink would need a similar
 mechanism. I plan to add an addJobListener API to
 ExecutionEnvironment/StreamExecutionEnvironment so that users can add
 customized hooks into the Flink job lifecycle.

 Here's draft interface JobListener.

 public interface JobListener {

 void onJobSubmitted(JobID jobId);

 void 
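
Based only on the lifecycle events named above (submission, executed, cancelled), a plausible shape for the full draft could be as follows; only onJobSubmitted appears verbatim above, the other callback names are guesses:

import org.apache.flink.api.common.JobID;

public interface JobListener {

    void onJobSubmitted(JobID jobId);

    // The remaining callbacks are inferred from the message, not quoted from it.
    void onJobExecuted(JobID jobId);

    void onJobCanceled(JobID jobId);
}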

[jira] [Created] (FLINK-12298) Make column functions accept custom

2019-04-23 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-12298:


 Summary: Make column functions accept custom 
 Key: FLINK-12298
 URL: https://issues.apache.org/jira/browse/FLINK-12298
 Project: Flink
  Issue Type: Improvement
Reporter: Dawid Wysakowicz






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[DISCUSS] Proposal of Flink Driver - Making Flink More Interactive

2019-04-23 Thread Shuiqiang Chen
Hi All,

We would like to start a discussion thread about a new feature called Flink
Driver. A brief summary follows.

As mentioned in the discussion of Interactive Programming, user
applications might consist of multiple jobs and take a long time to finish.
Currently, when Flink runs an application with multiple jobs, the application
runs in a local process which is responsible for submitting the jobs.
That local process will not exit until the whole application has finished.
Users have to keep an eye on the local process in case it is killed due to
connection loss, session timeout, local operating system problems, etc.

To solve this problem, we would like to introduce the Flink Driver. Users
can submit applications using driver mode. A Flink driver job will be
submitted to take care of the job submissions in the user application.

For more details about the Flink Driver, please refer to the doc:
https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY

Any comments and suggestions will be highly appreciated.

Best Regards,
Shuiqiang


Re: Support for ProtoBuf data format in StreamingFileSink

2019-04-23 Thread Driesprong, Fokko
Thanks Kailash for bringing this up. I think this is a good idea. By
passing the ParquetWriter we gain much more flexibility.

I did a small PR on adding the ability to add compression to the Parquet
writer: https://github.com/apache/flink/pull/7547 But I believe this is the
wrong approach. For example, I'd like to tune the page sizes as well, but
that requires another PR and a lot of code changes just to set this, due
to the rather complex structure of the builder pattern. Personally, I would
prefer to just pass a generic ParquetWriter to the BulkWriter (see the rough
sketch below). This makes it much easier, and I don't see the added value of
having the constructor here.
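
As a rough sketch of what passing a generic ParquetWriter to the BulkWriter
could look like (the delegating class below is only an illustration, not an
existing Flink class):

import java.io.IOException;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class DelegatingParquetBulkWriter<T> implements BulkWriter<T> {

    private final ParquetWriter<T> parquetWriter;

    public DelegatingParquetBulkWriter(ParquetWriter<T> parquetWriter) {
        this.parquetWriter = parquetWriter;
    }

    @Override
    public void addElement(T element) throws IOException {
        parquetWriter.write(element);
    }

    @Override
    public void flush() throws IOException {
        // ParquetWriter exposes no explicit flush; data is flushed on close.
    }

    @Override
    public void finish() throws IOException {
        parquetWriter.close();
    }
}

In practice the factory would still need to build the ParquetWriter against the
output stream provided by the sink for each part file, which is where the
builder discussion comes in.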

Cheers, Fokko

On Mon, 22 Apr 2019 at 19:53, Jakob Homan wrote:

> +1.  Sounds good to me.
> -Jakob
>
> On Mon, Apr 22, 2019 at 9:00 AM Kailash Dayanand
>  wrote:
> >
> > Friendly remainder. Any thoughts on this approach ?
> >
> > On Tue, Apr 9, 2019 at 11:36 AM Kailash Dayanand 
> wrote:
> >
> > > cc'ing few folks who are interested in this discussion.
> > >
> > > On Tue, Apr 9, 2019 at 11:35 AM Kailash Dayanand 
> > > wrote:
> > >
> > >> Hello,
> > >>
> > >> I am looking to contribute a ProtoParquetWriter support which can be
> used
> > >> in Bulk format for the StreamingFileSink api. There has been earlier
> > >> discussions on this in the user mailing list: https://goo.gl/ya2StL
> and
> > >> thought it would be a good addition to have.
> > >>
> > >> For implementation, looking at the current APIs present in
> > >> ProtoParquetWriter within the parquet project (
> > >> http://tinyurl.com/y378be42), it looks like there is some difference in
> > >> the interface between the Avro and Proto writers (ProtoParquetWriter does
> > >> not have a builder class and does not interface with OutputFile). Due to
> > >> this, I was looking at directly extending the ParquetWriter within Flink
> > >> to define the Builder static class and have newer interfaces. This is
> > >> needed as the bulk writer takes a builder to create the ParquetWriter in
> > >> the BulkWriter.Factory. (
> > >> http://tinyurl.com/yyg9cn9b)
> > >>
> > >> Any thoughts if this is a reasonable approach?
> > >>
> > >> Thanks
> > >> Kailash
> > >>
> > >
>


[jira] [Created] (FLINK-12297) We should clean the closure for OutputTags

2019-04-23 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-12297:


 Summary: We should clean the closure for OutputTags
 Key: FLINK-12297
 URL: https://issues.apache.org/jira/browse/FLINK-12297
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.8.0
Reporter: Dawid Wysakowicz
 Fix For: 1.9.0, 1.8.1


Right now we do not invoke the closure cleaner on output tags. Therefore code 
such as:

{code}
@Test
public void testFlatSelectSerialization() throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
    OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {};
    CEP.pattern(elements, Pattern.begin("A")).flatSelect(
        outputTag,
        new PatternFlatTimeoutFunction<Integer, Integer>() {
            @Override
            public void timeout(
                    Map<String, List<Integer>> pattern,
                    long timeoutTimestamp,
                    Collector<Integer> out) throws Exception {

            }
        },
        new PatternFlatSelectFunction<Integer, Integer>() {
            @Override
            public void flatSelect(
                    Map<String, List<Integer>> pattern,
                    Collector<Integer> out) throws Exception {

            }
        }
    );

    env.execute();
}
{code}

will fail with a {{The implementation of the PatternFlatSelectAdapter is not 
serializable.}} exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12296) Data loss silently in RocksDBStateBackend when more than one operator chained in a single task

2019-04-23 Thread Congxian Qiu(klion26) (JIRA)
Congxian Qiu(klion26) created FLINK-12296:
-

 Summary: Data loss silently in RocksDBStateBackend when more than 
one operator chained in a single task 
 Key: FLINK-12296
 URL: https://issues.apache.org/jira/browse/FLINK-12296
 Project: Flink
  Issue Type: Test
  Components: Runtime / State Backends
Reporter: Congxian Qiu(klion26)
Assignee: Congxian Qiu(klion26)


As the mailing list thread says [1], there may be a problem when more than one 
operator is chained in a single task and all the operators have state: data may 
be lost silently.

 

[1] 
https://app.smartmailcloud.com/web-share/MDkE4DArUT2eoSv86xq772I1HDgMNTVhLEmsnbQ7



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12295) use retract aggregate function instead of regular aggregate function for retractable aggregate in code gen

2019-04-23 Thread godfrey he (JIRA)
godfrey he created FLINK-12295:
--

 Summary: use retract aggregate function instead of regular 
aggregate function for retractable aggregate in code gen
 Key: FLINK-12295
 URL: https://issues.apache.org/jira/browse/FLINK-12295
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Reporter: godfrey he
Assignee: godfrey he


After {{FlinkRelMdModifiedMonotonicity}} was introduced, an aggregate function 
whose result value is monotonically increasing or decreasing could ignore 
retraction messages. We could choose the regular aggregate function instead of 
the retract aggregate function for those aggregate functions in code-gen. 
Currently, this requires that the regular aggregate function implement the 
{{retractExpressions}} method and not throw any exception. A better approach is 
for the retractable aggregate operator to not call the {{retractExpressions}} 
method for regular aggregate functions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12294) kafka consumer, data locality

2019-04-23 Thread Sergey (JIRA)
Sergey created FLINK-12294:
--

 Summary: kafka consumer, data locality
 Key: FLINK-12294
 URL: https://issues.apache.org/jira/browse/FLINK-12294
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka, Runtime / Coordination
Reporter: Sergey


Add a flag (default false) controlling whether the topic partitions are already 
grouped by the key, and exclude the unnecessary shuffle/resorting operation when 
this parameter is set to true. As an example, say we have clients' payment 
transactions in a Kafka topic. We group by clientId (transactions with the same 
clientId go to one Kafka topic partition), and the task is to find the max 
transaction per client in sliding windows. In terms of map/reduce there is no 
need to shuffle data between all topic consumers; it may be worth doing the 
aggregation within each consumer to gain some speedup due to the increased 
parallelism available for each partition's data. With N messages per partition 
it would be just N operations instead of N*ln(N) (the current implementation 
with shuffle/resorting). For windows with thousands of events, this is a tenfold 
gain in execution speed. A rough sketch of the idea against the existing API 
follows below.
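
For illustration, a rough sketch of the shuffle-free, per-partition aggregation this asks for, written against the existing (experimental) DataStreamUtils.reinterpretAsKeyedStream, with an in-memory source standing in for a Kafka topic already keyed by clientId; whether that reinterpretation is safe for Kafka's partitioning is exactly what the proposed flag would have to guarantee:

{code:java}
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PrePartitionedMaxSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for a Kafka source whose partitions are already keyed by clientId.
        DataStream<Tuple2<String, Long>> payments = env.fromElements(
                Tuple2.of("client-1", 10L),
                Tuple2.of("client-1", 42L),
                Tuple2.of("client-2", 7L));

        // Declares that the data is already partitioned by this key, so no
        // keyBy shuffle is inserted - the guarantee the proposed flag would
        // let the Kafka consumer state explicitly.
        KeyedStream<Tuple2<String, Long>, String> byClient =
                DataStreamUtils.reinterpretAsKeyedStream(
                        payments,
                        new KeySelector<Tuple2<String, Long>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Long> value) {
                                return value.f0;
                            }
                        });

        // Max transaction amount per client in sliding windows, computed
        // without redistributing the records.
        byClient
                .timeWindow(Time.seconds(60), Time.seconds(10))
                .max(1)
                .print();

        env.execute("pre-partitioned max per client");
    }
}
{code}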



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)