Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-07-31 Thread Hequn Cheng
Hi Jincheng,

Thanks a lot for raising the discussion. +1 for the FLIP.

I think this will bring big benefits to PyFlink users. Currently, the
Python Table API documentation is deeply hidden under the Table API & SQL tab,
which makes it quite hard to read. Also, the PyFlink documentation is mixed
with the Java/Scala documentation. It is hard for users to get an overview of
all the PyFlink documents. As more and more functionality is added to
PyFlink, I think it's time for us to refactor the documentation.

Best,
Hequn


On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira 
wrote:

> Hi, Jincheng!
>
> Thanks for creating this detailed FLIP, it will make a big difference in
> the experience of Python developers using Flink. I'm interested in
> contributing to this work, so I'll reach out to you offline!
>
> Also, thanks for sharing some information on the adoption of PyFlink, it's
> great to see that there are already production users.
>
> Marta
>
> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang  wrote:
>
> > Hi Jincheng,
> >
> > Thanks a lot for bringing up this discussion and the proposal.
> >
> > Big +1 for improving the structure of PyFlink doc.
> >
> > It will be very helpful to give PyFlink users a unified entry point to
> > learn PyFlink from the documentation.
> >
> > Best,
> > Xingbo
> >
> > Dian Fu  wrote on Fri, Jul 31, 2020 at 11:00 AM:
> >
> >> Hi Jincheng,
> >>
> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
> >> improve the Python API doc.
> >>
> >> I have received a lot of feedback from PyFlink beginners about
> >> the PyFlink docs, e.g. the materials are too few, the Python docs are mixed
> >> with the Java docs, and it's not easy to find the docs they want.
> >>
> >> I think it would greatly improve the user experience if we could have one
> >> place that includes most of the knowledge PyFlink users need.
> >>
> >> Regards,
> >> Dian
> >>
> >> On Jul 31, 2020, at 10:14 AM, jincheng sun  wrote:
> >>
> >> Hi folks,
> >>
> >> Since the release of Flink 1.11, the number of PyFlink users has continued
> >> to grow. As far as I know, many companies have used PyFlink for data
> >> analysis and for operations and maintenance monitoring, and have put it
> >> into production (such as 聚美优品 [1] (Jumei), 浙江墨芷 [2] (Mozhi), etc.).
> >> According to the feedback we have received, the current documentation is
> >> not very friendly to PyFlink users. There are two shortcomings:
> >>
> >> - Python-related content is mixed into the Java/Scala documentation, which
> >> makes it difficult to read for users who only focus on PyFlink.
> >> - There is already a "Python Table API" section in the Table API
> >> documentation that holds the PyFlink documents, but the number of articles
> >> is small and the content is fragmented. It is difficult for beginners to
> >> learn from it.
> >>
> >> In addition, FLIP-130 introduced the Python DataStream API. Many
> >> documents will be added for those new APIs. In order to increase the
> >> readability and maintainability of the PyFlink documentation, Wei Zhong
> >> and I have discussed offline and would like to rework it via this FLIP.
> >>
> >> We will rework the document around the following three objectives:
> >>
> >> - Add a separate section for the Python API under the "Application
> >> Development" section.
> >> - Restructure the current Python documentation into a brand-new structure
> >> to ensure the content is complete and friendly to beginners.
> >> - Improve the documents shared by Python/Java/Scala to make them more
> >> friendly to Python users without affecting Java/Scala users.
> >>
> >> More details can be found in FLIP-133:
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
> >>
> >> Best,
> >> Jincheng
> >>
> >> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
> >> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g
> >>
> >>
> >>
>


Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 Thread Hequn Cheng
Thanks Dian for the great work and thanks to everyone who made this
release possible!

Best, Hequn

On Wed, Jul 22, 2020 at 4:40 PM Jark Wu  wrote:

> Congratulations! Thanks Dian for the great work and for being the release
> manager!
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:
>
> > Congrats!
> >
> > Thanks Dian Fu for being release manager, and everyone involved!
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong 
> wrote:
> > >
> > > Congratulations! Thanks Dian for the great work!
> > >
> > > Best,
> > > Wei
> > >
> > > > On Jul 22, 2020, at 15:09, Leonard Xu  wrote:
> > > >
> > > > Congratulations!
> > > >
> > > > Thanks Dian Fu for the great work as release manager, and thanks
> > everyone involved!
> > > >
> > > > Best
> > > > Leonard Xu
> > > >
> > > >> On Jul 22, 2020, at 14:52, Dian Fu  wrote:
> > > >>
> > > >> The Apache Flink community is very happy to announce the release of
> > Apache Flink 1.11.1, which is the first bugfix release for the Apache
> Flink
> > 1.11 series.
> > > >>
> > > >> Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> streaming
> > applications.
> > > >>
> > > >> The release is available for download at:
> > > >> https://flink.apache.org/downloads.html
> > > >>
> > > >> Please check out the release blog post for an overview of the
> > improvements for this bugfix release:
> > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
> > > >>
> > > >> The full release notes are available in Jira:
> > > >>
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
> > > >>
> > > >> We would like to thank all contributors of the Apache Flink
> community
> > who made this release possible!
> > > >>
> > > >> Regards,
> > > >> Dian
> > > >
> > >
> >
>


Re: [ANNOUNCE] Apache Flink 1.9.3 released

2020-04-25 Thread Hequn Cheng
@Dian, thanks a lot for the release and for being the release manager.
Also thanks to everyone who made this release possible!

Best,
Hequn

On Sat, Apr 25, 2020 at 7:57 PM Dian Fu  wrote:

> Hi everyone,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.9.3, which is the third bugfix release for the Apache Flink 1.9
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2020/04/24/release-1.9.3.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12346867
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
> Also great thanks to @Jincheng for helping finalize this release.
>
> Regards,
> Dian
>


Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-07 Thread Hequn Cheng
Thanks a lot for the release and your great work, Gordon!
Also thanks to everyone who made this release possible!

Best,
Hequn

On Tue, Apr 7, 2020 at 8:58 PM Tzu-Li (Gordon) Tai 
wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Stateful Functions 2.0.0.
>
> Stateful Functions is an API that simplifies building distributed stateful
> applications.
> It's based on functions with persistent state that can interact
> dynamically with strong consistency guarantees.
>
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Stateful Functions can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>
> Python SDK for Stateful Functions published to the PyPI index can be found
> at:
> https://pypi.org/project/apache-flink-statefun/
>
> Official Docker image for building Stateful Functions applications is
> currently being published to Docker Hub.
> Dockerfiles for this release can be found at:
> https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
> Progress for creating the Docker Hub repository can be tracked at:
> https://github.com/docker-library/official-images/pull/7749
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346878
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Cheers,
> Gordon
>


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-20 Thread Hequn Cheng
Congratulations Jingsong! Well deserved.

Best,
Hequn

On Fri, Feb 21, 2020 at 2:42 PM Yang Wang  wrote:

> Congratulations, Jingsong! Well deserved.
>
>
> Best,
> Yang
>
>> Zhijiang  wrote on Fri, Feb 21, 2020 at 1:18 PM:
>
>> Congrats Jingsong! Welcome on board!
>>
>> Best,
>> Zhijiang
>>
>> --
>> From: Zhenghua Gao 
>> Send Time: 2020 Feb. 21 (Fri.) 12:49
>> To: godfrey he 
>> Cc: dev ; user 
>> Subject: Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>>
>> Congrats Jingsong!
>>
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he  wrote:
>> Congrats Jingsong! Well deserved.
>>
>> Best,
>> godfrey
>>
>> Jeff Zhang  wrote on Fri, Feb 21, 2020 at 11:49 AM:
>> Congratulations, Jingsong! You deserve it.
>>
>> wenlong.lwl  wrote on Fri, Feb 21, 2020 at 11:43 AM:
>> Congrats Jingsong!
>>
>> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
>>
>> > Congrats Jingsong!
>> >
>> > > On Feb 21, 2020, at 11:39 AM, Jark Wu  wrote:
>> > >
>> > > Congratulations Jingsong! Well deserved.
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
>> > >
>> > >> Congratulations! Jingsong
>> > >>
>> > >>
>> > >> Best,
>> > >> Dan Zou
>> > >>
>> >
>> >
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>>
>>


Re: [ANNOUNCE] Apache Flink Python API(PyFlink) 1.9.2 released

2020-02-13 Thread Hequn Cheng
Thanks a lot for the release, Jincheng!
Also thanks to everyone who made this release possible!

Best,
Hequn

On Thu, Feb 13, 2020 at 2:18 PM Dian Fu  wrote:

> Thanks for the great work, Jincheng.
>
> Regards,
> Dian
>
> On Feb 13, 2020, at 1:32 PM, jincheng sun  wrote:
>
> Hi everyone,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink Python API(PyFlink) 1.9.2, which is the first release to PyPI for the
> Apache Flink Python API 1.9 series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
>
> https://pypi.org/project/apache-flink/1.9.2/#files
>
> Or it can be installed using pip:
>
> pip install apache-flink==1.9.2
>
> We would like to thank all contributors of the Apache Flink community who
> helped to verify this release and made this release possible!
>
> Best,
> Jincheng
>
>
>


Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Hequn Cheng
Great thanks to Yu & Gary for being the release managers!
Also thanks to everyone who made this release possible!

Best, Hequn

On Thu, Feb 13, 2020 at 9:54 AM Rong Rong  wrote:

> Congratulations, and a big thanks to the release managers for all the hard
> work!!
>
> --
> Rong
>
> On Wed, Feb 12, 2020 at 5:52 PM Yang Wang  wrote:
>
>> Excellent work. Thanks Gary & Yu for being the release managers.
>>
>>
>> Best,
>> Yang
>>
>>> Jeff Zhang  wrote on Thu, Feb 13, 2020 at 9:36 AM:
>>
>>> Congratulations! Really appreciated your hard work.
>>>
>>> Yangze Guo  wrote on Thu, Feb 13, 2020 at 9:29 AM:
>>>
 Thanks, Gary & Yu. Congrats to everyone involved!

 Best,
 Yangze Guo

 On Thu, Feb 13, 2020 at 9:23 AM Jingsong Li 
 wrote:
 >
 > Congratulations! Great work.
 >
 > Best,
 > Jingsong Lee
 >
 > On Wed, Feb 12, 2020 at 11:05 PM Leonard Xu 
 wrote:
 >>
 >> Great news!
 >> Thanks to everyone involved!
 >> Thanks Gary and Yu for being the release managers!
 >>
 >> Best,
 >> Leonard Xu
 >>
 >> On Feb 12, 2020, at 23:02, Stephan Ewen  wrote:
 >>
 >> Congrats to us all.
 >>
 >> A big piece of work, nicely done.
 >>
 >> Let's hope that this helps our users make their existing use cases
 easier and also opens up new use cases.
 >>
 >> On Wed, Feb 12, 2020 at 3:31 PM 张光辉  wrote:
 >>>
 >>> Great work.
 >>>
 >>> Congxian Qiu  wrote on Wed, Feb 12, 2020 at 10:11 PM:
 
  Great work.
  Thanks everyone involved.
  Thanks Gary and Yu for being the release manager
 
 
  Best,
  Congxian
 
 
 > Jark Wu  wrote on Wed, Feb 12, 2020 at 9:46 PM:
 >
 > Congratulations to everyone involved!
 > Great thanks to Yu & Gary for being the release manager!
 >
 > Best,
 > Jark
 >
 > On Wed, 12 Feb 2020 at 21:42, Zhu Zhu  wrote:
 >>
 >> Cheers!
 >> Thanks Gary and Yu for the great job as release managers.
 >> And thanks to everyone whose contribution makes the release
 possible!
 >>
 >> Thanks,
 >> Zhu Zhu
 >>
 >> Wyatt Chun  wrote on Wed, Feb 12, 2020 at 9:36 PM:
 >>>
 >>> Sounds great. Congrats & Thanks!
 >>>
 >>> On Wed, Feb 12, 2020 at 9:31 PM Yu Li  wrote:
 
  The Apache Flink community is very happy to announce the
 release of Apache Flink 1.10.0, which is the latest major release.
 
  Apache Flink® is an open-source stream processing framework
 for distributed, high-performing, always-available, and accurate data
 streaming applications.
 
  The release is available for download at:
  https://flink.apache.org/downloads.html
 
  Please check out the release blog post for an overview of the
 improvements for this new major release:
  https://flink.apache.org/news/2020/02/11/release-1.10.0.html
 
  The full release notes are available in Jira:
 
 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
 
  We would like to thank all contributors of the Apache Flink
 community who made this release possible!
 
  Cheers,
  Gary & Yu
 >>
 >>
 >
 >
 > --
 > Best, Jingsong Lee

>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>


Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-11 Thread Hequn Cheng
+1 (non-binding)

- Check signature and checksum.
- Install package successfully with Pip under Python 3.7.4.
- Run wordcount example successfully under Python 3.7.4.

Best, Hequn
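
For readers who want to reproduce the word-count check mentioned in these
votes, here is a minimal sketch. It assumes a recent PyFlink (1.11 or later)
rather than the 1.9.2 candidate under vote, so treat it purely as an
illustration of the kind of check involved; names and values are made up:

    from pyflink.table import BatchTableEnvironment, EnvironmentSettings

    # Blink-planner batch environment (PyFlink 1.11+ API assumed).
    settings = EnvironmentSettings.new_instance() \
        .in_batch_mode() \
        .use_blink_planner() \
        .build()
    t_env = BatchTableEnvironment.create(environment_settings=settings)

    # A tiny in-memory word count -- enough to confirm the installed package
    # can actually run a job end to end.
    words = "to be or not to be".split(" ")
    source = t_env.from_elements([(w, 1) for w in words], ["word", "cnt"])
    result = source.group_by("word").select("word, cnt.sum as total")
    print(result.to_pandas())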

On Tue, Feb 11, 2020 at 12:17 PM Dian Fu  wrote:

> +1 (non-binding)
>
> - Verified the signature and checksum
> - Pip installed the package successfully: pip install
> apache-flink-1.9.2.tar.gz
> - Run word count example successfully.
>
> Regards,
> Dian
>
> On Feb 11, 2020, at 11:44 AM, jincheng sun  wrote:
>
>
> +1 (binding)
>
> - Install the PyFlink by `pip install` [SUCCESS]
> - Run word_count in both command line and IDE [SUCCESS]
>
> Best,
> Jincheng
>
>
>
> Wei Zhong  wrote on Tue, Feb 11, 2020 at 11:17 AM:
>
>> Hi,
>>
>> Thanks for driving this, Jincheng.
>>
>> +1 (non-binding)
>>
>> - Verified signatures and checksums.
>> - Verified README.md and setup.py.
>> - Ran `pip install apache-flink-1.9.2.tar.gz` in Python 2.7.15 and Python
>> 3.7.5 successfully.
>> - Started a local pyflink shell in Python 2.7.15 and Python 3.7.5 via
>> `pyflink-shell.sh local` and tried the examples in the help message; they ran
>> well with no exceptions.
>> - Tried a word count example in the IDE with Python 2.7.15 and Python 3.7.5;
>> it ran well with no exceptions.
>>
>> Best,
>> Wei
>>
>>
>> On Feb 10, 2020, at 19:12, jincheng sun  wrote:
>>
>> Hi everyone,
>>
>> Please review and vote on the release candidate #1 for the PyFlink
>> version 1.9.2, as follows:
>>
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>> The complete staging area is available for your review, which includes:
>>
>> * the official Apache binary convenience releases to be deployed to
>> dist.apache.org [1], which are signed with the key with fingerprint
>> 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from source code [3].
>>
>> The vote will be open for at least 72 hours. It is adopted by majority
>> approval, with at least 3 PMC affirmative votes.
>>
>> Thanks,
>> Jincheng
>>
>> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/
>> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
>> [3] https://github.com/apache/flink/tree/release-1.9.2
>>
>>
>


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Hequn Cheng
Hi Jincheng,

+1 for this proposal.
From the perspective of users, I think it would be nice to have PyFlink on
PyPI, which makes it much easier to install PyFlink.

Best, Hequn

On Tue, Feb 4, 2020 at 1:09 PM Jeff Zhang  wrote:

> +1
>
>
> Xingbo Huang  wrote on Tue, Feb 4, 2020 at 1:07 PM:
>
>> Hi Jincheng,
>>
>> Thanks for driving this.
>> +1 for this proposal.
>>
>> Compared to building from source, downloading directly from PyPI will
>> greatly reduce the development cost for Python users.
>>
>> Best,
>> Xingbo
>>
>>
>>
>> Wei Zhong  wrote on Tue, Feb 4, 2020 at 12:43 PM:
>>
>>> Hi Jincheng,
>>>
>>> Thanks for bringing up this discussion!
>>>
>>> +1 for this proposal. Building from source takes a long time and requires
>>> a good network environment. Some users may not have such an environment.
>>> Uploading to PyPI will greatly improve the user experience.
>>>
>>> Best,
>>> Wei
>>>
>>> jincheng sun  wrote on Tue, Feb 4, 2020 at 11:49 AM:
>>>
 Hi folks,

 I have been very happy to receive some user inquiries about the use of the
 Flink Python API (PyFlink) recently. One of the more common questions is
 whether it is possible to install PyFlink without building it from source.
 The most convenient and natural way for users is to use `pip install
 apache-flink`. We originally planned to support `pip install apache-flink`
 in Flink 1.10; the reason for this decision was that when Flink 1.9 was
 released on August 22, 2019 [1], Flink's PyPI account was not ready yet.
 Our PyPI account has been available since October 09, 2019 [2] (only the
 PMC can access it). So, for the convenience of users, I propose:

 - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
 - Update Flink 1.9 documentation to add support for `pip install`.

 As we all know, the Flink 1.9.2 release was just completed on January 31,
 2020 [3]. There are still at least 1 to 2 months before the release of
 1.9.3, so my proposal is made entirely from the perspective of user
 convenience. Although the proposed work is not large, we have not set a
 precedent for an independent release of the Flink Python API (PyFlink) in
 the previous release process. I hereby initiate this discussion and look
 forward to your feedback!

 Best,
 Jincheng

 [1]

 https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
 [2]

 https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
 [3]

 https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E

>>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [ANNOUNCE] Apache Flink 1.9.2 released

2020-01-31 Thread Hequn Cheng
@jincheng sun  That's great. Thank you!

On Fri, Jan 31, 2020 at 7:57 PM jincheng sun 
wrote:

> Thanks for being the release manager and for the great work, Hequn :)
>
> Also thanks to the community making this release possible!
>
> BTW: I have added the 1.9.2 release to the report.
>
> Best,
> Jincheng
>
> Hequn Cheng  wrote on Fri, Jan 31, 2020 at 6:55 PM:
>
>> Hi everyone,
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.9.2, which is the second bugfix release for the Apache Flink
>> 1.9 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2020/01/30/release-1.9.2.html
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/projects/FLINK/versions/12346278
>>
>> We would like to thank all contributors of the Apache Flink community who
>> helped to verify this release and made this release possible!
>> Great thanks to @Jincheng for helping finalize this release.
>>
>> Regards,
>> Hequn
>>
>>


[ANNOUNCE] Apache Flink 1.9.2 released

2020-01-31 Thread Hequn Cheng
Hi everyone,

The Apache Flink community is very happy to announce the release of Apache
Flink 1.9.2, which is the second bugfix release for the Apache Flink 1.9
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/01/30/release-1.9.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12346278

We would like to thank all contributors of the Apache Flink community who
helped to verify this release and made this release possible!
Great thanks to @Jincheng for helping finalize this release.

Regards,
Hequn


Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 Thread Hequn Cheng
Congratulations, Dian.
Well deserved!

Best, Hequn

On Thu, Jan 16, 2020 at 6:08 PM Leonard Xu  wrote:

> Congratulations!  Dian Fu
>
> Best,
> Leonard
>
> On Jan 16, 2020, at 18:00, Jeff Zhang  wrote:
>
> Congrats Dian Fu !
>
> jincheng sun  wrote on Thu, Jan 16, 2020 at 5:58 PM:
>
>> Hi everyone,
>>
>> I'm very happy to announce that Dian accepted the offer of the Flink PMC
>> to become a committer of the Flink project.
>>
>> Dian Fu has been contributing to Flink for many years. Dian Fu has played an
>> essential role in the PyFlink/CEP/SQL/Table API modules. Dian Fu has
>> contributed several major features, reported and fixed many bugs, spent a
>> lot of time reviewing pull requests, and also frequently helps out on the
>> user mailing lists and checks and votes on releases.
>>
>> Please join me in congratulating Dian on becoming a Flink committer!
>>
>> Best,
>> Jincheng(on behalf of the Flink PMC)
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-04 Thread Hequn Cheng
+1 to making the blink planner the default planner for the SQL Client, so we
can give the blink planner a bit more exposure.

Best, Hequn
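
For context, "switching to the blink planner manually" in this thread refers
to the planner entry in the SQL CLI yaml configuration (sql-client-defaults.yaml,
linked as [5] in the quoted proposal below). Table API users face the same
choice in code; a minimal PyFlink sketch of the explicit opt-in is shown here,
assuming PyFlink 1.10+, where EnvironmentSettings is available:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings

    env = StreamExecutionEnvironment.get_execution_environment()

    # Explicitly opt in to the blink planner; without this, the old
    # ("flink") planner is used by default.
    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .use_blink_planner() \
        .build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)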

On Fri, Jan 3, 2020 at 6:32 PM Jark Wu  wrote:

> Hi Benoît,
>
> Thanks for the reminder. I will look into the issue and hopefully we can
> target it for 1.9.2 and 1.10.
>
> Cheers,
> Jark
>
> On Fri, 3 Jan 2020 at 18:21, Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> >  If anyone finds that blink planner has any significant defects and
>> has a larger regression than the old planner, please let us know.
>>
>> Overall, the Blink-exclusive features are a must (TopN, deduplicate,
>> LAST_VALUE, plan reuse, etc.)! But not all production use cases of the
>> legacy planner are covered:
>> An edge case of Temporal Table Functions does not allow computed Tables
>> (as opposed to TableSources) to be used on the query side in Blink (
>> https://issues.apache.org/jira/browse/FLINK-14200)
>>
>> Cheers
>> Ben
>>
>>
>> On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  wrote:
>>
>>> +1, I have already made blink the default planner of the Flink
>>> interpreter in Zeppelin.
>>>
>>>
>>> Jingsong Li  wrote on Fri, Jan 3, 2020 at 4:37 PM:
>>>
 Hi Jark,

 +1 for making the blink planner the default in the SQL CLI.
 I believe this new planner can be put into practice in production.
 We've worked hard for nearly a year, but the old planner hasn't moved on.

 And I'd like to cc user@flink.apache.org.
 If anyone finds that the blink planner has any significant defects or has
 a larger regression than the old planner, please let us know. We will be
 very grateful.

 Best,
 Jingsong Lee

 On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:

> +1 for this.
> We brought many SQL/API features and stability enhancements in the 1.10
> release, and almost all of them happened in the Blink planner.
> The SQL CLI is the most convenient entry point for me; I believe many users
> will have a better experience if we set the Blink planner as the default
> planner.
>
> Best,
> Leonard
>
> > On Jan 3, 2020, at 15:16, Terry Wang  wrote:
> >
> > Since what the blink planner can do is a superset of the flink planner,
> > a big +1 for changing the default planner to the Blink planner from my side.
> >
> > Best,
> > Terry Wang
> >
> >
> >
> >> On Jan 3, 2020, at 15:00, Jark Wu  wrote:
> >>
> >> Hi everyone,
> >>
> >> In the 1.10 release, Flink SQL supports many awesome features and
> >> improvements, including:
> >> - support watermark statement and computed column in DDL
> >> - fully support all data types in Hive
> >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
> >> - support INSERT OVERWRITE and INSERT PARTITION
> >>
> >> However, all the features and improvements are only available in the
> >> Blink planner, not in the old planner.
> >> There are also some other features that are limited to the Blink planner,
> >> e.g. Dimension Table Join [1], TopN [2], Deduplicate [3], streaming
> >> aggregates optimization [4], and so on.
> >>
> >> But the old planner is still the default planner in Table API & SQL. It is
> >> frustrating for users to have to switch to the blink planner manually
> >> every time they start the SQL CLI. And it's surprising to see an
> >> "unsupported feature" exception if they try out the new features but have
> >> not switched the planner.
> >>
> >> The SQL CLI is a very important entry point for users to try out new
> >> features and do prototyping.
> >> In order to give the new planner more exposure, I would like to suggest
> >> setting the default planner for the SQL Client to the Blink planner before
> >> the 1.10 release.
> >>
> >> The approach is just changing the default SQL CLI yaml
> configuration[5]. In
> >> this way, the existing
> >> environment is still compatible and unaffected.
> >>
> >> Changing the default planner for the whole Table API & SQL is
> another topic
> >> and is out of scope of this discussion.
> >>
> >> What do you think?
> >>
> >> Best,
> >> Jark
> >>
> >> [1]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> >> [2]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
> >> [3]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> >> [4]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
> >> [5]:
> >>
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
> >
>
>

 --
 Best, Jingsong Lee

>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>> --
>> Benoît Paris
>> Ingénieur Machine Learning E

[ANNOUNCE] Weekly Community Update 2020/01

2020-01-04 Thread Hequn Cheng
Dear community,

Happy new year! Wishing you a new year rich with the blessings of love,
joy, warmth, and laughter. And may Flink get better and better.

Nice to share this week’s community digest with an update on Flink-1.10.0,
a proposal to set blink planner as the default planner for SQL Client, a
FLIP to support SQL Client Gateway and quite a few other topics. Enjoy.

Flink Development
==

* [release] Yu shared a status update for the release of Flink 1.10. Thanks
to all the efforts of the community, 46 blocker and 11 critical issues
have been resolved since we cut the release-1.10 branch, and now we are
left with only 12 blocker issues. Check it out for more details. [1]

* [build] A couple of weeks ago Till had started a discussion on dropping
vendor-specific repositories from pom.xml. It received consensus this week
and the PR to remove the vendor-specific repositories has been merged
now.[2]

* [sql] ForwardXu has started a vote on FLIP-90 for supporting SQL
2016-2017 JSON functions in Flink SQL. No votes so far, but maybe this
will change soon since the related discussion thread received no big
concerns. [3][4]

* [sql] Jark has started a vote on renaming the terminology "Time-windowed
Join" to "Interval Join" in Table API & SQL. Two supporting comments so far. [5]

* [sql] Jark has proposed to set the Blink planner as the default planner for
the SQL Client in release 1.10. For the Table API, the default planner will
still be the flink planner; the idea is to give the blink planner a bit more
exposure first and to target changing the default planner for 1.11.
Theoretically, the blink planner is a superset of the flink planner, but there
may be some corner features that are not covered. Report it in the thread if
you find such a feature. [6]

* [sql-client] godfreyhe brought up a discussion on FLIP-91 to support a SQL
Client Gateway. The goal of FLIP-91 is to extend FLIP-24 to support a gateway
mode and REST/JDBC interfaces for the SQL Client, which could make it easier
for users to use Flink. One supporting comment so far. [7]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Progress-of-Apache-Flink-1-10-3-td36359.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-vendor-specific-repositories-from-pom-xml-td36113.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-90-Support-SQL-2016-2017-JSON-functions-in-Flink-SQL-td36341.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-JSON-functions-in-Flink-SQL-td32674.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Rename-terminology-quot-Time-windowed-Join-quot-to-quot-Interval-Join-quot-in-Table-API-amp-SQL-td36370.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Set-default-planner-for-SQL-Client-to-Blink-planner-in-1-10-release-td36379.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-91-Support-SQL-Client-Gateway-td36349.html

Notable Bugs
==

[FLINK-15477] [1.9.1] Create BatchTableEnvironment failed. [8]
[FLINK-15467] [1.9.1] Should wait for the end of the source thread during
the Task cancellation. [9]
[FLINK-15466] [1.8.3][1.9.1] `FlinkAggregateExpandDistinctAggregatesRule`
generates wrong plan for cases that have distinct aggs with filter. [10]
[FLINK-15463] [1.9.1] Exporting a table SQL jar results in the error
java.lang.ClassNotFoundException:
org.apache.flink.calcite.shaded.com.google.protobuf.MessageOrBuilder. [11]
[FLINK-15452] [1.9.1] Missing type coercion when generating code with
ScalarFunctionCallGen. [12]

[8] https://issues.apache.org/jira/browse/FLINK-15477
[9] https://issues.apache.org/jira/browse/FLINK-15467
[10] https://issues.apache.org/jira/browse/FLINK-15466
[11] https://issues.apache.org/jira/browse/FLINK-15463
[12] https://issues.apache.org/jira/browse/FLINK-15452

Events, Blog Posts, Misc
===

* Upcoming Meetups
  * On January 8th Christophe will talk about how to build a data platform
with Kafka and Flink at the Kubernetes and Cloud Native Meetup in Tunis.
[13]
  * On January 18th Preetdeep Kumar will cover some basics of the Flink
DataStream processing API, followed by a hands-on demo. This will be an online
event. Check the meetup link for more details. [14]

[13]
https://www.meetup.com/Kubernetes-Cloud-Native-Tunisia/events/267396358/
[14]
https://www.meetup.com/Hyderabad-Apache-Flink-Meetup-Group/events/267610014/

Cheers,
Hequn


[ANNOUNCE] Weekly Community Update 2019/52

2019-12-29 Thread Hequn Cheng
Dear community,

Happy to share a short community update this week. Due to the holiday, the
dev@ mailing list is pretty quiet these days.

Flink Development
==

* [sql] Jark proposes to correct the terminology of "Time-windowed Join" to
"Interval Join" in Table API & SQL before 1.10 is released. In this way,
the terminology is aligned between Table API and DataStream. [1]

* [sql] The discussion of FLIP-90 (Support SQL 2016-2017 JSON functions in
Flink SQL) received some minor feedback from the community. The feedback is
mainly about aligning the Table API and SQL, for example, also adding ON ERROR
and ON EMPTY clause support to the Table API. [2]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Correct-the-terminology-of-quot-Time-windowed-Join-quot-to-quot-Interval-Join-quot-in-Table-L-td36202.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-JSON-functions-in-Flink-SQL-td32674.html

Notable Bugs
==

[FLINK-15406] [1.9.1] A savepoint written by the "State Processor API"
can't be restored by map or flatmap. [3]
[FLINK-15421] [1.9.1] [1.10.0] GroupAggsHandler throws
java.time.LocalDateTime cannot be cast to java.sql.Timestamp. [4]

[3] https://issues.apache.org/jira/browse/FLINK-15406
[4] https://issues.apache.org/jira/browse/FLINK-15421

Events, Blog Posts, Misc
===

* Upcoming Meetups
  * On January 8th, Christophe will talk about how to build a data platform
with Kafka and Flink at the Kubernetes and Cloud Native Meetup in Tunis. [5]

[5] https://www.meetup.com/Kubernetes-Cloud-Native-Tunisia/events/267396358/

Cheers,
Hequn


[ANNOUNCE] Weekly Community Update 2019/51

2019-12-22 Thread Hequn Cheng
Dear community,

Happy to share this week's brief community digest with updates on Flink
1.10 and Flink 1.9.2, a proposal to integrate Flink Docker image
publication into the Flink release process, a discussion on new features of
PyFlink, and a couple of blog posts. Enjoy.

Flink Development
==

* [releases] Kostas Kloudas suggests focusing a little bit on documenting
the new features that the community added to release-1.10 during the
feature-freeze phase. He has created an umbrella issue (FLINK-15273) to
monitor the pending documentation tasks. [1]

* [releases] Hequn has started a conversation about the release of Flink
1.9.2. One blocker has been addressed this week, but a new one has been
reported. Considering the ongoing release-1.10 work and the limited resources
of the community, the 1.9.2 release process is expected to slow down. [2]

* [releases] Patrick proposes to integrate Flink Docker image publication
into the Flink release process. There are some discussions on whether to
have a dedicated git repo for the Dockerfiles. [3]

* [sql] The discussion on supporting JSON functions in Flink SQL seems to
have reached an agreement. Jark Wu suggested that Forward Xu start a vote. [4]

* [runtime] Stephan raised a discussion and gave some feedback after
trying out the new FLIP-49 memory configurations. He suggested some
alternative config key names and descriptions. The feedback received
many +1s from others. [5]

* [connectors] There are some new updates in the discussion on FLIP-27, the
new source interface. This has been a long, ongoing topic. This week the
discussions focused on the concept of BOUNDED and UNBOUNDED for the
source. [6]

* [pyflink] Jincheng has started a discussion on what parts of the Python
API we should focus on next. A default feature list is given, but he is
looking forward to hearing more feedback from the community. [7]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Documentation-tasks-for-release-1-10-td36031.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-9-2-td36087.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Integrate-Flink-Docker-image-publication-into-Flink-release-process-td36139.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-JSON-functions-in-Flink-SQL-td32674.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-feedback-after-trying-out-the-new-FLIP-49-memory-configurations-td36129.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-What-parts-of-the-Python-API-should-we-focus-on-next-td36119.html

Notable Bugs
==

[FLINK-15262] [1.10.0] Kafka connector doesn't read from the beginning
immediately when 'connector.startup-mode' = 'earliest-offset'. [8]
[FLINK-15300] [1.10.0] Shuffle memory fraction sanity check does not
account for its min/max limit. [9]
[FLINK-15304] [1.11.0] Remove unexpected Hadoop dependency from Flink's
Mesos integration. [10]
[FLINK-15313] [1.10.0] Can not insert decimal with precision into sink
using TypeInformation. [11]
[FLINK-15320] [1.10.0] JobManager crashes in standalone mode when
cancelling a job whose subtasks' status is "scheduled". [12]

[8] https://issues.apache.org/jira/browse/FLINK-15262
[9] https://issues.apache.org/jira/browse/FLINK-15300
[10] https://issues.apache.org/jira/browse/FLINK-15304
[11] https://issues.apache.org/jira/browse/FLINK-15313
[12] https://issues.apache.org/jira/browse/FLINK-15320

Events, Blog Posts, Misc
===

* Philip Wilcox has published a blog post about how they use Flink to detect
offline scooters at Bird. The blog mainly shares some experience of how to
solve a set of tricky problems involving Kafka, event time, watermarks, and
ordering. [13]

* In this blog post, Preetdeep Kumar introduces use cases and best
practices for utilizing Apache Flink for processing streaming data. [14]

[13]
https://www.ververica.com/blog/replayable-process-functions-time-ordering-and-timers
[14] https://dzone.com/articles/streaming-etl-with-apache-flink

Cheers,
Hequn


Re: [ANNOUNCE] Weekly Community Update 2019/50

2019-12-17 Thread Hequn Cheng
Cool. I will do it in the next three weeks.
Thanks a lot for your continued great work!

Best, Hequn

On Tue, Dec 17, 2019 at 6:16 PM Konstantin Knauf 
wrote:

> Hi Hequn,
>
> thanks, and thanks for the offer. Of course, you can cover the holiday
> break, i.e. the next three weeks. Looking forward to your updates!
>
> Cheers,
>
> Konstantin
>
> On Mon, Dec 16, 2019 at 5:53 AM Hequn Cheng  wrote:
>
>> Hi Konstantin,
>>
>> Happy holidays and thanks a lot for your continued great work on the
>> updates.
>> With the updates, it is easier for us to catch up with what's going on in
>> the community, which I think is quite helpful.
>>
>> I'm wondering if I can help out and cover this during your vacation.
>> :)
>>
>> Best,
>> Hequn
>>
>> On Sun, Dec 15, 2019 at 11:36 PM Konstantin Knauf <
>> konstan...@ververica.com> wrote:
>>
>>> Dear community,
>>>
>>> happy to share this week's brief community digest with updates on Flink
>>> 1.8.3 and Flink 1.10, a discussion on how to facilitate easier Flink/Hive
>>> setups, a couple of blog posts and a bit more.
>>>
>>> *Personal Note:* Thank you for reading these updates since I started
>>> them early this year. I will take a three week Christmas break and will be
>>> back with a Holiday season community update on the 12th of January.
>>>
>>> Flink Development
>>> ==
>>>
>>> * [releases] Apache Flink 1.8.3 was released on Wednesday. [1,2]
>>>
>>> * [releases] The feature freeze for Apache Flink took place on Monday.
>>> The community is now working on testing, bug fixes and improving the
>>> documentation in order to create a first release candidate soon. [3]
>>>
>>> * [development process] Seth has revived the discussion on a past PR by
>>> Marta, which added a documentation style guide to the contributor guide.
>>> Please check it [4] out, if you are contributing documentation to Apache
>>> Flink. [5]
>>>
>>> * [security] Following a recent report to the Flink PMC of "exploiting"
>>> the Flink Web UI for remote code execution, Robert has started a discussion
>>> on how to improve the tooling/documentation to make users aware of this
>>> possibility and recommend securing this interface in production setups. [6]
>>>
>>> * [sql] Bowen has started a discussion on how to simplify the Flink-Hive
>>> setup for new users as currently users need to add some additional
>>> dependencies to the classpath manually. The discussion seems to conclude
>>> towards providing a single additional hive-uber jar, which contains all the
>>> required dependencies. [7]
>>>
>>> [1] https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-8-3-released-tp35868.html
>>> [3]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Feature-freeze-for-Apache-Flink-1-10-0-release-tp35139.html
>>> [4] https://github.com/apache/flink-web/pull/240
>>> [5]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Docs-Style-Guide-Review-tp35758.html
>>> [6]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-documentation-tooling-around-security-of-Flink-tp35898.html
>>> [7]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-have-separate-Flink-distributions-with-built-in-Hive-dependencies-tp35918.html
>>>
>>> Notable Bugs
>>> ==
>>>
>>> [FLINK-15152] [1.9.1] When a "stop" action on a job fails, because not
>>> all tasks are in "RUNNING" state the job is not checkpointing afterwards.
>>> [8]
>>>
>>> [8] https://issues.apache.org/jira/browse/FLINK-15152
>>>
>>> Events, Blog Posts, Misc
>>> ===
>>>
>>> * Zhu Zhu is now an Apache Flink Committer. Congratulations! [9]
>>>
>>> * Gerred Dillon has published a blog post on the Apache Flink blog on
>>> how to run Flink on Kubernetes with a KUDO Flink operator. [10]
>>>
>>> * In this blog post Apache Flink PMC Sun Jincheng outlines the reasons
>>> and motivation for his and his colleague's work to provide a world-class
>>> Python support for Apache Flink's Table API. [11]
>>>
>>> * Upcoming Meetups
>>> * On De

Re: [ANNOUNCE] Weekly Community Update 2019/50

2019-12-15 Thread Hequn Cheng
Hi Konstantin,

Happy holidays and thanks a lot for your continued great work on the
updates.
With the updates, it is easier for us to catch up with what's going on in
the community, which I think is quite helpful.

I'm wondering if I can help out and cover this during your vacation. :)

Best,
Hequn

On Sun, Dec 15, 2019 at 11:36 PM Konstantin Knauf 
wrote:

> Dear community,
>
> happy to share this week's brief community digest with updates on Flink
> 1.8.3 and Flink 1.10, a discussion on how to facilitate easier Flink/Hive
> setups, a couple of blog posts and a bit more.
>
> *Personal Note:* Thank you for reading these updates since I started them
> early this year. I will take a three week Christmas break and will be back
> with a Holiday season community update on the 12th of January.
>
> Flink Development
> ==
>
> * [releases] Apache Flink 1.8.3 was released on Wednesday. [1,2]
>
> * [releases] The feature freeze for Apache Flink took place on Monday. The
> community is now working on testing, bug fixes and improving the
> documentation in order to create a first release candidate soon. [3]
>
> * [development process] Seth has revived the discussion on a past PR by
> Marta, which added a documentation style guide to the contributor guide.
> Please check it [4] out, if you are contributing documentation to Apache
> Flink. [5]
>
> * [security] Following a recent report to the Flink PMC of "exploiting"
> the Flink Web UI for remote code execution, Robert has started a discussion
> on how to improve the tooling/documentation to make users aware of this
> possibility and recommend securing this interface in production setups. [6]
>
> * [sql] Bowen has started a discussion on how to simplify the Flink-Hive
> setup for new users as currently users need to add some additional
> dependencies to the classpath manually. The discussion seems to conclude
> towards providing a single additional hive-uber jar, which contains all the
> required dependencies. [7]
>
> [1] https://flink.apache.org/news/2019/12/11/release-1.8.3.html
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-8-3-released-tp35868.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Feature-freeze-for-Apache-Flink-1-10-0-release-tp35139.html
> [4] https://github.com/apache/flink-web/pull/240
> [5]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Docs-Style-Guide-Review-tp35758.html
> [6]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-documentation-tooling-around-security-of-Flink-tp35898.html
> [7]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-have-separate-Flink-distributions-with-built-in-Hive-dependencies-tp35918.html
>
> Notable Bugs
> ==
>
> [FLINK-15152] [1.9.1] When a "stop" action on a job fails, because not all
> tasks are in "RUNNING" state the job is not checkpointing afterwards. [8]
>
> [8] https://issues.apache.org/jira/browse/FLINK-15152
>
> Events, Blog Posts, Misc
> ===
>
> * Zhu Zhu is now an Apache Flink Committer. Congratulations! [9]
>
> * Gerred Dillon has published a blog post on the Apache Flink blog on how
> to run Flink on Kubernetes with a KUDO Flink operator. [10]
>
> * In this blog post Apache Flink PMC Sun Jincheng outlines the reasons and
> motivation for his and his colleague's work to provide a world-class Python
> support for Apache Flink's Table API. [11]
>
> * Upcoming Meetups
> * On December 17th there will be the second Apache Flink meetup in
> Seoul. [12] *Dongwon* has shared a detailed agenda in last weeks
> community update. [13]
> * On December 18th Alexander Fedulov will talk about Stateful Stream
> Processing with Apache Flink at the Java Professionals Meetup in Minsk. [14]
>
> [9]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Zhu-Zhu-becomes-a-Flink-committer-tp35944.html
> [10] https://flink.apache.org/news/2019/12/09/flink-kubernetes-kudo.html
> [11]
> https://developpaper.com/why-will-apache-flink-1-9-0-support-the-python-api/
> [12] https://www.meetup.com/Seoul-Apache-Flink-Meetup/events/266824815/
> [13]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Weekly-Community-Update-2019-48-td35423.html
> [14] https://www.meetup.com/Apache-Flink-Meetup-Minsk/events/267134296/
>
> Cheers,
>
> Konstantin (@snntrable)
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
>
> Follow us @VervericaData Ververica 
>
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Tony) Cheng
>


[ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Hequn Cheng
Hi,

The Apache Flink community is very happy to announce the release of Apache
Flink 1.8.3, which is the third bugfix release for the Apache Flink 1.8
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2019/12/11/release-1.8.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12346112

We would like to thank all contributors of the Apache Flink community who
made this release possible!
Great thanks to @Jincheng as a mentor during this release.

Regards,
Hequn


Re: [ANNOUNCE] Apache Flink 1.9.1 released

2019-10-19 Thread Hequn Cheng
Thanks a lot to Jark, Jincheng, and everyone who made this release
possible.

Best, Hequn

On Sat, Oct 19, 2019 at 10:29 PM Zili Chen  wrote:

> Thanks a lot for being release manager Jark. Great work!
>
> Best,
> tison.
>
>
> Till Rohrmann  wrote on Sat, Oct 19, 2019 at 10:15 PM:
>
>> Thanks a lot for being our release manager Jark and thanks to everyone
>> who has helped to make this release possible.
>>
>> Cheers,
>> Till
>>
>> On Sat, Oct 19, 2019 at 3:26 PM Jark Wu  wrote:
>>
>>>  Hi,
>>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.9.1, which is the first bugfix release for the Apache Flink
>>> 1.9 series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>> https://flink.apache.org/news/2019/10/18/release-1.9.1.html
>>>
>>> The full release notes are available in Jira:
>>> https://issues.apache.org/jira/projects/FLINK/versions/12346003
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who helped to verify this release and made this release possible!
>>> Great thanks to @Jincheng for helping finalize this release.
>>>
>>> Regards,
>>> Jark Wu
>>>
>>


Re: [PROPOSAL] Contribute Stateful Functions to Apache Flink

2019-10-12 Thread Hequn Cheng
Hi Stephan,

Big +1 for adding this to Apache Flink!

As for the question of whether this should be added to the Flink main
repository, from my side, I prefer to put it in the main repository. Not
only does Stateful Functions share a very close relationship with the current
Flink, but other libs or modules in Flink could also make use of it the other
way round in the future. At that point the Flink API stack would also change
a bit, and that would be cool.

Best, Hequn

On Sat, Oct 12, 2019 at 9:16 PM Biao Liu  wrote:

> Hi Stephan,
>
> +1 for having Stateful Functions in Flink.
>
> Before discussing which repository it should belong to, I was wondering
> whether we have reached an agreement on "splitting the flink repository" as
> Piotr mentioned. It seems there has simply been no further discussion.
> It's OK for me to add it to the core repository; after all, almost everything
> is in the core repository now. But if we decide to split the core repository
> someday, I would tend to create a separate repository for Stateful Functions.
> It might be a good time to take the first step of splitting.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Sat, 12 Oct 2019 at 19:31, Yu Li  wrote:
>
>> Hi Stephan,
>>
>> Big +1 for adding stateful functions to Flink. I believe a lot of users
>> would be interested in trying this out, and I can imagine how this could
>> help reduce the TCO for businesses requiring both stream processing and
>> stateful functions.
>>
>> And my 2 cents is to put it into the flink core repository since I can see
>> a tight connection between this library and flink state.
>>
>> Best Regards,
>> Yu
>>
>>
>> On Sat, 12 Oct 2019 at 17:31, jincheng sun 
>> wrote:
>>
>>> Hi Stephan,
>>>
>>> Big +1 for adding this great feature to Apache Flink.
>>>
>>> Regarding where we should place it, should we put it into the Flink core
>>> repository or create a separate repository? I prefer to put it into the
>>> main repository, and I'm looking forward to a more detailed discussion on
>>> this decision.
>>>
>>> Best,
>>> Jincheng
>>>
>>>
>>> Jingsong Li  wrote on Sat, Oct 12, 2019 at 11:32 AM:
>>>
 Hi Stephan,

 Big +1 for this contribution. It provides another user interface that
 is easy to use and popular at this time. These functions are hard for
 users to write in SQL/Table API, while using DataStream is too complex
 (we've done some StateFun-like jobs using DataStream before). With
 StateFun, it is very easy.

 I think it's also a good opportunity to exercise Flink's core
 capabilities. I looked at stateful-functions-flink briefly; it is very
 interesting. I think there are many other things Flink can improve. So I
 think it's better to put it into Flink, and the improvements to it
 will be more natural in the future.

 Best,
 Jingsong Lee

 On Fri, Oct 11, 2019 at 7:33 PM Dawid Wysakowicz <
 dwysakow...@apache.org> wrote:

> Hi Stephan,
>
> I think this is a nice library, but what I like more about it is that
> it suggests exploring different use-cases. I think it definitely makes
> sense for the Flink community to explore more lightweight applications
> that reuse resources. Therefore I definitely think it is a good idea for the
> Flink community to accept this contribution and help maintain it.
>
> Personally I'd prefer to have it in a separate repository. There were
> a few discussions before where different people were suggesting to extract
> connectors and other libraries to separate repositories. Moreover I think
> it could serve as an example for the Flink ecosystem website[1]. This 
> could
> be the first project in there and give a good impression that the 
> community
> sees potential in the ecosystem website.
>
> Lastly, I'm wondering if this should go through PMC vote according to
> our bylaws[2]. In the end the suggestion is to adopt an existing code base
> as is. It also proposes a new programs concept that could result in a 
> shift
> of priorities for the community in a long run.
>
> Best,
>
> Dawid
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>
> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws
> On 11/10/2019 13:12, Till Rohrmann wrote:
>
> Hi Stephan,
>
> +1 for adding stateful functions to Flink. I believe the new set of
> applications this feature will unlock will be super interesting for new 
> and
> existing Flink users alike.
>
> One reason for not including it in the main repository would to not
> being bound to Flink's release cadence. This would allow to release faster
> and more often. However, I believe that having it eventually in Flink's
> main repository would be beneficial in the long run.
>
> Cheers,
> Till
>
> On Fri, Oct 11, 2019 at 12:56 PM Trevor Grant <

Re: [External] Re: From Kafka Stream to Flink

2019-09-19 Thread Hequn Cheng
Hi,

Fabian is totally right. Big thanks for the detailed answers and nice
examples above.

As for the PR, I am very sorry about the delay. It is mainly because of the
merge of blink and my work switching to Flink Python recently.
However, I think a later version of blink will cover this feature
natively with further merges.

Before that, I think we can use the solution Fabian provided above.

There are some examples here [1][2] which may be helpful to you,
@Casado @Maatary.
In [1], the test case closely matches your scenario (perform a join after
groupby + last_value). It also provides the UDAF you want and shows how
to register it.
In [2], the test shows how to use the built-in last_value in SQL. Note that
the built-in last_value UDAF is only supported in the blink planner from
Flink 1.9.0. If you are using the flink planner (or an earlier version),
you can register the last_value UDAF with the TableEnvironment as shown
in [1].

Feel free to ask if there are other problems.

Best, Hequn
[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala#L207
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala#L228
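
To make the groupby + last_value + join pattern above concrete, here is a
rough sketch in PyFlink (assuming PyFlink 1.10+ with the blink planner, where
LAST_VALUE is built in). The table and column names are made up for
illustration, and the in-memory inputs stand in for the real Kafka-backed
tables:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings

    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .use_blink_planner() \
        .build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)

    # Illustrative append-only inputs standing in for the Kafka-backed tables.
    t_env.create_temporary_view(
        "t1", t_env.from_elements([(1, "a"), (1, "b"), (2, "c")], ["id", "val"]))
    t_env.create_temporary_view(
        "t2", t_env.from_elements([(1, "x"), (2, "y")], ["id", "other"]))

    # Reduce t1 to the latest value per key with LAST_VALUE, then join with t2.
    # On the legacy planner, LAST_VALUE must first be registered as a UDAF.
    result = t_env.sql_query("""
        SELECT l.id, l.val, t2.other
        FROM (SELECT id, LAST_VALUE(val) AS val FROM t1 GROUP BY id) AS l
        JOIN t2 ON l.id = t2.id
    """)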

On Thu, Sep 19, 2019 at 9:40 PM Casado Tejedor, Rubén <
ruben.casado.teje...@accenture.com> wrote:

> Thanks Fabian. @Hequn Cheng  Could you share the
> status? Thanks for your amazing work!
>
>
>
> *From: *Fabian Hueske 
> *Date: *Friday, August 16, 2019, 9:30
> *To: *"Casado Tejedor, Rubén" 
> *CC: *Maatary Okouya , miki haiat <
> miko5...@gmail.com>, user , Hequn Cheng <
> chenghe...@gmail.com>
> *Subject: *Re: [External] Re: From Kafka Stream to Flink
>
>
>
> Hi Ruben,
>
>
>
> Work on this feature has already started [1], but stalled a bit (probably
> due to the effort of merging the new Blink query processor).
>
> Hequn (in CC) is the guy working on upsert table ingestion, maybe he can
> share what the status of this feature is.
>
>
>
> Best, Fabian
>
>
>
> [1] https://github.com/apache/flink/pull/6787
>
>
>
> Am Di., 13. Aug. 2019 um 11:05 Uhr schrieb Casado Tejedor, Rubén <
> ruben.casado.teje...@accenture.com>:
>
> Hi
>
>
>
> Do you have an expected version of Flink that will include the capability to
> ingest an upsert stream as a dynamic table? We have such a need in our
> current project. What we have done is emulate such behavior by working at a
> low level with state (e.g. update the existing value if the key exists,
> create a new value if the key does not exist). But we cannot use SQL, which
> would help us do it faster.
>
>
>
> Our use case is many small Flink jobs that have to do something like:
>
>
>
> SELECT *some fields*
>
> FROM *t1* INNER JOIN *t2 on t1.id = t2.id* (maybe join 3+ tables)
>
> WHERE *some conditions on fields*;
>
>
>
> We need the result of those queries taking into account only the last
> value of each row. The result is inserted/updated in an in-memory K-V
> database for fast access.
>
>
>
> Thanks in advance!
>
>
>
> Best
>
>
>
> *De: *Fabian Hueske 
> *Fecha: *miércoles, 7 de agosto de 2019, 11:08
> *Para: *Maatary Okouya 
> *CC: *miki haiat , user 
> *Asunto: *[External] Re: From Kafka Stream to Flink
>
>
>
>
>
>
> Hi,
>
>
>
> LAST_VAL is not a built-in function, so you'd need to implement it as a
> user-defined aggregate function (UDAGG) and register it.
>
>
>
> The problem with joining an append only table with an updatin

Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-11 Thread Hequn Cheng
Congratulations!

Best, Hequn

On Thu, Sep 12, 2019 at 9:24 AM Jark Wu  wrote:

> Congratulations Zili!
>
> Best,
> Jark
>
> On Wed, 11 Sep 2019 at 23:06,  wrote:
>
> > Congratulations, Zili.
> >
> >
> >
> > Best,
> >
> > Xingcan
> >
> >
> >
> > *From:* SHI Xiaogang 
> > *Sent:* Wednesday, September 11, 2019 7:43 AM
> > *To:* Guowei Ma 
> > *Cc:* Fabian Hueske ; Biao Liu ;
> > Oytun Tez ; bupt_ljy ; dev <
> > d...@flink.apache.org>; user ; Till Rohrmann <
> > trohrm...@apache.org>
> > *Subject:* Re: [ANNOUNCE] Zili Chen becomes a Flink committer
> >
> >
> >
> > Congratulations!
> >
> >
> >
> > Regards,
> >
> > Xiaogang
> >
> >
> >
> > Guowei Ma  于2019年9月11日周三 下午7:07写道:
> >
> > Congratulations Zili !
> >
> >
> > Best,
> >
> > Guowei
> >
> >
> >
> >
> >
> > Fabian Hueske  于2019年9月11日周三 下午7:02写道:
> >
> > Congrats Zili Chen :-)
> >
> >
> >
> > Cheers, Fabian
> >
> >
> >
> > Am Mi., 11. Sept. 2019 um 12:48 Uhr schrieb Biao Liu  >:
> >
> > Congrats Zili!
> >
> >
> >
> > Thanks,
> >
> > Biao /'bɪ.aʊ/
> >
> >
> >
> >
> >
> >
> >
> > On Wed, 11 Sep 2019 at 18:43, Oytun Tez  wrote:
> >
> > Congratulations!
> >
> >
> >
> > ---
> >
> > Oytun Tez
> >
> >
> >
> > *M O T A W O R D*
> >
> > *The World's Fastest Human Translation Platform.*
> >
> > oy...@motaword.com — www.motaword.com
> >
> >
> >
> >
> >
> > On Wed, Sep 11, 2019 at 6:36 AM bupt_ljy  wrote:
> >
> > Congratulations!
> >
> >
> >
> > Best,
> >
> > Jiayi Liao
> >
> >
> >
> >  Original Message
> >
> > *Sender:* Till Rohrmann
> >
> > *Recipient:* dev; user
> >
> > *Date:* Wednesday, Sep 11, 2019 17:22
> >
> > *Subject:* [ANNOUNCE] Zili Chen becomes a Flink committer
> >
> >
> >
> > Hi everyone,
> >
> >
> >
> > I'm very happy to announce that Zili Chen (some of you might also know
> > him as Tison Kun) accepted the offer of the Flink PMC to become a
> committer
> > of the Flink project.
> >
> >
> >
> > Zili Chen has been an active community member for almost 16 months now.
> > He helped push the FLIP-6 effort over the finish line, ported a lot of
> > legacy code tests, removed a good part of the legacy code, contributed
> > numerous fixes, is involved in Flink's client API refactoring, drives
> > the refactoring of Flink's HighAvailabilityServices, and much more. Zili
> > Chen also helped the community through PR reviews, reporting Flink issues,
> > answering user mails and being very active on the dev mailing list.
> >
> >
> >
> > Congratulations Zili Chen!
> >
> >
> >
> > Best, Till
> >
> > (on behalf of the Flink PMC)
> >
> >
>


Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

2019-08-15 Thread Hequn Cheng
Congratulations Andrey!

On Thu, Aug 15, 2019 at 3:30 PM Fabian Hueske  wrote:

> Congrats Andrey!
>
> Am Do., 15. Aug. 2019 um 07:58 Uhr schrieb Gary Yao :
>
> > Congratulations Andrey, well deserved!
> >
> > Best,
> > Gary
> >
> > On Thu, Aug 15, 2019 at 7:50 AM Bowen Li  wrote:
> >
> > > Congratulations Andrey!
> > >
> > > On Wed, Aug 14, 2019 at 10:18 PM Rong Rong 
> wrote:
> > >
> > >> Congratulations Andrey!
> > >>
> > >> On Wed, Aug 14, 2019 at 10:14 PM chaojianok 
> wrote:
> > >>
> > >> > Congratulations Andrey!
> > >> > At 2019-08-14 21:26:37, "Till Rohrmann" 
> wrote:
> > >> > >Hi everyone,
> > >> > >
> > >> > >I'm very happy to announce that Andrey Zagrebin accepted the offer
> of
> > >> the
> > >> > >Flink PMC to become a committer of the Flink project.
> > >> > >
> > >> > >Andrey has been an active community member for more than 15 months.
> > >> > >He has helped shape numerous features such as State TTL, the FRocksDB
> > >> > >release, the shuffle service abstraction, FLIP-1, result partition
> > >> > >management and various fixes/improvements. He's also frequently helping
> > >> > >out on the user@f.a.o mailing list.
> > >> > >
> > >> > >Congratulations Andrey!
> > >> > >
> > >> > >Best, Till
> > >> > >(on behalf of the Flink PMC)
> > >> >
> > >>
> > >
> >
>


Re: Implementing a low level join

2019-08-14 Thread Hequn Cheng
Hi Felipe,

If I understand correctly, you also have to decide whether to broadcast the
DataStream from the right side before applying the function?

One option is to add a Util method that joins dynamically, e.g.,
Util.joinDynamically(ds1, ds2). In the util method, you can implement your
own strategy logic and decide whether to broadcast or use a CoProcessFunction.
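A rough sketch of such a util method (the parameter names and the strategy flag
are placeholders, not an existing Flink API; the decision runs once when the job
graph is built, not per record):

import org.apache.flink.streaming.api.scala.DataStream

object JoinUtil {
  def joinDynamically[L, R, O](
      left: DataStream[L],
      right: DataStream[R],
      useBroadcast: Boolean,
      broadcastJoin: (DataStream[L], DataStream[R]) => DataStream[O],
      coProcessJoin: (DataStream[L], DataStream[R]) => DataStream[O]): DataStream[O] =
    // pick the join implementation according to your own strategy logic
    if (useBroadcast) broadcastJoin(left, right) else coProcessJoin(left, right)
}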

Best, Hequn

On Wed, Aug 14, 2019 at 3:07 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Hequn,
>
> I am implementing the broadcast and the regular join. As you said I need
> different functions. My question is more about whether I can have an operator
> which decides between broadcast and regular join dynamically. I suppose I
> will have to extend the generic TwoInputStreamOperator in Flink. Do you
> have any suggestions?
>
> Thanks
>
> On Wed, 14 Aug 2019, 03:59 Hequn Cheng,  wrote:
>
>> Hi Felipe,
>>
>> > I want to implement a join operator which can use different strategies
>> for joining tuples.
>> Not all kinds of join strategies can be applied to streaming jobs. Take
>> sort-merge join as an example: it's impossible to sort unbounded data.
>> However, you can perform a window join and use the sort-merge strategy to
>> join the data within a window. Even so, I'm not sure it's worth doing
>> considering the performance.
>>
>> > Therefore, I am not sure if I will need to implement my own operator to
>> do this or if it is still possible to do with CoProcessFunction.
>> You can't implement broadcast join with CoProcessFunction. But you can
>> implement it with BroadcastProcessFunction or
>> KeyedBroadcastProcessFunction, more details here[1].
>>
>> Furthermore, you can take a look at the implementation of both window
>> join and non-window join in Table API & SQL[2]. The code can be found
>> here[3].
>> Hope this helps.
>>
>> Best, Hequn
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#joins
>> [3]
>> https://github.com/apache/flink/tree/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join
>>
>>
>> On Tue, Aug 13, 2019 at 11:30 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I want to implement a join operator which can use different strategies
>>> for joining tuples. I saw that with CoProcessFunction I am able to
>>> implement low-level joins [1]. However, I do not know how to decide between
>>> different algorithms to join my tuples.
>>>
>>> On the other hand, to do a broadcast join I will need to use the
>>> broadcast operator [2] which yields a BroadcastStream. Therefore, I am not
>>> sure if I will need to implement my own operator to do this or if it is
>>> still possible to do with CoProcessFunction.
>>>
>>> Does anyone have some clues for this matter?
>>> Thanks
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html#low-level-joins
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>


Re: Implementing a low level join

2019-08-13 Thread Hequn Cheng
Hi Felipe,

> I want to implement a join operator which can use different strategies
for joining tuples.
Not all kinds of join strategies can be applied to streaming jobs. Take
sort-merge join as an example: it's impossible to sort unbounded data.
However, you can perform a window join and use the sort-merge strategy to
join the data within a window. Even so, I'm not sure it's worth doing
considering the performance.

> Therefore, I am not sure if I will need to implement my own operator to
do this or if it is still possible to do with CoProcessFunction.
You can't implement a broadcast join with a CoProcessFunction, but you can
implement it with a BroadcastProcessFunction or a
KeyedBroadcastProcessFunction; more details here[1].
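A minimal sketch of the broadcast variant (types, names and the join output below
are illustrative only, not from this thread):

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object BroadcastJoinSketch {
  case class Fact(key: String, value: Long)
  case class Dim(key: String, info: String)

  val dimStateDesc = new MapStateDescriptor[String, Dim](
    "dim-side", BasicTypeInfo.STRING_TYPE_INFO, createTypeInformation[Dim])

  def broadcastJoin(facts: DataStream[Fact], dims: DataStream[Dim]): DataStream[String] =
    facts
      .connect(dims.broadcast(dimStateDesc))
      .process(new BroadcastProcessFunction[Fact, Dim, String] {
        override def processElement(
            f: Fact,
            ctx: BroadcastProcessFunction[Fact, Dim, String]#ReadOnlyContext,
            out: Collector[String]): Unit = {
          // look up the broadcast side and emit a joined record if present
          val d = ctx.getBroadcastState(dimStateDesc).get(f.key)
          if (d != null) out.collect(s"${f.key}: ${f.value} -> ${d.info}")
        }

        override def processBroadcastElement(
            d: Dim,
            ctx: BroadcastProcessFunction[Fact, Dim, String]#Context,
            out: Collector[String]): Unit = {
          // every parallel task stores the (small) broadcast side in broadcast state
          ctx.getBroadcastState(dimStateDesc).put(d.key, d)
        }
      })
}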

Furthermore, you can take a look at the implementation of both window join
and non-window join in Table API & SQL[2]. The code can be found here[3].
Hope this helps.

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#joins
[3]
https://github.com/apache/flink/tree/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join


On Tue, Aug 13, 2019 at 11:30 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi all,
>
> I want to implement a join operator which can use different strategies for
> joining tuples. I saw that with CoProcessFunction I am able to implement
> low-level joins [1]. However, I do not know how to decide between different
> algorithms to join my tuples.
>
> On the other hand, to do a broadcast join I will need to use the broadcast
> operator [2] which yields a BroadcastStream. Therefore, I am not sure if I
> will need to implement my own operator to do this or if it is still
> possible to do with CoProcessFunction.
>
> Does anyone have some clues for this matter?
> Thanks
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html#low-level-joins
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>


Re: Is it possible to decide the order of where conditions in Flink SQL

2019-07-28 Thread Hequn Cheng
Hi Tony,

There is no order guarantee for filter conditions. The conditions may be
pushed down or merged during query optimization.

However, you can use CASE WHEN[1] to achieve what you want. The code
looks like:
CASE
WHEN !user.is_robot THEN true
WHEN UDF_NEED_TO_QUERY_DB(user) THEN true
ELSE false
END

CASE WHEN evaluates its conditions in order.

Note: UDF_NEED_TO_QUERY_DB must be a non-deterministic UDF, or the CASE
WHEN would also be optimized and changed into an OR by the query optimizer.
You can override the isDeterministic method of ScalarFunction to make it
non-deterministic, i.e., override def isDeterministic: Boolean = false.
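A minimal sketch of such a non-deterministic UDF (the class name is illustrative;
tEnv is assumed to be your StreamTableEnvironment):

import org.apache.flink.table.functions.ScalarFunction

class UdfNeedToQueryDb extends ScalarFunction {
  def eval(user: String): Boolean = {
    // placeholder: query the database here
    true
  }

  // prevents the optimizer from rewriting the CASE WHEN into an OR
  override def isDeterministic: Boolean = false
}

// register it so it can be used as UDF_NEED_TO_QUERY_DB in SQL
tEnv.registerFunction("UDF_NEED_TO_QUERY_DB", new UdfNeedToQueryDb)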

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions.html#conditional-functions

On Sat, Jul 27, 2019 at 4:35 PM Tony Wei  wrote:

> Hi,
>
> Thanks for your reply. I have tried both a CTE and an SQL subquery, but it
> seems that the SQL plan optimizer will do filter pushdown. Therefore, the
> where conditions end up together in the physical plan.
>
> However, the visualizations of the physical plans in the Flink UI differed
> for these three SQL queries in their operators' names.
>
> For the original SQL, it showed:
>
>> where: (AND(UDF_NEED_TO_QUERY_DB(user), NOT(user.is_robot))), select:
>> (...)
>
>
> For the CTE and subquery , it showed:
>
>> where: (AND(NOT(user.is_robot), UDF_NEED_TO_QUERY_DB(user))), select:
>> (...)
>
>
> Does the name of each operator in the physical plan say anything about the
> execution order of the `where` conditions?
>
> Best,
> Tony Wei
>
> sri hari kali charan Tummala  於 2019年7月27日 週六
> 上午3:02寫道:
>
>> Try CTEs (common table expressions) if they are supported, or an SQL subquery.
>>
>> On Fri, Jul 26, 2019 at 1:00 PM Fanbin Bu  wrote:
>>
>>> How about moving the query-db filter to the outer select?
>>>
>>> On Fri, Jul 26, 2019 at 9:31 AM Tony Wei  wrote:
>>>
 Hi,

 If I have multiple where conditions in my SQL, is it possible to
 specify their order, so that the query
 can be executed more efficiently?

 For example, if I have the following SQL, it uses a heavy UDF that
 needs to access a database.
 However, if I can specify that the order of the conditions is to execute
 `!user.is_robot` first and then the UDF,
 it will reduce the number of database accesses. Those records with
 `true` in `user.is_robot` will
 be dropped earlier and won't need to access the database.

 select *

 from users

 where !user.is_robot and UDF_NEED_TO_QUERY_DB(user)


 Thanks,
 Tony Wei

>>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>


Re: GroupBy result delay

2019-07-24 Thread Hequn Cheng
Hi Fanbin,

> 2. I have parallelism = 32 and only one task has the record. Can you
please elaborate more on why this would affect the watermark advancement?
Each parallel subtask of a source function usually generates its watermarks
independently, say wk1, wk2... wkn. The downstream window operator’s
current event time is the minimum of its input streams’ event times, so
here wk_window = min(wk1, wk2... wkn).
If some of the tasks don't have data, the wk_window would not be advanced.
More details here[1].

In your case, you can set the parallelism of the source to 1 to solve your
problem, and also keep the parallelism of assignTimestampsAndWatermarks
the same as that of the source.
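For example, a sketch of that workaround with the extractor from this thread
(`source` stands for your event source; 10000L is the 10-second slack mentioned above):

val events = env
  .addSource(source)
  .setParallelism(1)  // a single source subtask, so it always has data
  .assignTimestampsAndWatermarks(new EventTimestampExtractor(10000L))
  .setParallelism(1)  // keep the extractor at the same parallelism as the source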

> 4. data span time > window time. I don't quite understand why this
matters.
For example, suppose you have a tumbling window with a size of 1 day, but all
the data arrives within 1 hour of this day. In this case, the event time
would not reach the end of the window, i.e., the window will not fire.

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams

On Thu, Jul 25, 2019 at 12:17 AM Fanbin Bu  wrote:

> Hequn,
>
> Thanks for the help. It is indeed a watermark problem. From the Flink UI, I
> can see the low watermark value for each operator, and the groupBy operator
> has a lagging watermark value. I checked the link from SO and confirmed
> that:
> 1. I do see record coming in for this operator
> 2. I have parallelism = 32 and only one task has the record. Can you
> please elaborate more on why this would affect the watermark advancement?
> 3. Event create time is in ms
> 4. data span time > window time. I don't quite understand why this
> matters.
>
> Thanks,
> Fanbin
>
> On Tue, Jul 23, 2019 at 7:17 PM Hequn Cheng  wrote:
>
>> Hi Fanbin,
>>
>> Fabian is right, it should be a watermark problem. Probably, some tasks
>> of the source don't have enough data to advance the watermark. Furthermore,
>> you could also monitor event time through Flink web interface.
>> I have answered a similar question on stackoverflow, see more details
>> here[1].
>>
>> Best, Hequn
>>
>> [1]
>> https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger
>>
>> On Wed, Jul 24, 2019 at 4:38 AM Fanbin Bu  wrote:
>>
>>> If I use proctime, the groupBy happens without any delay.
>>>
>>> On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu 
>>> wrote:
>>>
>>>> not sure whether this is related:
>>>>
>>>> public SingleOutputStreamOperator assignTimestampsAndWatermarks(
>>>>   AssignerWithPeriodicWatermarks timestampAndWatermarkAssigner) {
>>>>
>>>>// match parallelism to input, otherwise dop=1 sources could lead to 
>>>> some strange
>>>>// behaviour: the watermark will creep along very slowly because the 
>>>> elements
>>>>// from the source go to each extraction operator round robin.
>>>>final int inputParallelism = getTransformation().getParallelism();
>>>>final AssignerWithPeriodicWatermarks cleanedAssigner = 
>>>> clean(timestampAndWatermarkAssigner);
>>>>
>>>>TimestampsAndPeriodicWatermarksOperator operator =
>>>>  new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
>>>>
>>>>return transform("Timestamps/Watermarks", 
>>>> getTransformation().getOutputType(), operator)
>>>>  .setParallelism(inputParallelism);
>>>> }
>>>>
>>>> parallelism is set to 32
>>>>
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>
>>>> env.setParallelism(32)
>>>>
>>>> and the command for launching the job is
>>>>
>>>> flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu 
>>>> wrote:
>>>>
>>>>> Thanks Fabian for the prompt reply. I just started using Flink and
>>>>> this is a great community.
>>>>> The watermark setting is only accounting for 10 sec delay. Besides
>>>>> that, the local job on IntelliJ is running fine without issues.
>>>>>
>>>>> Here is the code:
>>>>>
>>>>> class EventTimestampExtractor(slack: Long = 0L) extends 
>>>>> AssignerWithPeriodicWatermarks[T] {
>>>>>
>>>>>   var currentMaxTimestamp: Long = _

Re: GroupBy result delay

2019-07-23 Thread Hequn Cheng
Hi Fanbin,

Fabian is right, it should be a watermark problem. Probably, some tasks of
the source don't have enough data to advance the watermark. Furthermore,
you could also monitor event time through Flink web interface.
I have answered a similar question on stackoverflow, see more details
here[1].

Best, Hequn

[1]
https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger

On Wed, Jul 24, 2019 at 4:38 AM Fanbin Bu  wrote:

> If I use proctime, the groupBy happens without any delay.
>
> On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu  wrote:
>
>> not sure whether this is related:
>>
>> public SingleOutputStreamOperator assignTimestampsAndWatermarks(
>>   AssignerWithPeriodicWatermarks timestampAndWatermarkAssigner) {
>>
>>// match parallelism to input, otherwise dop=1 sources could lead to some 
>> strange
>>// behaviour: the watermark will creep along very slowly because the 
>> elements
>>// from the source go to each extraction operator round robin.
>>final int inputParallelism = getTransformation().getParallelism();
>>final AssignerWithPeriodicWatermarks cleanedAssigner = 
>> clean(timestampAndWatermarkAssigner);
>>
>>TimestampsAndPeriodicWatermarksOperator operator =
>>  new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
>>
>>return transform("Timestamps/Watermarks", 
>> getTransformation().getOutputType(), operator)
>>  .setParallelism(inputParallelism);
>> }
>>
>> parallelism is set to 32
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> env.setParallelism(32)
>>
>> and the command for launching the job is
>>
>> flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
>>
>>
>>
>>
>> On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu  wrote:
>>
>>> Thanks Fabian for the prompt reply. I just started using Flink and this
>>> is a great community.
>>> The watermark setting is only accounting for 10 sec delay. Besides that,
>>> the local job on IntelliJ is running fine without issues.
>>>
>>> Here is the code:
>>>
>>> class EventTimestampExtractor(slack: Long = 0L) extends 
>>> AssignerWithPeriodicWatermarks[T] {
>>>
>>>   var currentMaxTimestamp: Long = _
>>>
>>>   override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
>>> val elemTs = e.created_at
>>> currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>>> elemTs
>>>   }
>>>
>>>   override def getCurrentWatermark(): Watermark = {
>>>   new Watermark(currentMaxTimestamp)
>>>   }
>>> }
>>>
>>> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(1))
>>>
>>> Are there any other things I should be aware of?
>>>
>>> Thanks again for you kind help!
>>>
>>> Fanbin
>>>
>>>
>>> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske  wrote:
>>>
 Hi Fanbin,

 The delay is most likely caused by the watermark delay.
 A window is computed when the watermark passes the end of the window.
 If you configured the watermark to be 10 minutes before the current max
 timestamp (probably to account for out of order data), then the window will
 be computed with approx. 10 minute delay.

 Best, Fabian

 Am Di., 23. Juli 2019 um 02:00 Uhr schrieb Fanbin Bu <
 fanbin...@coinbase.com>:

> Hi,
> I have a Flink sql streaming job defined by:
>
> SELECT
>   user_id
>   , hop_end(created_at, interval '30' second, interval '1' minute) as 
> bucket_ts
>   , count(name) as count
> FROM event
> WHERE name = 'signin'
> GROUP BY
>   user_id
>   , hop(created_at, interval '30' second, interval '1' minute)
>
>
> there is a noticeable delay in the groupBy operator. For example, I
> only see the record sent out 10 minutes after the record was received;
> see the attached pic.
>
> [image: image.png]
>
> I'm expecting to see the group by result after one minute since the
> sliding window size is 1 min and the slide is 30 sec.
>
> There is no such issue if I run the job locally in IntelliJ. However,
> I ran into the above issue if I run the job on EMR (flink version = 1.7)
>
> Can anybody give a clue of what could be wrong?
> Thanks,
>
> Fanbin
>



Re: table toRetractStream missing last record and adding extra column (True)

2019-07-17 Thread Hequn Cheng
Hi Sri,

Question1:
You can use a map to filter out the "true" flag, i.e., ds.map(_._2).
Note that it's OK to remove the "true" flag for distinct as it does not
generate updates. For other queries that contain updates, such as a non-window
group by, we should not filter out the flag, or the result will not be correct.
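A small sketch of this with the Scala Table API, reusing the tEnv and table from
the code below in this thread (the output path is a placeholder):

import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// toRetractStream yields DataStream[(Boolean, Row)]; keep only the Row
tEnv.toRetractStream[Row](table)
  .map(_._2)
  .map(_.toString)  // format the Row however you like before writing
  .writeAsText("/tmp/csvOut", FileSystem.WriteMode.OVERWRITE)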

Question 2:
I can't reproduce this problem in my local environment. Maybe there is
something wrong with the source data?

Best, Hequn

On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> windows for question 1 or question 2 or both ?
>
> Thanks
> Sri
>
> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala 
> wrote:
>
>> Looks like you need a window
>>
>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am trying to write a toRetractStream to CSV, which is kind of working OK,
>>> but I get extra values like True and then my output data values.
>>>
>>> Question 1:
>>> I don't want true in my output data; how do I achieve this?
>>>
>>> Screen shot attached.
>>>
>>> Question 2:
>>> In the output file (CSV) I am missing data in the last line; is the
>>> toRetractStream closing before writing to the file?
>>>
>>> Screen Shot attached
>>>
>>> Code:-
>>>
>>> val data = kinesis.map(mapFunction)
>>> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>> val query = "SELECT distinct 
>>> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>>>  FROM transactions where cc_num not in ('cc_num')"
>>> val table = tEnv.sqlQuery(query)
>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>>   
>>> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>> FileSystem.WriteMode.OVERWRITE,"\n","|")
>>>
>>>
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


Re: org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.

2019-07-17 Thread Hequn Cheng
Hi Sri,

For Scala jobs, we should import the corresponding Scala environment and
DataStream classes.

e.g,
import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
See example here[1].
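A minimal Scala setup along these lines (Flink 1.8-style APIs, matching the code
in this thread):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
// with the Scala environment this returns a Scala StreamTableEnvironment,
// so tables can be converted back to Scala DataStreams
val tEnv = TableEnvironment.getTableEnvironment(env)

// register sources and run queries as before, then e.g.:
// val result: DataStream[(Boolean, Row)] = tEnv.toRetractStream[Row](someTable)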

Best,
Hequn

[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala


On Tue, Jul 16, 2019 at 11:03 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> is this a Bug in Flink Scala?
>
> Full code and Maven POM:-
>
> package com.aws.examples.kinesis.consumer.TransactionExample
>
> import java.lang
> import java.util.Properties
>
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.{SimpleStringEncoder, 
> SimpleStringSchema}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import 
> org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, 
> ConsumerConfigConstants}
> import com.google.gson.{Gson, JsonObject}
> import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
> import java.sql.{DriverManager, Time}
>
> import com.aws.SchemaJavaClasses.Row1
> import org.apache.flink.types.Row
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.sinks.CsvTableSink
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.table.sinks.TableSink
> import org.apache.flink.core.fs.{FileSystem, Path}
>
> import scala.collection.JavaConversions._
> import org.apache.flink.table.sources.CsvTableSource
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.streaming.api.datastream.DataStream
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
> import com.aws.customSinks.CsvCustomSink
> import org.apache.flink.api.java.tuple
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.table.sinks.AppendStreamTableSink
> import org.apache.flink.table.sinks.RetractStreamTableSink
> import org.apache.flink.api.java.DataSet
>
>
>
> object KinesisConsumer extends RetractStreamTableSink[Row] {
>
>   override def configure(strings: Array[String], typeInformations: 
> Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
>
>   override def getFieldNames: Array[String] = ???
>
>   override def getFieldTypes: Array[TypeInformation[_]] = ???
>
>   override def emitDataStream(dataStream: 
> DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
>
>   override def getOutputType(): TupleTypeInfo[tuple.Tuple2[lang.Boolean, 
> Row]] = super.getOutputType()
>
>   override def getRecordType: TypeInformation[Row] = ???
>
>
>   def main(args: Array[String]): Unit = {
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv: org.apache.flink.table.api.java.StreamTableEnvironment = 
> TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new 
> FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), 
> consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, String, 
> String, String, String, String, String, String, String]] =
>   new MapFunction[String, Tuple10[String, String, String, String, String, 
> String, String, String, String, String]]() {
>
> override def map(s: String): Tuple10[String, String, String, String, 
> String, String, String, String, String, String] = {
>
>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>   val csvData = data.getCc_num + "," +
> data.getFirst + "," +
> data.getLast + "," +
> data.getTrans_num + "," +
> data.getTrans_time + "," +
> data.getCategory + "," +
> data.getMerchant + "," +
> data.getAmt + "," +
> data.getMerch_lat + "," +
> data.get

Re: [flink 1.8.1]window closed unexpectedly and data drop

2019-07-13 Thread Hequn Cheng
Hi Ever,

The window only fires when the watermark passes the end of a window.

> now the fourth data came with timestamp:03:17:55,   at that time, a new
window should be open, and the previous window should closed
The previous window may not close if the watermark hasn't passed the end of
the window. More info about watermarks here[1].
Furthermore, we can monitor event time by checking the watermarks in the
web dashboard[2].
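As a rough single-subtask illustration with the 5-second bound from your extractor:
a sliding window covering [03:15:10, 03:16:10) only fires once the watermark passes
03:16:10, i.e., once some element with a timestamp of at least 03:16:15 has been
seen; with parallel sources, every subtask has to advance that far before the window
operator's watermark moves.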

Best,
Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
[2]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time


On Sat, Jul 13, 2019 at 3:37 PM Ever <439674...@qq.com> wrote:

> I have a streaming job based on event time, which has a 60-second window
> and a 10-second slide.
> Data will come in batches every 10 seconds.
>
> Here's the code.
> ```
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
>
> env.setParallelism(parallel)
>
> env.addSource(source)
>   .map(json => {
>   new InvokingInfoWrapper(xxx)
> })
>   .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5))
> {
> override def extractTimestamp(invoking: InvokingInfoWrapper): Long
> = {
>   invoking.timestamp
> }
>   })
>   .keyBy(invokingInfo => {
> s"${invokingInfo.caller}_${invokingInfo.callee}"
>   })
>   .timeWindow(Time.seconds(60), Time.seconds(10))
>   .reduce(innerReducer).map(invokingInfo => {
>  // ##2map =
>   //some mapping code
>   invokingInfo
>   })
>   .addSink(new
> WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink")
> ```
>
> And I have noticed something wrong:
> 1. the first record came with timestamp 03:15:48
> 2. the second record came with timestamp 03:15:59 and triggered the reduce
> operation (5 reduce operations, as there should be 5 windows)
> 3. the third one: 03:16:06, which also triggered reduce operations
> 4. now the fourth record came with timestamp 03:17:55; at that time, a new
> window should be opened and the previous window should be closed, and the
> result should enter line "##2map" above. But it didn't.
> 5. the fifth record came with timestamp 03:18:01 and triggered the reduce
> operation together with the fourth record.
>
> So it seems that the first three records were dropped silently.
>
> Can somebody help with this?
>


Re: Table API and ProcessWindowFunction

2019-07-11 Thread Hequn Cheng
Hi Flavio,

I think the reason we don't have interfaces like EventTimeObject and
ProcessingTimeObject is that we don't want time attributes to be defined just
anywhere; the idea is to define your time attributes in the source. If we added
interfaces like EventTimeObject and ProcessingTimeObject to Flink, it might
bring other problems, such as whether we should generate time attributes
everywhere once an object extends EventTimeObject or ProcessingTimeObject. The
object may exist in a source, an aggregate, or even a sink.

However, I think it's a good idea to add such logic in your own code. For
example, you can define a user-defined source which can just extract time
attributes from EventTimeObject and ProcessingTimeObject, similar to the
examples in the exercises.

Best, Hequn

On Thu, Jul 11, 2019 at 4:36 PM Flavio Pompermaier 
wrote:

> Only one proposal here: many times it happens that when working with
> streaming sources you need to define which field is the processing/rowtime field.
> Right now you can define the processing or event time field by
> implementing the DefinedProctimeAttribute or DefinedRowtimeAttribute at the
> source. But this is only helpful if you use the SQL API; with TableFunctions,
> for example, you don't have a way to get the proctime/rowtime field easily.
> Also, in the Flink exercises [1] you use a Pojo where you have to implement
> a method getEventTime() to retrieve the rowtime field.
>
> So, why not declare 2 general interfaces like EventTimeObject and
> ProcessingTimeObject so I can declare my objects implementing those
> interfaces and I can get the fields I need easily?
>
> Best,
> Flavio
>
> [1]
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java
>
> On Thu, Jul 11, 2019 at 10:01 AM Flavio Pompermaier 
> wrote:
>
>> Thanks Hequn, I'll give it a try!
>>
>> Best, Flavio
>>
>> On Thu, Jul 11, 2019 at 3:38 AM Hequn Cheng  wrote:
>>
>>> Hi,
>>>
>>> > Can you provide a pseudo-code example of how to implement this?
>>> Processing time
>>> If you use a TumblingProcessingTimeWindows.of(Time.seconds(1)), for each
>>> record, you get the timestamp from System.currentTimeMillis(), say t, and
>>> w_start = TimeWindow.getWindowStartWithOffset(t, 0, 1000), and w_end =
>>> w_start + 1000.
>>>
>>> Event time
>>> If you use a TumblingEventTimeWindows.of(Time.seconds(1)), for each
>>> record, get the timestamp from the corresponding timestamp field, say t,
>>> and get w_start and w_end same as above.
>>>
>>> More examples can be found in TimeWindowTest[1].
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
>>>
>>>
>>> On Wed, Jul 10, 2019 at 8:46 PM Flavio Pompermaier 
>>> wrote:
>>>
>>>> The problem with the LATERAL JOIN (via
>>>> a LookupableTableSource+TableFunction because I need to call that function
>>>> using the userId a a parameter)  is that I cannot know the window
>>>> start/end..to me it's not clear how to get that from
>>>> TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)...
>>>> Can you provide a pseudo-code example of how to implement this?
>>>>
>>>> On Tue, Jul 9, 2019 at 4:15 AM Hequn Cheng 
>>>> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> Thanks for your information.
>>>>>
>>>>> From your description, it seems that you only use the window to get
>>>>> the start and end time. There are no aggregations happen. If this is the
>>>>> case, you can get the start and end time by yourself(the
>>>>> `TimeWindow.getWindowStartWithOffset()` shows how to get window start
>>>>> according to the timestamp). To be more specific, if you use processing
>>>>> time, you can get your timestamp with System.currentTimeMillis(), and then
>>>>> use it to get the window start and end
>>>>> with `TimeWindow.getWindowStartWithOffset()`. For even time, you can get
>>>>> the timestamp from the rowtime field.
>>>>>
>>>>> With the start and end time, you can then perform LATERAL JOIN to
>>>>> enrich the information. You can add a cache in your table function to 
>>>>> avoid
>>>>> frequent contacting with the REST endpoint.
>>>>>

Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Hequn Cheng
Congratulations Rong!

Best, Hequn

On Fri, Jul 12, 2019 at 12:19 PM Jeff Zhang  wrote:

> Congrats, Rong!
>
>
> vino yang  于2019年7月12日周五 上午10:08写道:
>
>> congratulations Rong Rong!
>>
>> Fabian Hueske  于2019年7月11日周四 下午10:25写道:
>>
>>> Hi everyone,
>>>
>>> I'm very happy to announce that Rong Rong accepted the offer of the
>>> Flink PMC to become a committer of the Flink project.
>>>
>>> Rong has been contributing to Flink for many years, mainly working on
>>> SQL and Yarn security features. He's also frequently helping out on the
>>> user@f.a.o mailing lists.
>>>
>>> Congratulations Rong!
>>>
>>> Best, Fabian
>>> (on behalf of the Flink PMC)
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Table API and ProcessWindowFunction

2019-07-10 Thread Hequn Cheng
Hi,

> Can you provide a pseudo-code example of how to implement this?
Processing time
If you use a TumblingProcessingTimeWindows.of(Time.seconds(1)), for each
record, you get the timestamp from System.currentTimeMillis(), say t, and
w_start = TimeWindow.getWindowStartWithOffset(t, 0, 1000), and w_end =
w_start + 1000.

Event time
If you use a TumblingEventTimeWindows.of(Time.seconds(1)), for each record,
get the timestamp from the corresponding timestamp field, say t, and get
w_start and w_end same as above.

More examples can be found in TimeWindowTest[1].
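A tiny sketch of that computation (1-second tumbling windows assumed, as above):

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

val windowSize = 1000L              // 1 second
val t = System.currentTimeMillis()  // or the rowtime field for event time
val wStart = TimeWindow.getWindowStartWithOffset(t, 0L, windowSize)
val wEnd = wStart + windowSize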

Best, Hequn

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java


On Wed, Jul 10, 2019 at 8:46 PM Flavio Pompermaier 
wrote:

> The problem with the LATERAL JOIN (via
> a LookupableTableSource+TableFunction, because I need to call that function
> using the userId as a parameter) is that I cannot know the window
> start/end. To me it's not clear how to get that from
> TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)...
> Can you provide a pseudo-code example of how to implement this?
>
> On Tue, Jul 9, 2019 at 4:15 AM Hequn Cheng  wrote:
>
>> Hi Flavio,
>>
>> Thanks for your information.
>>
>> From your description, it seems that you only use the window to get the
>> start and end time. There are no aggregations happen. If this is the case,
>> you can get the start and end time by yourself(the
>> `TimeWindow.getWindowStartWithOffset()` shows how to get window start
>> according to the timestamp). To be more specific, if you use processing
>> time, you can get your timestamp with System.currentTimeMillis(), and then
>> use it to get the window start and end
>> with `TimeWindow.getWindowStartWithOffset()`. For even time, you can get
>> the timestamp from the rowtime field.
>>
>> With the start and end time, you can then perform LATERAL JOIN to enrich
>> the information. You can add a cache in your table function to avoid
>> frequent contacting with the REST endpoint.
>>
>> Best, Hequn
>>
>>
>> On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi Hequn, thanks for your answer.
>>> What I'm trying to do is to read a stream of events that basically
>>> contains a UserId field and, every X minutes (i.e. using a Time Window) and
>>> for each different UserId key, query 3 different REST services to enrich my
>>> POJOs*.
>>> For the moment what I do is to use a ProcessWindowFunction after the
>>> .keyBy().window() as shown in the  previous mail example to contact those 3
>>> services and enrich my object.
>>>
>>> However I don't like this solution because I'd like to use Flink to it's
>>> full potential so I'd like to enrich my object using LATERAL TABLEs or
>>> ASYNC IO..
>>> The main problem I'm facing right now is that  I can't find a way to
>>> pass the thumbing window start/end to the LATERAL JOIN table functions
>>> (because this is a parameter of the REST query).
>>> Moreover I don't know whether this use case is something that Table API
>>> aims to solve..
>>>
>>> * Of course this could kill the REST endpoint if the number of users is
>>> very big ..because of this I'd like to keep the external state of source
>>> tables as an internal Flink state and then do a JOIN on the UserId. The
>>> problem here is that I need to "materialize" them using Debezium (or
>>> similar) via Kafka and dynamic tables..is there any example of keeping
>>> multiple tables synched with Flink state through Debezium (without the need
>>> of rewriting all the logic for managing UPDATE/INSERT/DELETE)?
>>>
>>> On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng  wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> Nice to hear your ideas on Table API!
>>>>
>>>> Could you be more specific about your requirements? A detailed scenario
>>>> would be quite helpful. For example, do you want to emit multi records
>>>> through the collector or do you want to use the timer?
>>>>
>>>> BTW, Table API introduces flatAggregate recently(both non-window
>>>> flatAggregate and window flatAggregate) and will be included in the near
>>>> coming release-1.9. The flatAggregate can emit multi records for a single
>>>> group. More details here[1][2].
>>>> Hope this can solve your problem.
>>>>
>>>> Best, Hequn
>>

Re: Add Logical filter on a query plan from Flink Table API

2019-07-09 Thread Hequn Cheng
Hi Felipe,

Yes, we are short of such tutorials. You can probably take a look at the
code of FLINK-9713[1] (checking the changelog in an IDE is more convenient).
The code shows how to create a logical node and how to use a rule to
convert it into a FlinkLogicalRel and then into a DataStream RelNode.
Hope this helps.

[1] https://github.com/apache/flink/pull/6299/files

On Tue, Jul 9, 2019 at 9:37 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Hequn,
>
> it has been very hard to find even a very small tutorial on how to create
> my own rule in Calcite+Flink. What I did was copy a Calcite rule into my
> project and try to understand it. I am working with the FilterJoinRule [1],
> which is one of the rules that Flink modifies. In the end I want to create a
> rule for Join operators that allows me to choose between different
> implementations of join algorithms (nested-loop, sort-merge, hash).
>
> If you have any step-by-step guide on understanding the "RelOptRuleCall"
> parameter, that would be very nice =). But I guess I have to keep digging into
> the code...
>
> Thanks anyway!
> [1]
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/FilterJoinRule.java
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Tue, Jul 9, 2019 at 2:10 PM Hequn Cheng  wrote:
>
>> Hi Felipe,
>>
>> > what is the relation of RelFactories [1] when I use it to create the
>> INSTANCE of my rule?
>> The `RelFactories.LOGICAL_BUILDER` can be used during the rule
>> transformation, i.e., the `RelFactories.LOGICAL_BUILDER` is a
>> `RelBuilderFactory` which contains a `create` method can be used to create
>> a `RelBuilder`. The `RelBuilder`[1] is used to create relational
>> expressions.
>>
>> Maybe we can also post the question in the Calcite mailing list. They may
>> give more details. :-)
>>
>> Best,
>> Hequn
>>
>> [1]
>> https://github.com/hequn8128/calcite/blob/master/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
>>
>>
>> On Tue, Jul 9, 2019 at 4:06 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi Hequn,
>>>
>>> what is the relation of RelFactories [1] when I use it to create the
>>> INSTANCE of my rule? For example:
>>>
>>> public static final MyFilterRule INSTANCE = new
>>> MyFilterRule(Filter.class, RelFactories.LOGICAL_BUILDER);
>>>
>>> then I create a CalciteCOnfigBuilder using "new
>>> CalciteConfigBuilder().addLogicalOptRuleSet(), .addNormRuleSet(),
>>> .addPhysicalOptRuleSet()".
>>>
>>> [1]
>>> https://calcite.apache.org/apidocs/org/apache/calcite/rel/core/RelFactories.html#LOGICAL_BUILDER
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Tue, Jul 9, 2019 at 5:06 AM Hequn Cheng  wrote:
>>>
>>>> Hi Felipe,
>>>>
>>>> > I would like to create a logical filter if there is no filter set on
>>>> the logical query. How should I implement it?
>>>> Do you mean you want to add a LogicalFilter node if the query even
>>>> doesn't contain filter? Logically, this can be done through a rule.
>>>> However, it sounds a little hack and you have to pay attention to semantic
>>>> problems. One thing you have to notice is that you can't change the RowType
>>>> when you perform your rules, i.e., for NodeA -> rule -> NodeB, NodeB should
>>>> contain the same row type with NodeA.
>>>> There are a lot of rules in Flink which I think is a good example for
>>>> you. You can find these rules in the class of `FlinkRuleSets`.
>>>>
>>>> > I see my LogicalFilter been created when I call "tableEnv.explain()"
>>>> method. I suppose that I can add some logical filters on the plan.
>>>> The `LogicalFilter` and `DataStreamCalc` is not created by your Filter
>>>> rule. If you remove your filter rule, there is nothing change for the plan.
>>>>
>>>> Best, Hequn
>>>>
>>>> On Mon, Jul 8, 2019 at 11:13 PM Felipe Gutierrez <
>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am a newbie in Apache Calci

Re: Add Logical filter on a query plan from Flink Table API

2019-07-09 Thread Hequn Cheng
Hi Felipe,

> what is the relation of RelFactories [1] when I use it to create the
INSTANCE of my rule?
The `RelFactories.LOGICAL_BUILDER` can be used during the rule
transformation, i.e., `RelFactories.LOGICAL_BUILDER` is a
`RelBuilderFactory` which contains a `create` method that can be used to
create a `RelBuilder`. The `RelBuilder`[1] is used to create relational
expressions.
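A minimal sketch of how the factory and the builder fit together in a rule
(Calcite APIs as bundled with Flink 1.8/1.9; the rule body is illustrative only
and simply rebuilds the matched filter):

import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.logical.LogicalFilter
import org.apache.calcite.tools.RelBuilderFactory

class MyFilterRule(relBuilderFactory: RelBuilderFactory)
  extends RelOptRule(
    RelOptRule.operand(classOf[LogicalFilter], RelOptRule.any()),
    relBuilderFactory,
    "MyFilterRule") {

  override def onMatch(call: RelOptRuleCall): Unit = {
    val filter: LogicalFilter = call.rel(0)
    // the RelBuilder created from the factory passed to the constructor,
    // e.g. RelFactories.LOGICAL_BUILDER
    val builder = call.builder()
    // rebuild the filter on top of its input; a real rule would rewrite the
    // condition here, keeping the row type unchanged
    val newRel = builder
      .push(filter.getInput)
      .filter(filter.getCondition)
      .build()
    call.transformTo(newRel)
  }
}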

Maybe we can also post the question in the Calcite mailing list. They may
give more details. :-)

Best,
Hequn

[1]
https://github.com/hequn8128/calcite/blob/master/core/src/main/java/org/apache/calcite/tools/RelBuilder.java


On Tue, Jul 9, 2019 at 4:06 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Hequn,
>
> what is the relation of RelFactories [1] when I use it to create the
> INSTANCE of my rule? For example:
>
> public static final MyFilterRule INSTANCE = new MyFilterRule(Filter.class,
> RelFactories.LOGICAL_BUILDER);
>
> then I create a CalciteCOnfigBuilder using "new
> CalciteConfigBuilder().addLogicalOptRuleSet(), .addNormRuleSet(),
> .addPhysicalOptRuleSet()".
>
> [1]
> https://calcite.apache.org/apidocs/org/apache/calcite/rel/core/RelFactories.html#LOGICAL_BUILDER
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Tue, Jul 9, 2019 at 5:06 AM Hequn Cheng  wrote:
>
>> Hi Felipe,
>>
>> > I would like to create a logical filter if there is no filter set on
>> the logical query. How should I implement it?
>> Do you mean you want to add a LogicalFilter node if the query even
>> doesn't contain filter? Logically, this can be done through a rule.
>> However, it sounds a little hack and you have to pay attention to semantic
>> problems. One thing you have to notice is that you can't change the RowType
>> when you perform your rules, i.e., for NodeA -> rule -> NodeB, NodeB should
>> contain the same row type with NodeA.
>> There are a lot of rules in Flink which I think is a good example for
>> you. You can find these rules in the class of `FlinkRuleSets`.
>>
>> > I see my LogicalFilter been created when I call "tableEnv.explain()"
>> method. I suppose that I can add some logical filters on the plan.
>> The `LogicalFilter` and `DataStreamCalc` is not created by your Filter
>> rule. If you remove your filter rule, there is nothing change for the plan.
>>
>> Best, Hequn
>>
>> On Mon, Jul 8, 2019 at 11:13 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am a newbie in Apache Calcite. I am trying to use it with Apache
>>> Flink. To start I am trying to create a HelloWorld which just add a logical
>>> filter on my query.
>>> 1 - I have my Flink app using Table API [1].
>>> 2 - I have created my Calcite filter rule which is applied to my FLink
>>> query if I use CalciteConfig cc = new
>>> CalciteConfigBuilder().addLogicalOptRuleSet(RuleSets.ofList(MyFilterRule.INSTANCE)).build()
>>> [2];
>>> 3 - The debug thread only goes to my rule if there is a filter on my
>>> query.
>>>
>>> I would like to create a logical filter if there is no filter set on the
>>> logical query. How should I implement it?
>>> I see my LogicalFilter been created when I call "tableEnv.explain()"
>>> method. I suppose that I can add some logical filters on the plan.
>>>
>>> == Abstract Syntax Tree ==
>>> LogicalFilter(condition=[>=($6, 50)])
>>>   LogicalTableScan(table=[[TicketsStation01Plat01]])
>>>
>>> == Optimized Logical Plan ==
>>> DataStreamCalc(select=[sensorId, sensorType, platformId, platformType,
>>> stationId, timestamp, value, trip, eventTime], where=[>=(value, 50)])
>>>   StreamTableSourceScan(table=[[TicketsStation01Plat01]],
>>> fields=[sensorId, sensorType, platformId, platformType, stationId,
>>> timestamp, value, trip, eventTime], source=[SensorTuples])
>>>
>>> == Physical Execution Plan ==
>>> 
>>>
>>> Thanks,
>>> Felipe
>>>
>>> [1]
>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/table/HelloWorldCalcitePlanTableAPI.java#L62
>>> [2]
>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/calcite/rules/MyFilterRule.java#L14
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>


Re: Add Logical filter on a query plan from Flink Table API

2019-07-08 Thread Hequn Cheng
Hi Felipe,

> I would like to create a logical filter if there is no filter set on the
logical query. How should I implement it?
Do you mean you want to add a LogicalFilter node even if the query doesn't
contain a filter? Logically, this can be done through a rule. However, it
sounds a little hacky and you have to pay attention to semantic problems.
One thing you have to notice is that you can't change the RowType when you
apply your rules, i.e., for NodeA -> rule -> NodeB, NodeB should have
the same row type as NodeA.
There are a lot of rules in Flink which I think are good examples for you.
You can find these rules in the `FlinkRuleSets` class.

> I see my LogicalFilter been created when I call "tableEnv.explain()"
method. I suppose that I can add some logical filters on the plan.
The `LogicalFilter` and `DataStreamCalc` are not created by your filter
rule. If you remove your filter rule, nothing in the plan changes.

Best, Hequn

On Mon, Jul 8, 2019 at 11:13 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I am a newbie in Apache Calcite. I am trying to use it with Apache Flink.
> To start I am trying to create a HelloWorld which just add a logical filter
> on my query.
> 1 - I have my Flink app using Table API [1].
> 2 - I have created my Calcite filter rule which is applied to my FLink
> query if I use CalciteConfig cc = new
> CalciteConfigBuilder().addLogicalOptRuleSet(RuleSets.ofList(MyFilterRule.INSTANCE)).build()
> [2];
> 3 - The debug thread only goes to my rule if there is a filter on my query.
>
> I would like to create a logical filter if there is no filter set on the
> logical query. How should I implement it?
> I see my LogicalFilter been created when I call "tableEnv.explain()"
> method. I suppose that I can add some logical filters on the plan.
>
> == Abstract Syntax Tree ==
> LogicalFilter(condition=[>=($6, 50)])
>   LogicalTableScan(table=[[TicketsStation01Plat01]])
>
> == Optimized Logical Plan ==
> DataStreamCalc(select=[sensorId, sensorType, platformId, platformType,
> stationId, timestamp, value, trip, eventTime], where=[>=(value, 50)])
>   StreamTableSourceScan(table=[[TicketsStation01Plat01]],
> fields=[sensorId, sensorType, platformId, platformType, stationId,
> timestamp, value, trip, eventTime], source=[SensorTuples])
>
> == Physical Execution Plan ==
> 
>
> Thanks,
> Felipe
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/table/HelloWorldCalcitePlanTableAPI.java#L62
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/calcite/rules/MyFilterRule.java#L14
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>


Re: Table API and ProcessWindowFunction

2019-07-08 Thread Hequn Cheng
Hi Flavio,

Thanks for your information.

From your description, it seems that you only use the window to get the
start and end time; no aggregations happen. If this is the case,
you can get the start and end time by yourself (the
`TimeWindow.getWindowStartWithOffset()` method shows how to get the window
start according to the timestamp). To be more specific, if you use processing
time, you can get your timestamp with System.currentTimeMillis(), and then
use it to get the window start and end
with `TimeWindow.getWindowStartWithOffset()`. For event time, you can get
the timestamp from the rowtime field.

With the start and end time, you can then perform a LATERAL JOIN to enrich
the information. You can add a cache to your table function to avoid
contacting the REST endpoint too frequently.
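A rough sketch of such a cached table function (the REST call and the cache policy
are placeholders; a production version should bound the cache size):

import org.apache.flink.table.functions.TableFunction
import scala.collection.mutable

class EnrichUser extends TableFunction[String] {
  // naive in-memory cache keyed by (userId, windowStart)
  private val cache = mutable.Map.empty[(String, Long), String]

  def eval(userId: String, windowStart: Long, windowEnd: Long): Unit = {
    val enriched = cache.getOrElseUpdate((userId, windowStart), {
      // placeholder for the REST call using userId and the window bounds
      s"details-for-$userId"
    })
    collect(enriched)
  }
}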

Best, Hequn


On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier 
wrote:

> Hi Hequn, thanks for your answer.
> What I'm trying to do is to read a stream of events that basically
> contains a UserId field and, every X minutes (i.e. using a Time Window) and
> for each different UserId key, query 3 different REST services to enrich my
> POJOs*.
> For the moment what I do is to use a ProcessWindowFunction after the
> .keyBy().window() as shown in the  previous mail example to contact those 3
> services and enrich my object.
>
> However, I don't like this solution because I'd like to use Flink to its
> full potential, so I'd like to enrich my object using LATERAL TABLEs or
> ASYNC IO.
> The main problem I'm facing right now is that I can't find a way to pass
> the tumbling window start/end to the LATERAL JOIN table functions (because
> this is a parameter of the REST query).
> Moreover I don't know whether this use case is something that Table API
> aims to solve..
>
> * Of course this could kill the REST endpoint if the number of users is
> very big ..because of this I'd like to keep the external state of source
> tables as an internal Flink state and then do a JOIN on the UserId. The
> problem here is that I need to "materialize" them using Debezium (or
> similar) via Kafka and dynamic tables..is there any example of keeping
> multiple tables synched with Flink state through Debezium (without the need
> of rewriting all the logic for managing UPDATE/INSERT/DELETE)?
>
> On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng  wrote:
>
>> Hi Flavio,
>>
>> Nice to hear your ideas on Table API!
>>
>> Could you be more specific about your requirements? A detailed scenario
>> would be quite helpful. For example, do you want to emit multi records
>> through the collector or do you want to use the timer?
>>
>> BTW, Table API introduces flatAggregate recently(both non-window
>> flatAggregate and window flatAggregate) and will be included in the near
>> coming release-1.9. The flatAggregate can emit multi records for a single
>> group. More details here[1][2].
>> Hope this can solve your problem.
>>
>> Best, Hequn
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions
>>
>> On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> from what I understood a ProcessWindowFunction can only be used in the
>>> Streaming API.
>>> Is there any plan to port them also in the Table API (in the near
>>> future)?
>>> I'd like to do with Table API the equivalent of:
>>>
>>> final DataStream events = env.addSource(src);
>>> events.filter(e -> e.getCode() != null)
>>> .keyBy(event -> Integer.valueOf(event.getCode()))
>>> .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>> .process(new ProcessWindowFunction<IN, OUT, Integer, TimeWindow>() {...}); // IN/OUT are placeholders
>>>
>>> Best,
>>> Flavio
>>>
>>
>
>


Re: Table API and ProcessWindowFunction

2019-07-08 Thread Hequn Cheng
Hi Flavio,

Nice to hear your ideas on Table API!

Could you be more specific about your requirements? A detailed scenario
would be quite helpful. For example, do you want to emit multiple records
through the collector, or do you want to use the timer?

BTW, the Table API recently introduced flatAggregate (both non-window
flatAggregate and window flatAggregate), which will be included in the
upcoming release 1.9. flatAggregate can emit multiple records for a single
group. More details here[1][2].
Hope this can solve your problem.
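
In case it helps, here is a rough sketch of what a table aggregate function and
flatAggregate look like in 1.9, loosely following the Top2 example from the docs
linked below (table and column names are made up):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2.Top2Accum> {

    // Accumulator holding the two largest values seen so far.
    public static class Top2Accum {
        public Integer first = Integer.MIN_VALUE;
        public Integer second = Integer.MIN_VALUE;
    }

    @Override
    public Top2Accum createAccumulator() {
        return new Top2Accum();
    }

    // Called for every input row of a group.
    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    // Emits multiple records (value, rank) for a single group.
    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}

// Usage:
// tableEnv.registerFunction("top2", new Top2());
// Table result = tableEnv.scan("T")
//     .groupBy("key")
//     .flatAggregate("top2(a) as (v, rank)")
//     .select("key, v, rank");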

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions

On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier 
wrote:

> Hi to all,
> from what I understood a ProcessWindowFunction can only be used in the
> Streaming API.
> Is there any plan to port them also in the Table API (in the near future)?
> I'd like to do with Table API the equivalent of:
>
> final DataStream events = env.addSource(src);
> events.filter(e -> e.getCode() != null)
> .keyBy(event -> Integer.valueOf(event.getCode()))
> .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
> .process(new ProcessWindowFunction<IN, OUT, Integer, TimeWindow>() {...}); // IN/OUT are placeholders
>
> Best,
> Flavio
>


Re: [ANNOUNCE] Apache Flink 1.8.1 released

2019-07-02 Thread Hequn Cheng
Thanks for being the release manager and the great work Jincheng!
Also thanks to Gordon and the community for making this release possible!

Best, Hequn

On Wed, Jul 3, 2019 at 9:40 AM jincheng sun 
wrote:

> Hi,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.8.1, which is the first bugfix release for the Apache Flink 1.8
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> https://flink.apache.org/news/2019/07/02/release-1.8.1.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345164
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Great thanks to @Tzu-Li (Gordon) Tai for his kind offline help!
>
> Regards,
> Jincheng
>


Re: Idle windows

2019-06-21 Thread Hequn Cheng
Hi Ustinov,

I guess you have mixed up the remainder and the parallelism,
i.e., a record whose key has remainder 0 will not necessarily be processed by
the 0th task after keyBy.
Flink applies a hash function to the key you have provided and
partitions the record based on the key group range.

keyBy makes sure that the same key goes to the same place; if you want to
balance the workload, you need more distinct keys.
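
A small sketch that illustrates this, using Flink's internal
KeyGroupRangeAssignment utility (the parallelism value is arbitrary):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyDistributionCheck {

    public static void main(String[] args) {
        int parallelism = 8;
        // Flink derives a default max parallelism from the parallelism if none is set.
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);

        // Using the remainders 0..parallelism-1 as keys does NOT mean each record
        // goes to subtask 0..parallelism-1: the key is hashed into a key group first,
        // so several remainders may collide on the same subtask and leave others idle.
        for (int key = 0; key < parallelism; key++) {
            int subtask =
                KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
            System.out.println("key " + key + " -> subtask " + subtask);
        }
    }
}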

Best, Hequn


On Fri, Jun 21, 2019 at 6:23 PM Ustinov Anton  wrote:

> I have a simple job that reads JSON messages from Kafka topic and
> proccesses them like this:
>
> SingleOutputStreamOperator result = ds
> .filter(ev -> ev.has(cookieFieldName))
> .map(ev -> ev.get(cookieFieldName).asText())
> .keyBy(new CookieKeySelector(env.getParallelism()))
> .timeWindow(Time.seconds(period))
> .aggregate(new CookieAggregate())
> .timeWindowAll(Time.seconds(period))
> .reduce((v1, v2) -> v1 + v2);
>
> CookieKeySelector computes an MD5 hash of the cookie value and calculates the
> remainder of dividing it by the job parallelism. CookieAggregate counts unique
> cookie values in a window. I see in the Flink Dashboard that only half of the
> windows are getting messages to process. The number of working windows depends
> on the job parallelism. Why do only some of the windows compute useful aggregates?
> I’ve tried to use random numbers as a key and still get the same result.
>
> Additional information: Flink 1.8.0, runs on a single node with 56 CPUs,
> 256G RAM, 10GB/s network.
>
>
> Anton Ustinov
> ustinov@gmail.com
>
>


Re: How can I improve this Flink application for "Distinct Count of elements" in the data stream?

2019-06-12 Thread Hequn Cheng
Hi Felipe,

From your code, I think you want to get the "count distinct" result instead
of the "distinct count". They have different meanings.

To improve the performance, you can replace
your DistinctProcessWindowFunction with a DistinctProcessReduceFunction. A
ReduceFunction can aggregate the elements of a window incrementally, while
for a ProcessWindowFunction, elements cannot be incrementally aggregated but
instead need to be buffered internally until the window is considered ready
for processing.

> Flink does not have a built-in operator which does this computation.
Flink does have built-in operators to solve your problem. You can use Table
API & SQL to solve your problem. The code looks like:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    DataStream<String> ds = env.socketTextStream("localhost", 9000);
    tableEnv.registerDataStream("sourceTable", ds, "line, proctime.proctime");

    SplitTableFunction splitFunc = new SplitTableFunction();
    tableEnv.registerFunction("splitFunc", splitFunc);
    Table result = tableEnv.scan("sourceTable")
        .joinLateral("splitFunc(line) as word")
        .window(Tumble.over("5.seconds").on("proctime").as("w"))
        .groupBy("w")
        .select("count.distinct(word), collect.distinct(word)");

    tableEnv.toAppendStream(result, Row.class).print();
    env.execute();
}

Detail code can be found here[1].

At the same time, you can perform a two-stage window aggregation to improve the
performance, i.e., the first window aggregate is used to de-duplicate words and
the second window is used to get the final results.

Document about Table API and SQL can be found here[2][3].

Best, Hequn

[1]
https://github.com/hequn8128/flink/commit/b4676a5730cecabe2931b9e628aaebd7729beab2
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html


On Wed, Jun 12, 2019 at 9:19 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Rong, I implemented my solution using a ProcessingWindow
> with timeWindow and a ReduceFunction with timeWindowAll [1]. So for the
> first window I use parallelism, and I use the second window to merge
> everything in the reducer. I guess it is the best approach to do
> DistinctCount. Would you suggest some improvements?
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountProcessTimeWindowSocket.java
>
> Thanks!
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
> On Wed, Jun 12, 2019 at 9:27 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Rong,
>>
>> thanks for your answer. If I understood well, the option will be to use
>> ProcessFunction [1] since it has the method onTimer(), but not
>> ProcessWindowFunction [2], because it does not have the method onTimer(). I
>> will need this method to call Collector out.collect(...) from the
>> onTimer() method in order to emit a single value of my distinct count
>> function.
>>
>> Is that reasonable what I am saying?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/datastream/DataStream.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html
>>
>> Kind Regards,
>> Felipe
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
>>
>>
>> On Wed, Jun 12, 2019 at 3:41 AM Rong Rong  wrote:
>>
>>> Hi Felipe,
>>>
>>> there are multiple ways to do DISTINCT COUNT in Table/SQL API. In fact
>>> there's already a thread going on recently [1]
>>> Based on the description you provided, it seems like it might be a
>>> better API level to use.
>>>
>>> To answer your question,
>>> - You should be able to use other TimeCharacteristic. You might want to
>>> try WindowProcessFunction and see if this fits your use case.
>>> - Not sure I fully understand the question; your keyBy should be done
>>> on your distinct key (or a combo key), and if you do the keyBy correctly then
>>> yes, all messages with the same key are processed by the same TM thread.
>>>
>>> --
>>> Rong
>>>
>>>
>>>
>>> [1]
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/count-DISTINCT-in-flink-SQL-td28061.html
>>>
>>> On Tue, Jun 11, 2019 at 1:27 AM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
 Hi all,

 I have implemented a Flink data stream application to compute distinct
 count of words. Flink does not have a built-in operator which does this
 computation.

Re: [DISCUSS] Deprecate previous Python APIs

2019-06-11 Thread Hequn Cheng
+1 on the proposal!
Maintaining only one Python API is helpful for users and contributors.

Best, Hequn

On Wed, Jun 12, 2019 at 9:41 AM Jark Wu  wrote:

> +1 and looking forward to the new Python API world.
>
> Best,
> Jark
>
> On Wed, 12 Jun 2019 at 09:22, Becket Qin  wrote:
>
>> +1 on deprecating the old Python API in 1.9 release.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Jun 12, 2019 at 9:07 AM Dian Fu  wrote:
>>
>>> +1 for this proposal.
>>>
>>> Regards,
>>> Dian
>>>
>>> 在 2019年6月12日,上午8:24,jincheng sun  写道:
>>>
>>> big +1 for the proposal.
>>>
>>> We will soon complete all the Python API functional development of the
>>> 1.9 release, the development of UDFs will be carried out. After the support
>>> of UDFs is completed, it will be very natural to support Datastream API.
>>>
>>> If all of us agree with this proposal, I believe that for the 1.10
>>> release, it is possible to complete support both UDFs and DataStream API.
>>> And we will do our best to make the 1.10 release that contains the Python
>>> DataStream API support.
>>>
>>> So, great thanks to @Stephan for this proposal!
>>>
>>> Best,
>>> Jincheng
>>>
>>> Zili Chen  于2019年6月11日周二 下午10:56写道:
>>>
 +1

 Best,
 tison.


 zhijiang  于2019年6月11日周二 下午10:52写道:

> It is reasonable as stephan explained. +1 from my side!
>
> --
> From:Jeff Zhang 
> Send Time:2019年6月11日(星期二) 22:11
> To:Stephan Ewen 
> Cc:user ; dev 
> Subject:Re: [DISCUSS] Deprecate previous Python APIs
>
> +1
>
> Stephan Ewen  于2019年6月11日周二 下午9:30写道:
>
> > Hi all!
> >
>
> > I would suggest to deprecating the existing python APIs for DataSet and
> > DataStream API with the 1.9 release.
> >
> > Background is that there is a new Python API under development.
> > The new Python API is initially against the Table API. Flink 1.9 will
>
> > support Table API programs without UDFs, 1.10 is planned to support 
> > UDFs.
> > Future versions would support also the DataStream API.
> >
> > In the long term, Flink should have one Python API for DataStream and
>
> > Table APIs. We should not maintain multiple different implementations 
> > and
> > confuse users that way.
>
> > Given that the existing Python APIs are a bit limited and not under 
> > active
>
> > development, I would suggest to deprecate them in favor of the new API.
> >
> > Best,
> > Stephan
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>
>>>


Re: [SURVEY] Usage of flink-ml and [DISCUSS] Delete flink-ml

2019-05-27 Thread Hequn Cheng
Hi Shaoxuan,

Thanks a lot for driving this. +1 to remove the module.

The git log of this module shows that it has been inactive for a long time.
I think it's ok to remove it for now. It would also be good to switch to
the new interface earlier.

Best, Hequn

On Mon, May 27, 2019 at 8:58 PM Becket Qin  wrote:

> +1 for removal. Personally I'd prefer marking it as deprecated and remove
> the module in the next release, just to follow the established procedure.
>
> And +1 on removing the `flink-libraries/flink-ml-uber` as well.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, May 27, 2019 at 5:07 PM jincheng sun 
> wrote:
>
>> +1 for remove it!
>>
>> And we also plan to delete the `flink-libraries/flink-ml-uber`, right?
>>
>> Best,
>> Jincheng
>>
>> Rong Rong  于2019年5月24日周五 上午1:18写道:
>>
>>> +1 for the deletion.
>>>
>>> Also I think it also might be a good idea to update the roadmap for the
>>> plan of removal/development since we've reached the consensus on FLIP-39.
>>>
>>> Thanks,
>>> Rong
>>>
>>>
>>> On Wed, May 22, 2019 at 8:26 AM Shaoxuan Wang 
>>> wrote:
>>>
 Hi Chesnay,
 Yes, you are right. There is not any active commit planned for the
 legacy Flink-ml package. It does not matter whether we delete it now or later. I will
 open a PR and remove it.

 Shaoxuan

 On Wed, May 22, 2019 at 7:05 PM Chesnay Schepler 
 wrote:

> I believe we can remove it regardless since users could just use the
> 1.8
> version against future releases.
>
> Generally speaking, any library/connector that is no longer actively
> developed can be removed from the project as existing users can always
> rely on previous versions, which should continue to work by virtue of
> working against @Stable APIs.
>
> On 22/05/2019 12:08, Shaoxuan Wang wrote:
> > Hi Flink community,
> >
> > We plan to delete/deprecate the legacy flink-libraries/flink-ml
> package in
> > Flink1.9, and replace it with the new flink-ml interface proposed in
> FLIP39
> > (FLINK-12470).
> > Before we remove this package, I want to reach out to you and ask if
> there
> > is any active project still uses this package. Please respond to this
> > thread and outline how you use flink-libraries/flink-ml.
> > Depending on the replies of activity and adoption
> > of flink-libraries/flink-ml, we will decide to either delete this
> package
> > in Flink1.9 or deprecate it for now & remove it in the next release
> after
> > 1.9.
> >
> > Thanks for your attention and help!
> >
> > Regards,
> > Shaoxuan
> >
>
>


Re: Reconstruct object through partial select query

2019-05-12 Thread Hequn Cheng
Hi shahar,

An easier way to solve your problem is to use a Row to store your data
instead of the `TaggedEvent`. I think this is what Fabian means. In this
way, you don't have to define a user-defined TypeFactory and can use the Row
type directly. Taking `TaggedEvent` as an example, the corresponding row
type is `Types.ROW(Types.ROW(Types.INT, Types.STRING),
Types.OBJECT_ARRAY(Types.STRING))`, in which Types is
`org.apache.flink.table.api.Types`. Furthermore, the Row type also works more
easily with the Table API & SQL.
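
For example, a sketch of the row type and a matching value, here using the
equivalent helpers from org.apache.flink.api.common.typeinfo.Types (the field
names mirror the TaggedEvent/Car classes from the thread below):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

public class TaggedEventRowType {

    public static void main(String[] args) {
        // TaggedEvent<Car> as a nested row:
        //   originalEvent: ROW<year INT, modelName STRING>
        //   tags:          STRING[]
        TypeInformation<Row> taggedEventType = Types.ROW_NAMED(
                new String[] {"originalEvent", "tags"},
                Types.ROW_NAMED(new String[] {"year", "modelName"}, Types.INT, Types.STRING),
                Types.OBJECT_ARRAY(Types.STRING));

        // Building a value of that type.
        Row car = Row.of(2016, "Luxury X");
        Row taggedEvent = Row.of(car, new String[] {"NiceCar"});

        System.out.println(taggedEventType);
        System.out.println(taggedEvent);
    }
}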

However, if the `TaggedEvent` is a must-have for you, you can take a look
at MapView[1] as an example of how to define a user-defined type
factory.

Best, Hequn

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala

On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> Hi Fabian,
>
> I'm having trouble implementing the type for this operation; I wonder how I
> can do that.
> So given a generic type T, I want to create a TypeInformation for:
> class TaggedEvent {
>String[] tags
>T originalEvent
> }
>
> I was trying a few different things but am not sure how to do it.
> It doesn't seem like I can use TypeHint, as I need to know the actual generic
> class for it, right?
> Do I need a TaggedEventTypeFactory? If so, how do I create the
> TaggedEventTypeInfo for it? Do you have an example? I was trying to
> follow this[1] but it doesn't seem to really work. I'm getting null as my
> genericParameter for some reason. Also, how would you create the serializer
> for the type info? Can I reuse some built-in Kryo functionality?
>
> Thanks
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer
>
>
>
>
>
> On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
>> Thanks Fabian,
>>
>> I'm looking into a way to enrich it without having to know the internal
>> fields of the original event type.
>> Right now what I managed to do is to map Car into a TaggedEvent
>> prior to the SQL query, with tags being empty, and then run the SQL query
>> selecting "origin, enrich(..) as tags".
>> Not sure there's a better way, but I guess that works.
>>
>>
>>
>> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> you can use the value construction function ROW to create a nested row
>>> (or object).
>>> However, you have to explicitly reference all attributes that you will
>>> add.
>>>
>>> If you have a table Cars with (year, modelName) a query could look like
>>> this:
>>>
>>> SELECT
>>>   ROW(year, modelName) AS car,
>>>   enrich(year, modelName) AS tags
>>> FROM Cars;
>>>
>>> Handling many attributes is always a bit painful in SQL.
>>> There is an effort to make the Table API easier to use for these use
>>> cases (for example Column Operations [1]).
>>>
>>> Best, Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>>
>>>
>>>
>>> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 <
>>> shahar.kobrin...@gmail.com>:
>>>
 Just to be more clear on my goal -
 Im trying to enrich the incoming stream with some meaningful tags based
 on
 conditions from the event itself.
 So the input stream could be an event looks like:
 Class Car {
   int year;
   String modelName;
 }

 i will have a config that are defining tags as:
 "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"

 So ideally my output will be in the structure of

 Class TaggedEvent {
Car origin;
String[] tags;
 }






 --
 Sent from:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

>>>


Re: Rich and incrementally aggregating window functions

2019-05-08 Thread Hequn Cheng
Hi,

There was a discussion about this before; you can take a look at it[1].

Best, Hequn

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-implementation-of-aggregate-function-using-a-ProcessFunction-td23473.html#a23531
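
For reference, a minimal sketch of the pattern discussed in that thread: keep the
incremental aggregation in a plain AggregateFunction and access the global state
from the ProcessWindowFunction's context (all names here are made up):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowWithGlobalState {

    // Incremental part: a plain (non-rich) AggregateFunction that counts per window.
    public static class CountAggregate implements AggregateFunction<String, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(String value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    // Window part: receives only the pre-aggregated count and reads/updates global state.
    public static class CountWithRunningTotal
            extends ProcessWindowFunction<Long, String, String, TimeWindow> {

        @Override
        public void process(String key, Context ctx, Iterable<Long> counts, Collector<String> out)
                throws Exception {
            long windowCount = counts.iterator().next(); // single pre-aggregated value

            // Cross-window keyed state, accessed via the context instead of a RichFunction.
            ValueState<Long> totalState = ctx.globalState().getState(
                    new ValueStateDescriptor<>("total", Long.class));
            Long total = totalState.value();
            total = (total == null ? 0L : total) + windowCount;
            totalState.update(total);

            out.collect(key + ": window=" + windowCount + ", total=" + total);
        }
    }

    // Usage: stream.keyBy(...).timeWindow(Time.minutes(1))
    //              .aggregate(new CountAggregate(), new CountWithRunningTotal());
}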

On Thu, May 9, 2019 at 5:14 AM an0  wrote:

> I want to use ProcessWindowFunction.Context#globalState in my window
> function. But I don't want to apply ProcessWindowFunction directly to my
> WindowedStream because I don't want to buffer all the elements of each
> window. Currently I'm using WindowedStream#aggregate(AggregateFunction,
> ProcessWindowFunction).
>
> My understanding is that RichFunction.runtimeContext also give access to
> those global states. That thought naturally pointed me to
> RichAggregateFunction, RichReduceFunction and RichFoldFunction. However,
> they all cause runtime error like this:
> "AggregateFunction can not be a RichFunction. Please use
> fold(AggregateFunction, WindowFunction) instead."
>
> So how can I use an incrementally aggregating window function and have
> access to global states at the same time?
>


Re: 回复:Is it possible to handle late data when using table API?

2019-04-16 Thread Hequn Cheng
Hi Lasse,

> some devices can deliver data days back in time and I would like to have
the results as fast as possible.

What JingsongLee said is correct.

However, it's possible to handle your problem with the Table API according to
your description above. You can use the non-window (or unbounded)
aggregate[1].
The non-window aggregate supports early firing, i.e., it outputs results
immediately once there is an update, so you can "have the results as fast
as possible". The query looks like:

Table res30MinWindows = machineInsights
    .select("UserActionTime / (30 * 60) as windowId, machineId, machineInsightId, value")
    .groupBy("windowId, machineId, machineInsightId")
    .select("machineId, machineInsightId, windowId * 1800 as wStart, windowId * 1800 + 1800 as wEnd, value.max as max")

The only thing you have to notice is that, as the non-window aggregate keeps all
(result) data in its state, the required state to compute the query result might
grow infinitely depending on the type of aggregation and the number of distinct
grouping keys. To solve this problem, you can provide a query configuration
with a valid retention interval to prevent excessive state size[2].
In your case, I think the valid retention interval would be the max delay
interval of your data.
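
A rough sketch of setting such a retention interval (assuming Flink 1.6/1.7 and
continuing from the query above; the 7/8-day values are made up):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.types.Row;

// 'tableEnv' and 'res30MinWindows' refer to the objects from the snippet above.
StreamQueryConfig qConfig = tableEnv.queryConfig();

// State of a key that has not been updated for longer than the max interval
// may be cleaned up; pick something slightly above the maximum expected delay.
qConfig.withIdleStateRetentionTime(Time.days(7), Time.days(8));

// The config is passed when the Table is converted back into a stream.
tableEnv.toRetractStream(res30MinWindows, Row.class, qConfig).print();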

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/tableApi.html#aggregations
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html


On Tue, Apr 16, 2019 at 5:38 PM Lasse Nedergaard 
wrote:

> Hi
>
> Thanks for the fast reply. Unfortunately it not an option as some devices
> can deliver data days back in time and I would like to have the results as
> fast as possible.
> I have to convert my implementation to use streaming API instead.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 16. apr. 2019 kl. 11.08 skrev JingsongLee :
>
> Hi @Lasse Nedergaard, Table API don't have allowedLateness api.
> But you can set rowtime.watermarks.delay of source to slow down the
> watermark clock.
>
> --
> 发件人:Lasse Nedergaard 
> 发送时间:2019年4月16日(星期二) 16:20
> 收件人:user 
> 主 题:Is it possible to handle late data when using table API?
>
> Hi.
>
> I have a simple tumble window working on eventtime like this.
>
> Table res30MinWindows = machineInsights
> .window(Tumble.over("30.minutes").on("UserActionTime").as("w")) // 
> define window
> .groupBy("machineId, machineInsightId, w") // group by key and window
> .select("machineId, machineInsightId, w.start, w.end, w.rowtime, 
> value.max as max"); // access window properties and aggregate
>
> As we work with Iot units we don't have 100% control over the eventtime 
> reported and therefore need to handle late data to ensure that we don't do 
> our calculation wrong.
>
> I would like to know if there is any option in the Table API to get access to 
> late data, or my only option is to use Streaming API?
>
> Thanks in advance
>
> Lasse Nedergaard
>
>
>
>


Re: Identify orphan records after joining two streams

2019-04-15 Thread Hequn Cheng
Hi Averell,

> I feel that it's over-complicated
I think a Table API or SQL[1] job can also achieve what you want. It is probably
simpler and takes less code.
The workflow looks like this (see the sketch after this list):
1. Union all the two source tables. You may need to unify the schema of the two
tables, as union all can only be used to union tables with the same schema.
2. Perform a window group by, i.e., group by tumbling window + key.
3. Write a user-defined aggregate function[2] which is used to merge the
data.
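
A rough sketch of step 3, as a Table API aggregate function that either merges the
pair or flags an orphan (schema and names are made up; both inputs are assumed to
have the columns key, rowtime, source, payload):

import org.apache.flink.table.functions.AggregateFunction;

public class MergeOrFlag extends AggregateFunction<String, MergeOrFlag.PairAcc> {

    // Accumulator: holds at most one record from each side of the union.
    public static class PairAcc {
        public String fromA;
        public String fromB;
    }

    @Override
    public PairAcc createAccumulator() {
        return new PairAcc();
    }

    // Called once per record of the window/key group.
    public void accumulate(PairAcc acc, String source, String payload) {
        if ("A".equals(source)) {
            acc.fromA = payload;
        } else {
            acc.fromB = payload;
        }
    }

    @Override
    public String getValue(PairAcc acc) {
        if (acc.fromA != null && acc.fromB != null) {
            return "MERGED:" + acc.fromA + "|" + acc.fromB;
        }
        // Only one side arrived in this window: mark it so it can be logged/saved.
        return "ORPHAN:" + (acc.fromA != null ? acc.fromA : acc.fromB);
    }
}

// Usage:
// tableEnv.registerFunction("mergeOrFlag", new MergeOrFlag());
// Table result = tableA.unionAll(tableB)
//     .window(Tumble.over("10.minutes").on("rowtime").as("w"))
//     .groupBy("w, key")
//     .select("key, w.start, mergeOrFlag(source, payload) as merged");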

> my cached data would build up and make my cluster out-of-memory.
You can use the `RocksDBStateBackend`[3]. The amount of state that you can
keep is only limited by the amount of disk space available.

> Would back-pressure kicks in for this case?
It seems there is no direct way to align different sources now.
However, the community has already discussed this and is trying to solve it[4].

Best, Hequn

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#aggregation-functions
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
[4]
https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit#heading=h.lfi2u4fv9cxa

On Mon, Apr 15, 2019 at 8:08 PM Averell  wrote:

> Hello,
>
> I have two data streams, and want to join them using a tumbling window.
> Each
> of the streams would have at most one record per window. There is also a
> requirement to log/save the records that don't have a companion from the
> other stream.
> What would be the best option for my case? Would that be possible to use
> Flink's Join?
>
> I tried to use CoProcessFunction: truncating the timestamp of each record
> to
> the beginning of the tumbling window, and then "keyBy" the two streams
> using
> (key, truncated-timestamp). When I receive a record from one stream, if
> that's the first record of the pair, then I save it to a MapState. If it is
> the 2nd record, then I merge with the 1st one then fire.
> This implementation works, but
> (a) I feel that it's over-complicated, and
> (b) I have a concern that when one stream is slower than the other, my
> cached data would build up and make my cluster run out of memory. Would
> back-pressure kick in for this case?
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Join of DataStream and DataSet

2019-04-14 Thread Hequn Cheng
Hi Reminia,

Currently, we can't join a DataStream with a DataSet in Flink. However, a
DataSet is actually a kind of bounded stream. From this point of view,
you can use a streaming job to achieve your goal. The Flink Table API & SQL
support different kinds of joins[1]. You can take a closer look at them.
Probably a regular join[2] is ok for you.

Finally, I think you raised a very good point. It would be better if Flink
could support this kind of join more directly and efficiently.

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#regular-joins

On Thu, Apr 11, 2019 at 5:16 PM Reminia Scarlet 
wrote:

> Spark Streaming supports a direct join between a stream DataFrame and a batch
> DataFrame, and it's
> easy to implement an enrichment pipeline that joins a stream and a dimension
> table.
>
> I checked the docs of Flink; it seems that this feature is a JIRA ticket
> which hasn't been resolved yet.
>
> So how can I implement such a pipeline easily in Flink?
>
>
>


Re: [ANNOUNCE] Apache Flink 1.8.0 released

2019-04-10 Thread Hequn Cheng
Thanks a lot for the great release Aljoscha!
Also thanks for the work by the whole community. :-)

Best, Hequn

On Wed, Apr 10, 2019 at 6:03 PM Fabian Hueske  wrote:

> Congrats to everyone!
>
> Thanks Aljoscha and all contributors.
>
> Cheers, Fabian
>
> Am Mi., 10. Apr. 2019 um 11:54 Uhr schrieb Congxian Qiu <
> qcx978132...@gmail.com>:
>
>> Cool!
>>
>> Thanks Aljoscha a lot for being our release manager, and all the others
>> who make this release possible.
>>
>> Best, Congxian
>> On Apr 10, 2019, 17:47 +0800, Jark Wu , wrote:
>> > Cheers!
>> >
>> > Thanks Aljoscha and all others who make 1.8.0 possible.
>> >
>> > On Wed, 10 Apr 2019 at 17:33, vino yang  wrote:
>> >
>> > > Great news!
>> > >
>> > > Thanks Aljoscha for being the release manager and thanks to all the
>> > > contributors!
>> > >
>> > > Best,
>> > > Vino
>> > >
>> > > Driesprong, Fokko  于2019年4月10日周三 下午4:54写道:
>> > >
>> > > > Great news! Great effort by the community to make this happen.
>> Thanks all!
>> > > >
>> > > > Cheers, Fokko
>> > > >
>> > > > Op wo 10 apr. 2019 om 10:50 schreef Shaoxuan Wang <
>> wshaox...@gmail.com>:
>> > > >
>> > > > > Thanks Aljoscha and all others who made contributions to FLINK
>> 1.8.0.
>> > > > > Looking forward to FLINK 1.9.0.
>> > > > >
>> > > > > Regards,
>> > > > > Shaoxuan
>> > > > >
>> > > > > On Wed, Apr 10, 2019 at 4:31 PM Aljoscha Krettek <
>> aljos...@apache.org>
>> > > > > wrote:
>> > > > >
>> > > > > > The Apache Flink community is very happy to announce the
>> release of
>> > > > > Apache
>> > > > > > Flink 1.8.0, which is the next major release.
>> > > > > >
>> > > > > > Apache Flink® is an open-source stream processing framework for
>> > > > > > distributed, high-performing, always-available, and accurate
>> data
>> > > > > streaming
>> > > > > > applications.
>> > > > > >
>> > > > > > The release is available for download at:
>> > > > > > https://flink.apache.org/downloads.html
>> > > > > >
>> > > > > > Please check out the release blog post for an overview of the
>> > > > > improvements
>> > > > > > for this bugfix release:
>> > > > > > https://flink.apache.org/news/2019/04/09/release-1.8.0.html
>> > > > > >
>> > > > > > The full release notes are available in Jira:
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344274
>> > > > > >
>> > > > > > We would like to thank all contributors of the Apache Flink
>> community
>> > > > who
>> > > > > > made this release possible!
>> > > > > >
>> > > > > > Regards,
>> > > > > > Aljoscha
>> > > > >
>> > > >
>> > >
>>
>


Re: Use different functions for different signal values

2019-04-02 Thread Hequn Cheng
Hi Marke,

Ken is right. We can use split and select to achieve what you want.

Besides, I am thinking: if there are a lot of ingested signals with unique
IDs, why not use one function and handle the different logic inside that function?
For example, we can call different methods in the function according to the
value of the input "id field". This can make the code simpler and more
generic, IMO.
Is it because the return types of these functions are different?
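
A small sketch of that idea, with the signal modeled as an (id, value) pair and the
per-signal logic stubbed out (ids and logic are made up):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class SignalDispatcher extends ProcessFunction<Tuple2<String, Double>, String> {

    @Override
    public void processElement(Tuple2<String, Double> signal, Context ctx, Collector<String> out) {
        // Dispatch on the id field instead of splitting the stream.
        switch (signal.f0) {
            case "Signal1":
            case "Signal2":
                out.collect(function1(signal.f1));
                break;
            case "Signal3":
            case "Signal4":
                out.collect(function2(signal.f1));
                break;
            default:
                // unknown ids could be routed to a side output instead
                break;
        }
    }

    private String function1(double value) { return "f1:" + value; }

    private String function2(double value) { return "f2:" + (value * 2); }
}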

Best, Hequn

On Wed, Apr 3, 2019 at 7:17 AM Ken Krugler 
wrote:

> Hi Marke,
>
> You can use DataStream.split() to create a SplitStream, and then call
> SplitStream.select() to create the three different paths to the three
> functions.
>
> See
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#datastream-transformations
>
> — Ken
>
> On Apr 2, 2019, at 8:41 AM, Marke Builder  wrote:
>
> Hi,
>
> I want to implement the following behavior:
>
> 
> There are a lot of ingested signals with unique IDs, and I would like to use a
> dedicated function for each set of signals. E.g. Signal1, Signal2 ==> function1,
> Signal3, Signal4 ==> function2.
> What is the recommended way to implement this pattern?
>
> Thanks!
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


Re: How to join stream and dimension data in Flink?

2019-03-13 Thread Hequn Cheng
Hi Henry,

These are good questions!
I would rather not add the temporal and lateral prefix in front of the
join. The temporal table is a concept orthogonal to the join. We should say
"join a temporal table" or "join a lateral table".
1. You can of course use a stream-stream join. Introducing the temporal table
not only makes our query simpler but also improves performance. More
details can be found in [1].
2. Both joins are based on the concept of a temporal table, i.e., a table
joins a temporal table.
3. Yes, actually the join in Flink uses a lateral
table & TemporalTableFunction to implement a temporal table. A temporal table
is a versioned table, and a lateral table is a table that keeps references to
another table. If you do not want to use time versioning, you don't need the
temporal table.
4. It is a kind of join. The join keyword can be omitted if it is an inner
join. The grammar will not be changed in the near future; I haven't heard
any news about changing it.
5. Yes, it will be optimized.
6. If you want to left join a temporal table, you can write SQL like:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  LEFT JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency
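
And, for points 3 and 4, registering the temporal table function that backs the
LATERAL TABLE syntax looks roughly like this (table and column names follow the
docs in [1]):

// org.apache.flink.table.api.Table, org.apache.flink.table.functions.TemporalTableFunction
// ratesHistory is a Table with the columns (currency, rate, proctime).
TemporalTableFunction rates =
    ratesHistory.createTemporalTableFunction("proctime", "currency");
tableEnv.registerFunction("Rates", rates);

Table result = tableEnv.sqlQuery(
    "SELECT o.amount * r.rate AS amount " +
    "FROM Orders AS o, LATERAL TABLE (Rates(o.proctime)) AS r " +
    "WHERE r.currency = o.currency");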

CC @Piotr Nowojski. It would be great to have your
opinions here.

Best,
Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table


On Wed, Mar 13, 2019 at 1:59 PM 徐涛  wrote:

> Hi Hequn,
> Thanks a lot for your answer! That is very helpful for me.
> I still have some questions about stream and dimension data join and
> temporal table join:
> 1. I found the temporal table join is still a one-stream-driven join. I do
> not know why the dimension data join has to be done by a one-stream-driven
> join; why can it not be done by a two-stream join (traditional stream-stream
> join)?
> I'll try to give an answer myself: the two-stream join is based on the
> mechanism of materializing the data of both streams in state, but due to
> state retention, the dimension data may be lost. I guess this is one
> reason, am I correct?
> 2. Is Blink's stream and dimension data join based on the temporal table join?
> 3. I think a lateral table join can also do a dimension join if I do
> not want to use time versioning. How should I choose between the temporal table
> join and the lateral table join?
> 4. I found that the temporal table join in Flink uses a “LATERAL TABLE”
> grammar rather than “JOIN”; it is OK but not easier to use than “JOIN”. Will
> the community modify the grammar in future releases?
> 5. In the following temporal table join statement, will joining the Orders table
> with Rates produce too much data before the where clause takes effect? Will
> it be optimized?
>
> SELECT
>   o.amount * r.rate AS amount
> FROM
>   Orders AS o,
>   LATERAL TABLE (Rates(o.rowtime)) AS r
> WHERE r.currency = o.currency
>
> 6. How to use temporal table join to do left join?
>
>
> Best
> Henry
>
> 在 2019年3月13日,上午12:02,Hequn Cheng  写道:
>
> Hi Henry,
>
> Yes, you are correct. Basically, there are two ways you can use to join a
> Temporal Table. One is provided in Flink and the other is provided in Blink
> which has been pushed as a branch[1] in Flink repo.
>
> - Join a Temporal Table in Flink[2][3][4]
> As the document said: it is a join with a temporal table joins an
> append-only table (left input/probe side) with a temporal table (right
> input/build side), i.e., a table that changes over time and tracks its
> changes. You need to define a temporal table function and it will be used
> to provide access to the state of a temporal table at a specific point in
> time. *Both rowtime and proctime are supported.*
> - Join a Temporal Table in Blink[5]
> Different from the join in Flink, it can join an *append/upsert/retract*
> stream (left input/probe side) with a temporal table (right input/build
> side), i.e., a *remote dimension table* that changes over time. In order to
> access data in a temporal table, you need to define a TableSource with
> LookupableTableSource[6](Probably you can download the code of blink and
> take a look at the `HBase143TableSource` which is an implementation of
> LookupableTableSource). Currently, only proctime is supported.
>
> I think you can choose one according to your scenarios.
> There are some useful examples in the document I list below. They may be
> very helpful for you. Feel free to ask if you have any other questions.
>
> Best,
> Hequn
>
> [1] https://github.com/apache/flink/tree/blink
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.

Re: How to join stream and dimension data in Flink?

2019-03-12 Thread Hequn Cheng
Hi Henry,

Yes, you are correct. Basically, there are two ways you can use to join a
Temporal Table. One is provided in Flink and the other is provided in Blink
which has been pushed as a branch[1] in Flink repo.

- Join a Temporal Table in Flink[2][3][4]
As the document said: it is a join with a temporal table joins an
append-only table (left input/probe side) with a temporal table (right
input/build side), i.e., a table that changes over time and tracks its
changes. You need to define a temporal table function and it will be used
to provide access to the state of a temporal table at a specific point in
time. *Both rowtime and proctime are supported.*
- Join a Temporal Table in Blink[5]
Different from the join in Flink, it can join an *append/upsert/retract*
stream (left input/probe side) with a temporal table (right input/build
side), i.e., a *remote dimension table* that changes over time. In order to
access data in a temporal table, you need to define a TableSource with
LookupableTableSource[6](Probably you can download the code of blink and
take a look at the `HBase143TableSource` which is an implementation of
LookupableTableSource). Currently, only proctime is supported.

I think you can choose one according to your scenarios.
There are some useful examples in the document I list below. They may be
very helpful for you. Feel free to ask if you have any other questions.

Best,
Hequn

[1] https://github.com/apache/flink/tree/blink
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

[3]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
[4]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
[5]
https://flink-china.org/doc/blink/dev/table/streaming/joins.html#join-with-a-temporal-table
[6]
https://flink-china.org/doc/blink/dev/table/sourcesinks#defining-a-tablesource-with-lookupable

On Tue, Mar 12, 2019 at 2:13 PM 徐涛  wrote:

> Hi Hequn,
> I want to implement a stream-dimension join in Flink SQL. I found there is a
> new feature named Temporal Tables delivered in Flink 1.7; I think it could
> maybe be used to achieve the join between a stream and a dimension table, but I
> am not sure about that. Could anyone help me with it?
> Thanks a lot for your help.
>
> Best
> Henry
>
> 在 2018年9月26日,上午12:16,Hequn Cheng  写道:
>
> Hi vino,
>
> Thanks for sharing the link. It's a great book and I will take a look.
> There are kinds of join. Different joins have different semantics. From
> the link, I think it means the time versioned join.  FLINK-9712
> <https://issues.apache.org/jira/browse/FLINK-9712> enrichments joins with
> Time Versioned Functions and the result is deterministic under eventime.
>
> Best, Hequn
>
> On Tue, Sep 25, 2018 at 11:05 PM vino yang  wrote:
>
>> Hi Hequn,
>>
>> The specific content of the book does not give a right or wrong
>> conclusion, but it illustrates this phenomenon: two streams of the same
>> input, playing and joining at the same time, due to the order of events,
>> the connection results are uncertain. This is because the two streams are
>> intertwined in different forms. This has nothing to do with orderby, just
>> that it exists in the stream stream join. Of course, this phenomenon is
>> only a comparison statement with a non-stream join.
>>
>> In addition, I recommend this book, which is very famous on Twitter and
>> Amazon. Because you are also Chinese, there is a good translation here. If
>> I guess it is correct, the main translator is also from your company. This
>> part of what I mentioned is here.[1]
>>
>> [1]:
>> https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>>
>> Thanks, vino.
>>
>> Hequn Cheng  于2018年9月25日周二 下午9:45写道:
>>
>>> Hi vino,
>>>
>>> There are no order problems of stream-stream join in Flink. No matter
>>> what order the elements come, stream-stream join in Flink will output
>>> results which consistent with standard SQL semantics. I haven't read the
>>> book you mentioned. For join, it doesn't guarantee output orders. You have
>>> to do orderBy if you want to get ordered results.
>>>
>>> Best, Hequn
>>>
>>> On Tue, Sep 25, 2018 at 8:36 PM vino yang  wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> I may not have stated it here, and there is no semantic problem at the
>>>> Flink implementation level. Rather, there may be “Time-dependence” here. 
>>>> [1]
>>>>
>>>> Yes, my initial answer was not to use this form of join in this
>>>> scena

Re: Flink window triggering and timing on connected streams

2019-02-25 Thread Hequn Cheng
Hi Andrew,

>  I have an “end session” event that I want to cause the window to fire
and purge.
Do you want to fire the window only on the 'end session' event? I see one
option to solve the problem: you can use a tumbling window (say 5s) and set
your timestamp to t'+5s each time you receive an 'end session' event in your
user-defined `AssignerWithPeriodicWatermarks`.

> My understanding is that this is what the trailing watermark is for, and
that in connected streams, the lowest (earliest) watermark of the input
streams is what is seen as the watermark downstream.
Yes, and we can make use of this to make the window fire only on the 'end session'
event using the solution above.

Best, Hequn


On Tue, Feb 26, 2019 at 5:54 AM Andrew Roberts  wrote:

> Hello,
>
> I’m trying to implement session windows over a set of connected streams
> (event time), with some custom triggering behavior. Essentially, I allow
> very long session gaps, but I have an “end session” event that I want to
> cause the window to fire and purge. I’m assigning timestamps and watermarks
> using BoundedOutOfOrdernessTimestampExtractor, with a 1 minute delay for
> the watermark. I have things mostly wired up, but I have some confusion
> about how I can ensure that my streams stay “in sync” relative to time.
>
>  Let’s say I am connecting streams A and B. Stream A is where the “end
> session” event always comes from. If I have a session involving events from
> time t to t’ in stream A, and then at t’ I get an “end session”, I want to
> ensure that the window doesn’t fire until stream B has also processed
> events (added events to the window) up to time t’. My understanding is that
> this is what the trailing watermark is for, and that in connected streams,
> the lowest (earliest) watermark of the input streams is what is seen as the
> watermark downstream.
>
> Currently, I’m setting a timer for the current time + 1 when I see my “end
> event”, with the idea that that timer will fire when the WATERMARK passes
> that time, i.e., all streams have progressed at least as far as that end
> event. However, the implementation of EventTimeTrigger doesn’t really look
> like that’s what’s going on.
>
> Can anyone clear up how these concepts interact? Is there a good model for
> this “session end event” concept that I can take a look at?
>
> Thanks,
>
> Andrew


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-21 Thread Hequn Cheng
Hi Stephan,

Thanks for summarizing the great roadmap! It is very helpful for users and
developers to track the direction of Flink.
+1 for putting the roadmap on the website and update it per release.

Besides, it would be great if the roadmap could add the UpsertSource
feature (maybe put it under 'Batch Streaming Unification').
It has been discussed a long time ago[1,2] and is moving forward step by
step.
Currently, Flink can only emit upsert results. With the UpsertSource, we
can make our system a more complete one.

Best, Hequn

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-TABLE-How-to-handle-empty-delete-for-UpsertSource-td23856.html#a23874
[2] https://issues.apache.org/jira/browse/FLINK-8545




On Fri, Feb 22, 2019 at 3:31 AM Rong Rong  wrote:

> Hi Stephan,
>
> Yes. I completely agree. Jincheng & Jark gave some very valuable feedbacks
> and suggestions and I think we can definitely move the conversation forward
> to reach a more concrete doc first before we put in to the roadmap. Thanks
> for reviewing it and driving the roadmap effort!
>
> --
> Rong
>
> On Thu, Feb 21, 2019 at 8:50 AM Stephan Ewen  wrote:
>
>> Hi Rong Rong!
>>
>> I would add the security / kerberos threads to the roadmap. They seem to
>> be advanced enough in the discussions so that there is clarity what will
>> come.
>>
>> For the window operator with slicing, I would personally like to see the
>> discussion advance and have some more clarity and consensus on the feature
>> before adding it to the roadmap. Not having that in the first version of
>> the roadmap does not mean there will be no activity. And when the
>> discussion advances well in the next weeks, we can update the roadmap soon.
>>
>> What do you think?
>>
>> Best,
>> Stephan
>>
>>
>> On Thu, Feb 14, 2019 at 5:46 PM Rong Rong  wrote:
>>
>>> Hi Stephan,
>>>
>>> Thanks for the clarification, yes I think these issues has already been
>>> discussed in previous mailing list threads [1,2,3].
>>>
>>> I also agree that updating the "official" roadmap every release is a
>>> very good idea to avoid frequent update.
>>> One question I might've been a bit confusion is: are we suggesting to
>>> keep one roadmap on the documentation site (e.g. [4]) per release, or
>>> simply just one most up-to-date roadmap in the main website [5] ?
>>> Just like the release notes in every release, the former will probably
>>> provide a good tracker for users to look back at previous roadmaps as well
>>> I am assuming.
>>>
>>> Thanks,
>>> Rong
>>>
>>> [1]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
>>> [3]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>>>
>>> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/
>>> [5] https://flink.apache.org/
>>>
>>> On Thu, Feb 14, 2019 at 2:26 AM Stephan Ewen  wrote:
>>>
 I think the website is better as well.

 I agree with Fabian that the wiki is not so visible, and visibility is
 the main motivation.
 This type of roadmap overview would not be updated by everyone -
 letting committers update the roadmap means the listed threads are actually
 happening at the moment.


 On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske 
 wrote:

> Hi,
>
> I like the idea of putting the roadmap on the website because it is
> much more visible (and IMO more credible, obligatory) there.
> However, I share the concerns about frequent updates.
>
> It think it would be great to update the "official" roadmap on the
> website once per release (-bugfix releases), i.e., every three month.
> We can use the wiki to collect and draft the roadmap for the next
> update.
>
> Best, Fabian
>
>
> Am Do., 14. Feb. 2019 um 11:03 Uhr schrieb Jeff Zhang <
> zjf...@gmail.com>:
>
>> Hi Stephan,
>>
>> Thanks for this proposal. It is a good idea to track the roadmap. One
>> suggestion is that it might be better to put it into wiki page first.
>> Because it is easier to update the roadmap on wiki compared to on flink 
>> web
>> site. And I guess we may need to update the roadmap very often at the
>> beginning as there's so many discussions and proposals in community
>> recently. We can move it into flink web site later when we feel it could 
>> be
>> nailed down.
>>
>> Stephan Ewen  于2019年2月14日周四 下午5:44写道:
>>
>>> Thanks Jincheng and Rong Rong!
>>>
>>> I am not deciding a roadmap and making a call on what features
>>> should be developed or not. I was only collecting broader issues that 
>>> are
>>> already happening or have an active FLIP/design dis

Re: [ANNOUNCE] Apache Flink 1.7.2 released

2019-02-17 Thread Hequn Cheng
Thanks a lot for the great release @Gordon.
Also thanks for the work by the whole community. :-)

Best, Hequn


On Mon, Feb 18, 2019 at 2:12 PM jincheng sun 
wrote:

> Thanks a lot for being our release manager Gordon ,
> Great job!
>  And also a big thanks to the community for making this release possible.
>
> Cheers,
> Jincheng
>
>
> Tzu-Li (Gordon) Tai  于2019年2月18日周一 上午10:29写道:
>
>> Hi,
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.7.2, which is the second bugfix release for the Apache
>> Flink 1.7 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data
>> streaming applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>> 
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2019/02/15/release-1.7.2.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344632
>>
>> We would like to thank all contributors of the Apache Flink community
>> who made this release possible!
>>
>> Regards,
>> Gordon
>>
>


Re: [ANNOUNCE] New Flink PMC member Thomas Weise

2019-02-12 Thread Hequn Cheng
Congrats Thomas!

Best, Hequn


On Tue, Feb 12, 2019 at 6:53 PM Stefan Richter 
wrote:

> Congrats Thomas!,
>
> Best,
> Stefan
>
> Am 12.02.2019 um 11:20 schrieb Stephen Connolly <
> stephen.alan.conno...@gmail.com>:
>
> Congratulations to Thomas. I see that this is not his first time in the
> PMC rodeo... also somebody needs to update LDAP as he's not on
> https://people.apache.org/phonebook.html?pmc=flink yet!
>
> -stephenc
>
> On Tue, 12 Feb 2019 at 09:59, Fabian Hueske  wrote:
>
>> Hi everyone,
>>
>> On behalf of the Flink PMC I am happy to announce Thomas Weise as a new
>> member of the Apache Flink PMC.
>>
>> Thomas is a long time contributor and member of our community.
>> He is starting and participating in lots of discussions on our mailing
>> lists, working on topics that are of joint interest of Flink and Beam, and
>> giving talks on Flink at many events.
>>
>> Please join me in welcoming and congratulating Thomas!
>>
>> Best,
>> Fabian
>>
>
>


Re: How to create schema for Flink table

2019-01-26 Thread Hequn Cheng
Hi Soheil,

DataSet can be converted to or from a Table. More details here:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table
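
A minimal sketch of the round trip (assuming Flink 1.7.x; the data and field names
are made up):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class DataSetToTable {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Apply any transformations to the DataSet first ...
        DataSet<Tuple2<String, Integer>> data = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("b", 2));

        // ... then give the schema (field names) when converting it into a Table.
        Table table = tableEnv.fromDataSet(data, "name, cnt");

        // The Table can also be converted back into a DataSet.
        DataSet<Row> rows = tableEnv.toDataSet(table, Row.class);
        rows.print();
    }
}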
Let me know if you have any questions.

Best, Hequn

On Sun, Jan 27, 2019 at 5:37 AM Soheil Pourbafrani 
wrote:

> I want to do some transformation on raw data and then create a table from
> that. Is it possible to create a schema for the table (in advance) and then
> using that transformed dataset and schema create the table?
>


Re: Query on retract stream

2019-01-26 Thread Hequn Cheng
Hi Gagan,

Besides the event-time and proctime difference, there is another difference
between the two ways. The window aggregates on bounded data, while the unbounded
aggregate works on unbounded data, i.e., newly arriving data can update very
old data.

As for the performance, I think the two ways may have no big difference in the
current Flink version. Maybe you can run some tests between them on your
own scenarios if both of them can solve your problem. FYI: there is a nice
discussion[1] raised by Timo recently. Once Blink is merged into Flink, the
unbounded aggregate will be much faster than the window.

Best,
Hequn

[1] https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-32


On Sat, Jan 26, 2019 at 4:11 PM Gagan Agrawal 
wrote:

> Thanks Hequn for suggested solutions and I think this should really work
> and will give it a try. As I understand First solution  of using multiple
> windows will be good for those scenarios where I want output to be
> generated post window is materialized (i.e. watermark reaches end of
> window). And second will be good if I want it to be fired on per event
> basis (i.e no watermarking). Apart from this, do you see any difference
> from performance perspective in choosing between the two or both should be
> equally performant?
>
> Gagan
>
> On Sat, Jan 26, 2019 at 11:50 AM Hequn Cheng  wrote:
>
>> Hi Gagan,
>>
>> Time attribute fields will be materialized by the unbounded groupby.
>> Also, currently, the window doesn't have the ability to handle retraction
>> messages. I see two ways to solve the problem.
>>
>> - Use multi-window.  The first window performs lastValue, the second
>> performs count.
>> - Use two non-window aggregates. In this case, you don't have to change
>> anything for the first aggregate. For the second one, you can group by an
>> hour field and perform count(). The code looks like:
>>
>> SELECT userId,
>>  count(orderId)
>> FROM
>> (SELECT orderId,
>>  getHour(orderTime) as myHour,
>>  lastValue(userId) AS userId,
>>  lastValue(status) AS status
>> FROM orders
>> GROUP BY  orderId, orderTime)
>> WHERE status='PENDING'
>> GROUP BY myHour, userId
>>
>> Best,
>> Hequn
>>
>>
>>
>>
>> On Sat, Jan 26, 2019 at 12:29 PM Gagan Agrawal 
>> wrote:
>>
>>> Based on the suggestions in this mail thread, I tried out few
>>> experiments on upsert stream with flink 1.7.1 and here is the issue I am
>>> facing with window stream.
>>>
>>> *1. Global Pending order count. *
>>> Following query works fine and it's able to handle updates as per
>>> original requirement.
>>>
>>> select userId, count(orderId) from
>>> (select orderId, lastValue(userId) as userId, lastValue(status) as
>>> status from orders group by orderId)
>>> where status='PENDING' group by userId
>>>
>>> *2. Last 1 Hour tumbling window count (Append stream)*
>>> Though following query doesn't handle upsert stream, I just tried to
>>> make sure time column is working fine. This is working, but as expected, it
>>> doesn't handle updates on orderId.
>>>
>>> select userId, count(orderId) from orders
>>> where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR),
>>> userId
>>>
>>> 3. *Last 1 Hour tumbling window count (With upsert stream)*
>>> Now I tried combination of above two where input stream is converted to
>>> upsert stream (via lastValue aggregate function) and then Pending count
>>> needs to be calculated in last 1 hour window.
>>>
>>> select userId, count(orderId) from
>>> (select orderId, orderTime, lastValue(userId) as userId,
>>> lastValue(status) as status from orders group by orderId, orderTime)
>>> where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR),
>>> userId
>>>
>>> This one gives me following error. Is this because I have added
>>> orderTime in group by/select clause and hence it's time characteristics
>>> have changed? What is the workaround here as without adding orderTime, I
>>> can not perform window aggregation on upsert stream.
>>>
>>> [error] Exception in thread "main"
>>> org.apache.flink.table.api.ValidationException:* Window can only be
>>> defined over a time attribute column.*
>>> [error] at
>>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getO

Re: Print table contents

2019-01-25 Thread Hequn Cheng
Hi Soheil,

There is no print() or show() method on Table. As a workaround, you can
convert[1] the Table into a DataSet and call print() or collect() on the
DataSet.
You have to pay attention to the differences between DataSet.print() and
DataSet.collect().
DataSet.print() prints the elements of a DataSet to the standard
output stream (System.out) of the JVM that calls the print() method.
For programs that are executed on a cluster, this method needs to gather
the contents of the DataSet back to the client in order to print them there.
DataSet.collect() returns the elements of a DataSet as a List. As a DataSet
can contain a lot of data, this method should be used with caution.
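
For example, a short sketch continuing from the query in the quoted mail below
(where 'results' is the Table returned by sqlQuery):

// import org.apache.flink.api.java.DataSet;
// import org.apache.flink.types.Row;
DataSet<Row> resultSet = tableEnv.toDataSet(results, Row.class);

// Prints on the client (the data is gathered back from the cluster first).
resultSet.print();

// Alternatively, materialize the result as a local List.
// List<Row> rows = resultSet.collect();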

Best,
Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#convert-a-table-into-a-dataset


On Sat, Jan 26, 2019 at 3:24 AM Soheil Pourbafrani 
wrote:

> Hi,
>
> Using Flink Table object how can we print table contents, something like
> Spark show() method?
>
> for example in the following:
>
> tableEnv.registerDataSet("Orders", raw, "id, country, num, about");
> Table results = tableEnv.sqlQuery("SELECT id FROM Orders WHERE id > 10");
>
> How can I print the results variable contents?
>
> Thanks
>


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-25 Thread Hequn Cheng
Hi Chesnay,

Thanks a lot for the proposal! +1 for a leaner flink-dist and improving the
"Download" page.
I think a leaner flink-dist would be very helpful. If we bundle all jars
into a single one, this will easily cause class conflict problems.

Best,
Hequn


On Fri, Jan 25, 2019 at 2:48 PM jincheng sun 
wrote:

> Hi Chesnay,
>
> Thank you for the proposal. And i like it very much.
>
> +1 for the leaner distribution.
>
> About improve the "Download" page, I think we can add the connectors
> download link in the  "Optional components" section which @Timo Walther
>   mentioned above.
>
>
> Regards,
> Jincheng
>
> Chesnay Schepler  于2019年1月18日周五 下午5:59写道:
>
>> Hello,
>>
>> the binary distribution that we release by now contains quite a lot of
>> optional components, including various filesystems, metric reporters and
>> libraries. Most users will only use a fraction of these, and as such
>> pretty much only increase the size of flink-dist.
>>
>> With Flink growing more and more in scope I don't believe it to be
>> feasible to ship everything we have with every distribution, and instead
>> suggest more of a "pick-what-you-need" model, where flink-dist is rather
>> lean and additional components are downloaded separately and added by
>> the user.
>>
>> This would primarily affect the /opt directory, but could also be
>> extended to cover flink-dist. For example, the yarn and mesos code could
>> be spliced out into separate jars that could be added to lib manually.
>>
>> Let me know what you think.
>>
>> Regards,
>>
>> Chesnay
>>
>>


Re: Query on retract stream

2019-01-25 Thread Hequn Cheng
> Gagan
>
> On Tue, Jan 22, 2019 at 7:01 PM Gagan Agrawal 
> wrote:
>
>> Thanks Hequn for your response. I initially thought of trying out "over
>> window" clause, however as per documentation there seems to be limitation
>> in "orderBy" clause where it allows only single time event/processing time
>> attribute. Whereas in my case events are getting generated from mysql bin
>> log where I have seen multiple event updates getting generated with same
>> timestamp (may be because they are part of same transaction) and hence will
>> need bin log offset along with timestamp to be able to sort them correctly.
>> So looks like can't use "over window" until it allows multiple columns in
>> "orderBy". I am exploring option of creating my own window as you suggested
>> to be more flexible.
>>
>> Gagan
>>
>> On Tue, Jan 22, 2019 at 7:23 AM Hequn Cheng  wrote:
>>
>>> Hi Gagan,
>>>
>>> > But I also have a requirement for event time based sliding window
>>> aggregation
>>>
>>> Yes, you can achieve this with Flink TableAPI/SQL. However, currently,
>>> sliding windows don't support early fire, i.e., only output results when
>>> event time reaches the end of the window. Once window fires, the window
>>> state will be cleared and late data belonging to this window will be
>>> ignored. In order to wait for the late event, you can extract
>>> watermark with an offset from the timestamp. For example, make watermark =
>>> timestamp - 5min.
>>>
>>> If event time and early fire is a strong requirement in your scenarios,
>>> you can probably use an over window[1] to solve your problem, say an over
>>> window with 1h preceding. Over window outputs a result for each input.
>>>
>>> If the above solutions can't meet your requirements, you can write a
>>> DataStream job in which define your own window logic[2].
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/tableApi.html#over-windows
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html
>>>
>>>
>>>
>>> On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal 
>>> wrote:
>>>
>>>> Thank you guys. It's great to hear multiple solutions to achieve this.
>>>> I understand that records once emitted to Kafka can not be deleted and
>>>> that's acceptable for our use case as last updated value should always be
>>>> correct. However as I understand most of these solutions will work for
>>>> global aggregation which was asked in original question. But I also have
>>>> requirement for event time based sliding window aggregation where same
>>>> order count needs to be maintained for past x hours window (sliding at say
>>>> every 5 minutes). Is it possible to achieve with Table Api / SQL at the
>>>> moment or will require some custom implementation?
>>>>
>>>> For window based upsert stream, there can be few scenarios.
>>>>
>>>> 1. An update to record key comes in same window. E.g Pending (t1) ->
>>>> Success (t2) happens in same window w1. In this case once window
>>>> aggregation is triggered/emitted, such records will be counted as 0
>>>> 2. An update to record key belongs to same window but arrives late. In
>>>> this case old(and already emitted)  window (w1) needs to be re-emitted with
>>>> decreased value.
>>>> 3. An update to record key comes in different window. E.g Pending (t1)
>>>> in window w1 and Success (t2) in w2. I think in this case it may not
>>>> require to re-emit old window w1 as it represents pending count till that
>>>> window time (w1) which is still valid as record moved to Success in next
>>>> window w2 (based on event time).
>>>>
>>>> Gagan
>>>>
>>>>
>>>> On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski 
>>>> wrote:
>>>>
>>>>> @Jeff: It depends if user can define a time window for his condition.
>>>>> As Gagan described his problem it was about “global” threshold of pending
>>>>> orders.
>>>>>
>>>>>
>>>>>
>>>>> I have just thought about another solution that should work without
>>>>> any custom code. Converting “status” field to status_value int:
>&

Re: Is there a way to get all flink build-in SQL functions

2019-01-24 Thread Hequn Cheng
Hi yinhua,

As Chesnay suggested, the documentation is a good way: you can find a
description and examples for each built-in function.
If you only want to get a list of names, you can also take a look at the Flink
code (i.e., BasicOperatorTable.builtInSqlOperators).

Hope this helps.
Best, Hequn

On Thu, Jan 24, 2019 at 4:34 PM Chesnay Schepler  wrote:

> Beyond the documentation
> 
> I don't believe there to be a mechanism for listing all built-in functions.
>
> On 23.01.2019 04:30, yinhua.dai wrote:
>
> I would like to put this list into our self-service flink SQL web UI.
> Thanks.
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: Query on retract stream

2019-01-21 Thread Hequn Cheng
Hi Gagan,

> But I also have a requirement for event time based sliding window
aggregation

Yes, you can achieve this with the Flink Table API/SQL. However, currently,
sliding windows don't support early firing, i.e., they only output results when
the event time reaches the end of the window. Once a window fires, its state
will be cleared and late data belonging to that window will be ignored. In
order to wait for late events, you can extract the watermark with an offset
from the timestamp. For example, make watermark = timestamp - 5min.

If event time and early firing are strong requirements in your scenario, you
can probably use an over window[1] to solve your problem, say an over window
with 1h preceding. An over window outputs a result for each input row (see the
sketch after the links below).

If the above solutions can't meet your requirements, you can write a
DataStream job in which you define your own window logic[2].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/tableApi.html#over-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html
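
A minimal sketch of the over-window idea from [1]; the table and column names
(orders, userId, orderId, orderTime) are assumptions, orderTime is assumed to
be a rowtime attribute, and `tableEnv` a StreamTableEnvironment:

// For every incoming order event, emit the count of that user's orders within
// the preceding hour of event time. A result is emitted per input row, so
// there is no waiting for a window to close.
Table result = tableEnv.sqlQuery(
    "SELECT userId, orderId, " +
    "  COUNT(orderId) OVER (" +
    "    PARTITION BY userId " +
    "    ORDER BY orderTime " +
    "    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS orderCnt " +
    "FROM orders");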



On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal 
wrote:

> Thank you guys. It's great to hear multiple solutions to achieve this. I
> understand that records once emitted to Kafka can not be deleted and that's
> acceptable for our use case as last updated value should always be correct.
> However as I understand most of these solutions will work for global
> aggregation which was asked in original question. But I also have
> requirement for event time based sliding window aggregation where same
> order count needs to be maintained for past x hours window (sliding at say
> every 5 minutes). Is it possible to achieve with Table Api / SQL at the
> moment or will require some custom implementation?
>
> For window based upsert stream, there can be few scenarios.
>
> 1. An update to record key comes in same window. E.g Pending (t1) ->
> Success (t2) happens in same window w1. In this case once window
> aggregation is triggered/emitted, such records will be counted as 0
> 2. An update to record key belongs to same window but arrives late. In
> this case old(and already emitted)  window (w1) needs to be re-emitted with
> decreased value.
> 3. An update to record key comes in different window. E.g Pending (t1) in
> window w1 and Success (t2) in w2. I think in this case it may not require
> to re-emit old window w1 as it represents pending count till that window
> time (w1) which is still valid as record moved to Success in next window w2
> (based on event time).
>
> Gagan
>
>
> On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski 
> wrote:
>
>> @Jeff: It depends if user can define a time window for his condition.
>> As Gagan described his problem it was about “global” threshold of pending
>> orders.
>>
>>
>>
>> I have just thought about another solution that should work without any
>> custom code. Converting “status” field to status_value int:
>> - "+1” for pending
>> - “-1” for success/failure
>> - “0” otherwise
>>
>> Then running:
>>
>> SELECT uid, SUM(status_value) FROM … GROUP BY uid;
>>
>> Query on top of such stream. Conversion to integers could be made by
>> using `CASE` expression.
>>
>> One thing to note here is that probably all of the proposed solutions
>> would work based on the order of the records, not based on the event_time.
>>
>> Piotrek
>>
>> On 21 Jan 2019, at 15:10, Jeff Zhang  wrote:
>>
>> I am thinking of another approach instead of retract stream. Is it
>> possible to define a custom window to do this ? This window is defined for
>> each order. And then you just need to analyze the events in this window.
>>
>> Piotr Nowojski  于2019年1月21日周一 下午8:44写道:
>>
>>> Hi,
>>>
>>> There is a missing feature in Flink Table API/SQL of supporting
>>> retraction streams as the input (or conversions from append stream to
>>> retraction stream) at the moment. With that your problem would simplify to
>>> one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an
>>> ongoing work with related work [1], so this might be supported in the next
>>> couple of months.
>>>
>>> There might a workaround at the moment that could work. I think you
>>> would need to write your own custom `LAST_ROW(x)` aggregation function,
>>> which would just return the value of the most recent aggregated row. With
>>> that you could write a query like this:
>>>
>>> SELECT
>>> uid, count(*)
>>> FROM (
>>> SELECT
>>> *
>>> FROM (
>>> SELECT
>>> uid, LAST_ROW(status)
>>> FROM
>>> changelog
>>> GROUP BY
>>> uid, oid)
>>> WHERE status = `pending`)
>>> GROUP BY
>>> uid
>>>
>>> Where `changelog` is an append only stream with the following content:
>>>
>>> *user, order, status, event_time*
>>> u1, o1, pending, t1
>>> u2, o2, failed, t2
>>> *u1, o3, pending, t3*
>>> *u1, o3, success, t4*
>>> u2, o4, pending, t5
>>> u2, o4, pending, t6
>>>
>>>
>>>
>>> Besides that, you could also write your own a relatively simple Data
>>> Stream application to do the same thing.
>>>
>>> I’

Re: Query on retract stream

2019-01-21 Thread Hequn Cheng
Hi Gagan,

Yes, you can achieve this with Flink TableAPI/SQL. However, you have to pay
attention to the following things:
1) Currently, Flink only ingests append streams. In order to ingest upsert
streams (streams with keys), you can use groupBy with a user-defined
LAST_VALUE aggregate function (see the sketch below). For the implementation,
you can refer to the MAX AggregateFunction (MAX always returns the max value,
while LAST_VALUE always returns the latest value). The SQL may look like:

SELECT user, COUNT(*)
> FROM (
> SELECT order, LAST_VALUE(user), LAST_VALUE(status), LAST_VALUE(event_time)
> FROM SourceTable
> GROUP BY order
> )
> WHERE status = 'pending'
> GROUP BY user

You have to note that the query will be processed under processing time
instead of event time. But I think it would be fine for you, as the final
result will be correct.

As for the upsert source, there is already a pr[1] on it, and it is under
review now.
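
A minimal sketch of such a LAST_VALUE aggregate function; the class and
accumulator names are assumptions, and values are kept as Strings for
simplicity:

import org.apache.flink.table.functions.AggregateFunction;

// Keeps the most recently seen value per group, analogous to MAX but
// returning the latest value instead of the largest one.
public class LastValue extends AggregateFunction<String, LastValue.Acc> {

    public static class Acc {
        public String value;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // called for every input row of the group; the latest row simply overwrites
    public void accumulate(Acc acc, String input) {
        acc.value = input;
    }

    @Override
    public String getValue(Acc acc) {
        return acc.value;
    }
}

It can then be registered via tableEnv.registerFunction("LAST_VALUE", new
LastValue()) and used as in the query above.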

2) Note that once you output results to Kafka according to a configured
threshold, the output records cannot be deleted anymore even if the count
value decreases later, because Kafka doesn't support deleting messages. Also,
this issue[2] makes things worse. You can take a detailed look if you are
interested in it.

Best, Hequn

[1] https://github.com/apache/flink/pull/6787
[2] https://issues.apache.org/jira/browse/FLINK-9528


On Sat, Jan 19, 2019 at 1:31 AM Gagan Agrawal 
wrote:

> Hi,
> I have a requirement and need to understand if same can be achieved with
> Flink retract stream. Let's say we have stream with 4 attributes userId,
> orderId, status, event_time where orderId is unique and hence any change in
> same orderId updates previous value as below
>
> *Changelog* *Event Stream*
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> *Snapshot view at time t6 (as viewed in mysql)*
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u4, o4, pending, t6
> (Here rows at time t3 and t5 are deleted as they have been updated for
> respective order ids)
>
> What I need is to maintain count of "Pending" orders against a user and if
> they go beyond configured threshold, then push that user and pending count
> to Kafka. Here there can be multiple updates to order status e.g Pending ->
> Success or Pending -> Failed. Also in some cases there may not be any
> change in status but we may still get a row (may be due to some other
> attribute update which we are not concerned about). So is it possible to
> have running count in flink as below at respective event times. Here
> Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's
> order status was changed from Pending to Success. Similarly for user u2, at
> time t6, there was no change in running count as there was no change in
> status for order o4
>
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> *t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is
> decreased for u1)*
> t5 -> u1 : 1, u2 : 1
> *t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no
> change)*
>
> As I understand may be retract stream can achieve this. However I am not
> sure how. Any samples around this would be of great help.
>
> Gagan
>
>


Re: There is no classloader in TableFactoryUtil using ConnectTableDescriptor to registerTableSource

2019-01-15 Thread Hequn Cheng
Hi Joshua,

Could you use `TableFactoryService` directly to register TableSource? The
code looks like:

final TableSource tableSource =
> TableFactoryService.find(StreamTableSourceFactory.class,
> streamTableDescriptor, classloader)
> .createStreamTableSource(propertiesMap);
> tableEnv.registerTableSource(name, tableSource);


Best, Hequn

On Tue, Jan 15, 2019 at 6:43 PM Joshua Fan  wrote:

> Hi
>
> As known, TableFactoryService has many methods to find a suitable service
> to load. Some of them use a user-defined classloader; the others just use
> the default classloader.
>
> Now I use ConnectTableDescriptor to registerTableSource in the
> environment, which uses TableFactoryUtil to load service, but
> TableFactoryUtil just use the default classloader, it is not enough in my
> case. Because the user may use kafka 0.8 or 0.9, the jars can not be put
> together in the lib directory.
>
> Is there a proper way to use ConnectTableDescriptor to registerTableSource
> at a user defined classloader?
>
> I know the SQL Client has its own implementation to avoid
> using TableFactoryUtil, but I think TableFactoryUtil itself should also
> provide a method to use a user-defined classloader.
>
> Yours sincerely
> Joshhua
>


Re: Multiple select single result

2019-01-13 Thread Hequn Cheng
Hi dhanuka,

> I am trying to deploy 200 SQL unions and it seems all the tasks getting
failing after some time.
It would be great if you could show us some information (say, the exception
stack) about the failure. Is it caused by an OOM of the job manager?

> How do i allocate memory for task manager and job manager. What are the
factors need to be considered .
According to your SQL, I guess you need more memory for the job manager[1],
since you union 200 tables and the job graph should be a bit big. As for
the taskmanager, I think it may be OK to use the default memory settings,
unless you allocate a lot of memory in your UDFs or you just want to make
better use of the memory (we can discuss more if you like).

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager

On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <
dhanuka.priyan...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for the prompt reply and its working 🤗.
>
> I am trying to deploy 200 SQL unions and it seems all the tasks getting
> failing after some time.
>
> How do i allocate memory for task manager and job manager. What are the
> factors need to be considered .
>
> Cheers
> Dhanuka
>
> On Sun, 13 Jan 2019, 22:05 Fabian Hueske 
> > Hi Dhanuka,
> >
> > The important error message here is "AppendStreamTableSink requires that
> > Table has only insert changes".
> > This is because you use UNION instead of UNION ALL, which implies
> > duplicate elimination.
> > Unfortunately, UNION is currently internally implemented as a regular
> > aggregation which produces a retraction stream (although this would not
> > be necessary).
> >
> > If you don't require duplicate elimination, you can replace UNION by
> UNION
> > ALL and the query should work.
> > If you require duplicate elimination, it is currently not possible to use
> > SQL for your use case.
> >
> > There is the Jira issue FLINK-9422 to improve this case [1].
> >
> > Best, Fabian
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-9422
> >
> > Am So., 13. Jan. 2019 um 14:43 Uhr schrieb dhanuka ranasinghe <
> > dhanuka.priyan...@gmail.com>:
> >
> >> Hi All,
> >>
> >> I am trying to select multiple results from Kafka and send results to
> >> Kafka
> >> different topic using Table API. But I am getting below error. Could you
> >> please help me on this.
> >>
> >> Query:
> >>
> >> SELECT TiggerID,'Rule3' as RuleName,mytime(ts) as ts1 ,
> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID =
> >> 4508724
> >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
> >>  UNION
> >> SELECT TiggerID,'Rule2' as RuleName,mytime(ts) as ts1 ,
> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID =
> >> 4508724
> >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
> >>  UNION
> >> SELECT TiggerID,'Rule1' as RuleName,mytime(ts) as ts1 ,
> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID =
> >> 4508724
> >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
> >>
> >>
> >> *Error:*
> >>
> >> 2019-01-13 21:36:36,228 ERROR
> >> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
> Exception
> >> occurred in REST handler.
> >> org.apache.flink.runtime.rest.handler.RestHandlerException:
> >> org.apache.flink.client.program.ProgramInvocationException: The main
> >> method
> >> caused an error.
> >> at
> >>
> >>
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
> >> at
> >>
> >>
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> >> at
> >>
> >>
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> >> at
> >>
> >>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >> at
> >>
> >>
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
> >> at
> >>
> >>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> at
> >>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> at java.lang.Thread.run(Thread.java:748)
> >> Caused by: java.util.concurrent.CompletionException:
> >> org.apache.flink.client.program.ProgramInvocationException: The main
> >> method
> >> caused an error.
> >> at
> >>
> >>
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
> >> at
> >>
> >>
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> >> ... 3 more
> >> Caused by: org.apache.flink.client.program.ProgramInvocationException:
> The
> >> main method caused an error.
> >> at
> >>
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> >> at
> >>
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> >> at
> >>
> >>
> org.apache.flink.client.program.OptimizerPlanEnvironment.getO

Re: breaking the pipe line

2019-01-12 Thread Hequn Cheng
Hi Alieh,

Which kind of API do you use: Table API, SQL, DataStream, or DataSet?
It would be great if you could show us some information about your pipeline or
provide a way to reproduce the problem.

Best, Hequn

On Sat, Jan 12, 2019 at 1:58 AM Alieh 
wrote:

> Hello all,
>
> I have a very, very long pipeline (the implementation of an incremental
> algorithm). It takes a very long time for the Flink execution planner to
> create the plan, so I split the pipeline into several independent
> pipelines by writing out the intermediate results and reading them again.
> Is there any other, more efficient way to do it?
>
> Best,
>
> Alieh
>
>


Re: Is the eval method invoked in a thread safe manner in Fink UDF

2019-01-12 Thread Hequn Cheng
Hi Anil,

It is thread-safe.
Each UDF instance runs in only one task, and each UDF processes data
synchronously, i.e., the next record will not be processed until the current
record has been processed.

Best, Hequn

On Sat, Jan 12, 2019 at 3:12 AM Anil  wrote:

> Is the eval method invoked in a thread safe manner in Fink UDF.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: onTimer function is not getting executed and job is marked as finished.

2019-01-08 Thread Hequn Cheng
Hi Puneet,

Can you explain it in more detail? Do you mean the job finishes before
you call ctx.timerService()?
Maybe you have to keep your source running for a longer time.

It's better to show us the whole pipeline of your job. For example, write
some sample code (or provide a git link) that reproduces your problem easily.

Best, Hequn


On Tue, Jan 8, 2019 at 11:44 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi hequan
>
> Weird behaviour: when I call ctx.timerService(), the function exits
> without even throwing an error
>
> On Tuesday, January 8, 2019, Hequn Cheng  wrote:
>
>> Hi puneet,
>>
>> Could you print `parseLong + 5000` and
>> `ctx.timerService().currentProcessingTime()` out and check the value?
>> I know it is a streaming program. What I mean is the timer you have
>> registered is not within the interval of your job, so the timer has not
>> been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 =
>> 1000(very big).
>>
>> Best, Hequn
>>
>>
>> On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra <
>> puneet.ki...@customercentria.com> wrote:
>>
>>> I checked the same the function is getting exited when i am calling
>>> ctx.getTimeservice () function.
>>>
>>> On Mon, Jan 7, 2019 at 10:27 PM Timo Walther  wrote:
>>>
>>>> Hi Puneet,
>>>>
>>>> maybe you can show or explain us a bit more about your pipeline. From
>>>> what I see your ProcessFunction looks correct. Are you sure the registering
>>>> takes place?
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> Am 07.01.19 um 14:15 schrieb Puneet Kinra:
>>>>
>>>> Hi Hequn
>>>>
>>>> Its a streaming job .
>>>>
>>>> On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng 
>>>> wrote:
>>>>
>>>>> Hi Puneet,
>>>>>
>>>>> The value of the registered timer should within startTime and endTime
>>>>> of your job. For example, job starts at processing time t1 and stops at
>>>>> processing time t2. You have to make sure t1< `parseLong + 5000` < t2.
>>>>>
>>>>> Best, Hequn
>>>>>
>>>>> On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <
>>>>> puneet.ki...@customercentria.com> wrote:
>>>>>
>>>>>> Hi All
>>>>>>
>>>>>> Facing some issue with context to onTimer method in processfunction
>>>>>>
>>>>>> class TimerTest extends ProcessFunction,String>{
>>>>>>
>>>>>> /**
>>>>>> *
>>>>>> */
>>>>>> private static final long serialVersionUID = 1L;
>>>>>>
>>>>>> @Override
>>>>>> public void processElement(Tuple2 arg0,
>>>>>> ProcessFunction, String>.Context ctx,
>>>>>> Collector arg2) throws Exception {
>>>>>> // TODO Auto-generated method stub
>>>>>> long parseLong = Long.parseLong(arg0.f1);
>>>>>> TimerService timerService = ctx.timerService();
>>>>>> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public void onTimer(long timestamp, ProcessFunction>>>>> String>, String>.OnTimerContext ctx,
>>>>>> Collector out) throws Exception {
>>>>>> // TODO Auto-generated method stub
>>>>>> super.onTimer(timestamp, ctx, out);
>>>>>> System.out.println("Executing timmer"+timestamp);
>>>>>> out.collect("Timer Testing..");
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> --
>>>>>> *Cheers *
>>>>>>
>>>>>> *Puneet Kinra*
>>>>>>
>>>>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>>>>>> *
>>>>>>
>>>>>> *e-mail :puneet.ki...@customercentria.com
>>>>>> *
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> *Cheers *
>>>>
>>>> *Puneet Kinra*
>>>>
>>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>>>> *
>>>>
>>>> *e-mail :puneet.ki...@customercentria.com
>>>> *
>>>>
>>>>
>>>>
>>>>
>>>
>>> --
>>> *Cheers *
>>>
>>> *Puneet Kinra*
>>>
>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>>> *
>>>
>>> *e-mail :puneet.ki...@customercentria.com
>>> *
>>>
>>>
>>>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>
>


Re: How to get the temp result of each dynamic table when executing Flink-SQL?

2019-01-08 Thread Hequn Cheng
Hi,

A user-defined table sink that prints its input is helpful. A scalar UDF that
prints its input and forwards it unchanged is another workaround (see the
sketch below).
Hope this helps.
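
A minimal sketch of such a print UDF; the class name and log prefix are
assumptions:

import org.apache.flink.table.functions.ScalarFunction;

public class PrintUdf extends ScalarFunction {

    // prints the value on the TaskManager's stdout and passes it through unchanged
    public String eval(String value) {
        System.out.println("interim value: " + value);
        return value;
    }
}

After registering it with tableEnv.registerFunction("printIt", new PrintUdf()),
you can wrap any column of an intermediate table with printIt(...) in a query.
Note that the output shows up in the TaskManager's .out files, not on the
client.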

Best, Hequn

On Tue, Jan 8, 2019 at 1:45 PM yinhua.dai  wrote:

> In our case, we wrote a console table sink which print everything on the
> console, and use "insert into" to write the interim result to console.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: onTimer function is not getting executed and job is marked as finished.

2019-01-07 Thread Hequn Cheng
Hi puneet,

Could you print `parseLong + 5000` and
`ctx.timerService().currentProcessingTime()` out and check the values?
I know it is a streaming program. What I mean is that the timer you have
registered is not within the running interval of your job, so the timer has not
been triggered. For example, parseLong + 5000 might be a very small value
(such as 5000) or a very big one far in the future.

Best, Hequn


On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> I checked the same the function is getting exited when i am calling
> ctx.getTimeservice () function.
>
> On Mon, Jan 7, 2019 at 10:27 PM Timo Walther  wrote:
>
>> Hi Puneet,
>>
>> maybe you can show or explain us a bit more about your pipeline. From
>> what I see your ProcessFunction looks correct. Are you sure the registering
>> takes place?
>>
>> Regards,
>> Timo
>>
>> Am 07.01.19 um 14:15 schrieb Puneet Kinra:
>>
>> Hi Hequn
>>
>> Its a streaming job .
>>
>> On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng  wrote:
>>
>>> Hi Puneet,
>>>
>>> The value of the registered timer should within startTime and endTime of
>>> your job. For example, job starts at processing time t1 and stops at
>>> processing time t2. You have to make sure t1< `parseLong + 5000` < t2.
>>>
>>> Best, Hequn
>>>
>>> On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <
>>> puneet.ki...@customercentria.com> wrote:
>>>
>>>> Hi All
>>>>
>>>> Facing some issue with context to onTimer method in processfunction
>>>>
>>>> class TimerTest extends ProcessFunction,String>{
>>>>
>>>> /**
>>>> *
>>>> */
>>>> private static final long serialVersionUID = 1L;
>>>>
>>>> @Override
>>>> public void processElement(Tuple2 arg0,
>>>> ProcessFunction, String>.Context ctx,
>>>> Collector arg2) throws Exception {
>>>> // TODO Auto-generated method stub
>>>> long parseLong = Long.parseLong(arg0.f1);
>>>> TimerService timerService = ctx.timerService();
>>>> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
>>>> }
>>>>
>>>> @Override
>>>> public void onTimer(long timestamp, ProcessFunction>>> String>, String>.OnTimerContext ctx,
>>>> Collector out) throws Exception {
>>>> // TODO Auto-generated method stub
>>>> super.onTimer(timestamp, ctx, out);
>>>> System.out.println("Executing timmer"+timestamp);
>>>> out.collect("Timer Testing..");
>>>> }
>>>> }
>>>>
>>>> --
>>>> *Cheers *
>>>>
>>>> *Puneet Kinra*
>>>>
>>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>>>> *
>>>>
>>>> *e-mail :puneet.ki...@customercentria.com
>>>> *
>>>>
>>>>
>>>>
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>> *
>>
>> *e-mail :puneet.ki...@customercentria.com
>> *
>>
>>
>>
>>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>


Re: onTimer function is not getting executed and job is marked as finished.

2019-01-07 Thread Hequn Cheng
Hi Puneet,

The value of the registered timer should be within the start time and end time
of your job. For example, if the job starts at processing time t1 and stops at
processing time t2, you have to make sure that t1 < `parseLong + 5000` < t2
(see the sketch below).
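
For instance, a minimal sketch of registering the timer relative to the current
processing time, so that it always falls inside the job's lifetime (the 5000 ms
delay is taken from your code; `ctx` is the Context of processElement):

long now = ctx.timerService().currentProcessingTime();
ctx.timerService().registerProcessingTimeTimer(now + 5000);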

Best, Hequn

On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi All
>
> Facing some issue with context to onTimer method in processfunction
>
> class TimerTest extends ProcessFunction,String>{
>
> /**
> *
> */
> private static final long serialVersionUID = 1L;
>
> @Override
> public void processElement(Tuple2 arg0,
> ProcessFunction, String>.Context ctx,
> Collector arg2) throws Exception {
> // TODO Auto-generated method stub
> long parseLong = Long.parseLong(arg0.f1);
> TimerService timerService = ctx.timerService();
> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
> }
>
> @Override
> public void onTimer(long timestamp, ProcessFunction String>, String>.OnTimerContext ctx,
> Collector out) throws Exception {
> // TODO Auto-generated method stub
> super.onTimer(timestamp, ctx, out);
> System.out.println("Executing timmer"+timestamp);
> out.collect("Timer Testing..");
> }
> }
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>


Re: Flink SQL client always cost 2 minutes to submit job to a local cluster

2019-01-03 Thread Hequn Cheng
Hi yinhua,

Could you provide a way to reproduce the problem? I can help to figure out the
root cause.

Best, Hequn


On Fri, Jan 4, 2019 at 11:37 AM yinhua.dai  wrote:

> Hi Fabian,
>
> It's the submission of the jar file that costs too much time.
> And yes, Hequn's and your suggestion is working, but I'm just curious why a
> 100M jar file takes so long to submit; is it related to some upload
> parameter settings of the web layer?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink SQL client always cost 2 minutes to submit job to a local cluster

2018-12-28 Thread Hequn Cheng
Hi, yinhua

Thanks for looking into the problem. I'm not familiar with that part of the
code. As a workaround, you can put your jars into the flink lib folder or add
your jars to the classpath. Hope this helps.

Best, Hequn


On Fri, Dec 28, 2018 at 11:52 AM yinhua.dai  wrote:

> I am using Flink 1.6.1, I tried to use flink sql client with some own jars
> with --jar and --library.
> It can work to execute sql query, however it always cause around 2 minutes
> to submit the job the local cluster, but when I copy my jar to flink lib,
> and remove --jar and --library parameter, it can submit the job
> immediately.
>
> I debugged and found the 2 minutes is cost by the RestClusterClient to send
> the request with the jar as payload to the flink cluster.
>
> I don't know why it uses 2 minutes to upload the package? Is there a way to
> work around it?
> Thanks.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Timestamp conversion problem in Flink Table/SQL

2018-12-28 Thread Hequn Cheng
Hi Jiayichao,

The two methods do not have to appear in pairs, so you can't use
timestamp.getTime() directly.
Currently, Flink doesn't support time zone configuration. The
timestamp (time of type Timestamp) always means the time in UTC+0. So in the
test of your PR [1], the output timestamp means a time in UTC+0 instead of
a time in your timezone. You can verify it by changing your SQL to:
String sqlQuery = "select proctime, LOCALTIMESTAMP from MyTable";

But you raised a good question, and it is true that it would be better to
support time zone configuration in Flink, for example a global timezone
configuration. However, it is not a one- or two-line code change: we need to
take all operators into consideration, and it is better to solve it once and
for all.

Best, Hequn

[1] https://github.com/apache/flink/pull/7180


On Fri, Dec 28, 2018 at 3:15 PM jia yichao  wrote:

> Hi community,
>
>
> Recently I have encountered a problem with time conversion in Flink
> Table/SQL . When the processed field contains a timestamp type, the code of
> the flink table codegen first converts the timestamp type to a long type,
> and then converts the long type to a timestamp type on output.
> In the code generated by codegen,
>  “public static long toLong (Timestamp v)” and
> “public static java.sql.Timestamp internalToTimestamp (long v)”
>  are used in the conversion.
> The internal implementation of these two methods will add or subtract the
> time zone offset.
> In some cases, the two methods do not appear in pairs which causes the
> conversion time to be incorrect, resulting in watermark timestamp metrics
> on the web ui is equal to the correct value plus time zone offset, and the
> output of the process time field is equal to the correct value minus the
> time zone offset.
>
> Why the time conversion method in calcite (SqlFunctions.java)  add or
> subtract time zones?Why flink Table/SQL uses these time conversion methods
> instead of using timestamp.getTime() .
>
>
> calcite SqlFunctions.java==
> /** Converts the Java type used for UDF parameters of SQL TIMESTAMP type
>  * ({@link java.sql.Timestamp}) to internal representation (long).
>  *
>  * Converse of {@link #internalToTimestamp(long)}. */
> public static long toLong(Timestamp v) {
>   return toLong(v, LOCAL_TZ);
> }
>
> // mainly intended for java.sql.Timestamp but works for other dates also
> public static long toLong(java.util.Date v, TimeZone timeZone) {
>   final long time = v.getTime();
>   return time + timeZone.getOffset(time);
> }
>
> /** Converts the internal representation of a SQL TIMESTAMP (long) to the
> Java
>  * type used for UDF parameters ({@link java.sql.Timestamp}). */
> public static java.sql.Timestamp internalToTimestamp(long v) {
>   return new java.sql.Timestamp(v - LOCAL_TZ.getOffset(v));
> }
>
>
> Related discussion:
> http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/%3c351fd9ab-7a28-4ce0-bd9c-c2a15e537...@163.com%3E
>
> Related issue:https://github.com/apache/flink/pull/7180
>
>
>
> thanks
> Jiayichao
>


Re: Flink 1.7 doesn't work with Kafka Table Source Descriptors

2018-12-23 Thread Hequn Cheng
Hi Dhanuka,

From the exceptions, it seems you have changed the Kafka version to
'universal'. You can solve your problem in either of the following ways:
- Change the Kafka version in the descriptor to 0.11, since only the 0.11
connector jar is in your lib folder (see the sketch below).
- Add flink-connector-kafka_2.11-1.7.0.jar to your lib folder if you want
to use 'universal'.
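
A minimal sketch of the first option; the topic, fields, and broker address are
taken from your properties, and `tableEnv` is assumed to be your
StreamTableEnvironment:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

// pin the descriptor to the Kafka 0.11 factory that matches the
// flink-connector-kafka-0.11 jar already in the lib folder
tableEnv
    .connect(new Kafka()
        .version("0.11")                 // instead of "universal"
        .topic("testin")
        .startFromLatest()
        .property("bootstrap.servers", "localhost:9092"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
        .field("food", Types.STRING)
        .field("price", Types.BIG_DEC))
    .inAppendMode()
    .registerTableSource("testin");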

Best, Hequn

On Sun, Dec 23, 2018 at 8:48 PM dhanuka ranasinghe <
dhanuka.priyan...@gmail.com> wrote:

> Hi Cheng,
>
> I have removed 1.6.1 jars and then I got below error
>
> Starting execution of program
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.StreamTableSourceFactory' in
> the classpath.
>
> Reason: No context matches.
>
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=localhost:2181
> connector.properties.1.key=group.id
> connector.properties.1.value=analytics
> connector.properties.2.key=bootstrap.servers
> connector.properties.2.value=localhost:9092
> connector.property-version=1
> connector.startup-mode=latest-offset
> connector.topic=testin
> connector.type=kafka
> connector.version=universal
> format.fail-on-missing-field=false
> format.json-schema={\n  \"type\": \"object\",\n  \"properties\": {\n
> \"food\": {\n  \"type\": \"string\"\n},\n\"price\": {\n
> \"type\": \"integer\"\n},\n\"processingTime\": {\n  \"type\":
> \"integer\"\n}\n  }\n}
> format.property-version=1
> format.type=json
> schema.0.name=food
> schema.0.type=VARCHAR
> schema.1.name=price
> schema.1.type=DECIMAL
> schema.2.name=processingTime
> schema.2.proctime=true
> schema.2.type=TIMESTAMP
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
> org.apache.flink.formats.json.JsonRowFormatFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>
> at
> org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
> at
> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
> at
> org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
> at
> org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
> at
> org.monitoring.stream.analytics.FlinkTableSourceLatest.main(FlinkTableSourceLatest.java:97)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.jav

Re: Flink 1.7 doesn't work with Kafka Table Source Descriptors

2018-12-22 Thread Hequn Cheng
Hi dhanuka,

I failed to reproduce your error with release-1.7.0. It
seems Kafka.toConnectorProperties() should be called instead
of ConnectorDescriptor.toConnectorProperties(); the latter is an abstract
method, which leads to the AbstractMethodError.

From the picture uploaded, it is strange that jars of 1.6.1 are mixed
with jars of 1.7.0. This may result in class conflict problems.
Furthermore, set the flink dependency scope to provided so that Flink classes
are not packaged into the user jar; otherwise this can also cause class
conflicts.

Best,
Hequn


On Fri, Dec 21, 2018 at 6:24 PM dhanuka ranasinghe <
dhanuka.priyan...@gmail.com> wrote:

> Add Dev Group
>
> On Fri, Dec 21, 2018 at 6:21 PM dhanuka ranasinghe <
> dhanuka.priyan...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have tried to read data from Kafka from Flink using Table API. It's
>> working fine with Flink 1.4 but when upgrade to 1.7 given me below error. I
>> have attached the libraries added to Flink.
>>
>> Could you please help me on this.
>>
>> bin/flink run stream-analytics-0.0.1-SNAPSHOT.jar --read-topic testin
>> --write-topic testout --bootstrap.servers localhost --group.id analytics
>> Starting execution of program
>> java.lang.AbstractMethodError:
>> org.apache.flink.table.descriptors.ConnectorDescriptor.toConnectorProperties()Ljava/util/Map;
>> at
>> org.apache.flink.table.descriptors.ConnectorDescriptor.toProperties(ConnectorDescriptor.java:58)
>> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.toProperties(ConnectTableDescriptor.scala:107)
>> at
>> org.apache.flink.table.descriptors.StreamTableDescriptor.toProperties(StreamTableDescriptor.scala:95)
>> at
>> org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:39)
>> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
>> at
>> org.monitoring.stream.analytics.FlinkTableSourceLatest.main(FlinkTableSourceLatest.java:82)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>> at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>
>> Cheers,
>> Dhanuka
>>
>> --
>> Nothing Impossible,Creativity is more important than knowledge.
>>
>
>
> --
> Nothing Impossible,Creativity is more important than knowledge.
>


Re: Watermark not firing to push data

2018-12-15 Thread Hequn Cheng
Hi Vijay,

Could you provide more information about your problem? For example:
- Which kind of window do you use?
- What's the window size?
- A relatively complete code sample is better :-)

As for the problem, it is probably that the event time has not reached the end
of the window. You can monitor the watermark in the web dashboard[1].
Also, changing event time to processing time is another way to verify whether
it is a watermark problem.

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html


On Sat, Dec 15, 2018 at 12:59 AM Vijay Balakrishnan 
wrote:

> Hi,
> Observations on Watermarks:
> Read this great article:
> https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy
>
> * Watermark means when for any event TS, when to stop waiting for arrival
> of earlier events.
> * Watermark t means all events with Timestamp < t have already arrived.
> * When to push data out - When watermark with TS >= t arrives
>
> Only *using incrementing current time for watermark seems to be working
> correctly* but not sure if it aligns up correctly with EventTime
> processing.
> *Using the incoming records intervalStart as the Watermark source  for
> EventTime causes data to not be pushed at all* in cases when i have just
> 5 records in the Source.
>
> My source generation for intervalStart has intervalStart incrementing at a
> regular interval.
> I tried using the intervalStart for my Watermark with a out of order late
> boundedness of 3 secs.
> The *AggregateFunction* I am using calls the add() fine but *never calls
> the getResult().*
> My assumption was that the AggregateFunction I am using would push the
> data to getResult
> based on the Watermark based on intervalStart incrementing beyong the
> previous watermark t.
> But it doesn't -is it because I have limited number of input records and
> once intervalStart gets to the end
> of the input records too fast, it stops incrementing the watermar and
> hence doesn't push data ?
>
> With System.currentTimeMillis, it happily keeps increasing and hence
> pushes the data.
>
> Created this class:
> public class MonitoringAssigner implements
> AssignerWithPunctuatedWatermarks {
> private long bound = 3 * 1000;//3 secs out of order bound in millisecs
>
> public MonitoringAssigner(long bound) {
> this.bound = bound;
> }
> public Watermark checkAndGetNextWatermark(Monitoring monitoring, long
> extractedTimestamp) {
> long nextWatermark = extractedTimestamp - bound;
> //simply emit a Watermark with every event
> return new Watermark(nextWatermark);
> }
>
> @Override
> public long extractTimestamp(Monitoring monitoring, long previousTS) {
> /*LocalDateTime intervalStart =
> Utils.getLocalDateTime(monitoring.getIntervalStart());//2012-07-12
> 02:21:06.057
> long extractedTS = 
> Utils.getLongFromLocalDateTime(intervalStart);//*using
> this stopped pushing recs after a certain time*
> return extractedTS;*/
> return *System.currentTimeMillis*();//incrementing current time
>
> }
>
>


Re: Question about key group / key state & parallelism

2018-12-12 Thread Hequn Cheng
Hi Bastien,

You are right, it will wait for message A to be processed. More generally, it
is a question of how to solve the data skew problem in the shuffle case. This
question is common, and there are already many ways to solve it depending on
the scenario.
I think we can solve your problem in the following ways:
- Define your own hash logic according to your business logic, for example,
making sure A and B hash to different values.
- Increase the maximum parallelism. There are exactly as many Key Groups as
the defined maximum parallelism, so the higher it is, the more key groups there
are. This reduces the probability that A and B end up in the same key group,
i.e., in the same instance (see the sketch below).
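
A minimal sketch of the second option; the value 1024 is just an assumption,
and note that the maximum parallelism of a stateful job should not be changed
afterwards, because it affects state compatibility:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// more key groups -> a finer-grained and usually more even spread of keys
env.setMaxParallelism(1024);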

Best, Hequn



On Wed, Dec 12, 2018 at 10:33 PM bastien dine 
wrote:

> Hi Hequn, thanks for your response !
>
> Ok, that's what I was thinking about the key & operator instance
> If the affectation of key group to an instance is deterministic (and the
> hash of the key to belong to a key group) I have the following problem
>
> Let's say I have 4 key (A,B,C,D) & 2 parallel instance for my operator (1,
> 2).
> Flink determines that A/B belong 1 and C/D belong to 2.
> If I have a message keyed by A it will be processed by 1.
> But the following message is a B-key, it will wait for message A to be
> processed by 1 and then go to 1, even if 2 is not busy and can technically
> do the processing, right ?
>
> How can I deal with that ?
>
> Best Regard and many thanks !
> Bastien
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
>
> Le mer. 12 déc. 2018 à 13:39, Hequn Cheng  a écrit :
>
>> Hi Bastien,
>>
>> Each key “belongs” to exactly one parallel instance of a keyed operator,
>> and each parallel instance contains one or more Key Groups.
>> Keys will be hashed into the corresponding key group deterministically.
>> It is hashed by the value instead of the number of the total records.
>> Different keys do not affect each other even a parallel instance contains
>> one or more Key Groups.
>>
>> Best, Hequn
>>
>>
>> On Wed, Dec 12, 2018 at 6:21 PM bastien dine 
>> wrote:
>>
>>> Hello everyone,
>>>
>>> I have a question regarding the key state & parallelism of a process
>>> operation
>>>
>>> Doc says : "You can think of Keyed State as Operator State that has been
>>> partitioned, or sharded, with exactly one state-partition per key. Each
>>> keyed-state is logically bound to a unique composite of
>>> , and since each key “belongs” to exactly
>>> one parallel instance of a keyed operator, we can think of this simply as
>>> ."
>>>
>>> If I have less parallel operator instance (say 5) than my number of
>>> possible key (10), it means than every instance will "manage" 2 key state ?
>>> (is this spread evenly ?)
>>> Is the logical bound fixed ? I mean, are the state always managed by the
>>> same instance, or does this depends on the available instance at the moment
>>> ?
>>>
>>> "During execution each parallel instance of a keyed operator works with
>>> the keys for one or more Key Groups."
>>> -> this is related, does "works with the keys" means always the same
>>> keys ?
>>>
>>> Best Regards,
>>> Bastien
>>>
>>> --
>>>
>>> Bastien DINE
>>> Data Architect / Software Engineer / Sysadmin
>>> bastiendine.io
>>>
>>


Re: Question about key group / key state & parallelism

2018-12-12 Thread Hequn Cheng
Hi Bastien,

Each key “belongs” to exactly one parallel instance of a keyed operator,
and each parallel instance contains one or more Key Groups.
Keys are hashed into the corresponding key group deterministically: the hash is
computed from the key's value, not from the number of total records.
Different keys do not affect each other, even if a parallel instance contains
more than one Key Group.

Best, Hequn


On Wed, Dec 12, 2018 at 6:21 PM bastien dine  wrote:

> Hello everyone,
>
> I have a question regarding the key state & parallelism of a process
> operation
>
> Doc says : "You can think of Keyed State as Operator State that has been
> partitioned, or sharded, with exactly one state-partition per key. Each
> keyed-state is logically bound to a unique composite of
> , and since each key “belongs” to exactly
> one parallel instance of a keyed operator, we can think of this simply as
> ."
>
> If I have less parallel operator instance (say 5) than my number of
> possible key (10), it means than every instance will "manage" 2 key state ?
> (is this spread evenly ?)
> Is the logical bound fixed ? I mean, are the state always managed by the
> same instance, or does this depends on the available instance at the moment
> ?
>
> "During execution each parallel instance of a keyed operator works with
> the keys for one or more Key Groups."
> -> this is related, does "works with the keys" means always the same keys ?
>
> Best Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>


Re: sql program throw exception when new kafka with csv format

2018-12-11 Thread Hequn Cheng
Hi Marvin,

I took a look at the Flink code. It seems we can't use the CSV format with
Kafka; you can use JSON instead.
As the exception shows, Flink can't find a suitable
DeserializationSchemaFactory. Currently, only the JSON and Avro formats provide
a DeserializationSchemaFactory.

Best, Hequn

On Tue, Dec 11, 2018 at 5:48 PM Marvin777 
wrote:

> Register kafka message source with csv format,  the error message is as
> follows:
>
> Exception in thread "main"
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
> a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
>
>
> Reason: No context matches.
>
>
> BTW, the flink version is 1.6.2 .
>
> Thanks Marvin.
>
>
>


Re: How flink table api to join with mysql dimtable

2018-11-14 Thread Hequn Cheng
Hi yelun,

Currently, there is no direct way to dynamically load data and do such a join
in Flink SQL. As a workaround, you can implement your logic with a UDTF (see
the sketch below): in the UDTF, you can load the data into a cache and refresh
it according to your requirements.
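
A minimal sketch of such a UDTF; all class, field, and query names are
assumptions. It loads the MySQL dimension table into an in-memory map via
plain JDBC and reloads it at most once per hour:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class DimLookup extends TableFunction<Row> {

    private static final long RELOAD_INTERVAL_MS = 3600_000L; // refresh once per hour

    private transient Map<String, Row> cache;
    private transient long lastLoadTime;

    @Override
    public void open(FunctionContext context) throws Exception {
        reload();
    }

    // one lookup per input row; emits the matching dimension row, if any
    public void eval(String name) throws Exception {
        if (System.currentTimeMillis() - lastLoadTime > RELOAD_INTERVAL_MS) {
            reload();
        }
        Row match = cache.get(name);
        if (match != null) {
            collect(match);
        }
    }

    private void reload() throws Exception {
        Map<String, Row> newCache = new HashMap<>();
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://...", "user", "pass");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, name, age, address FROM dim_table")) {
            while (rs.next()) {
                newCache.put(rs.getString("name"),
                        Row.of(rs.getInt("id"), rs.getString("name"),
                               rs.getInt("age"), rs.getString("address")));
            }
        }
        cache = newCache;
        lastLoadTime = System.currentTimeMillis();
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.INT, Types.STRING, Types.INT, Types.STRING);
    }
}

After tableEnv.registerFunction("dimLookup", new DimLookup()), the dimension
data can be joined in SQL with LATERAL TABLE(dimLookup(name)).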

Best, Hequn

On Wed, Nov 14, 2018 at 10:34 AM yelun <986463...@qq.com> wrote:

> hi,
>
> I want to use flink sql to left join static dimension table from mysql
> currently, so I converted the mysql table into data stream to join with
> datastream which has converted to flink table. While I found that the
> real-time stream data is not joined correctly with mysql data  at the
> beginning, but the latter stream can be joined correctly. So I want to ask
> that is there any good way to make real-time stream can join with mysql
> data with table api which has loaded and supporting dynamicly loading mysql
> data into memory once each hour. Thanks a lot.
>
> The following is the some example code:
>
> public static JDBCInputFormatBuilder inputBuilder =
> JDBCInputFormat.buildJDBCInputFormat()
> .setDrivername(DRIVER_CLASS)
> .setDBUrl(DB_URL)
> .setUsername(USER_NAME)
> .setPassword(USER_PASS)
> .setQuery(SELECT_ALL_PERSONS)
> .setRowTypeInfo(ROW_TYPE_INFO);
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment();
> StreamTableEnvironment  tEnv = TableEnvironment.getTableEnvironment(env);
>
> DataStream orderA = env.addSource(new OrderFunction());
> tEnv.registerDataStream("tableA", orderA, "name, product, amount");
>
> DataStream mysql_table = env.createInput(inputBuilder.finish());
> String[] dim_table_fileds = {"id","name","age","address"};
>
> tEnv.registerDataStream("tableB",mysql_table);
> Table result = tEnv.sqlQuery("SELECT
> tableA.name,tableA.amount,tableB.age,tableB.address FROM tableB  join
> tableA on tableA.name = tableB.name" );
> tEnv.toRetractStream(result, ROW_TYPE_INFO_OUT).print();
> env.execute();
>
> Thanks a lot.
>


Re: Field could not be resolved by the field mapping when using kafka connector

2018-11-13 Thread Hequn Cheng
Hi jeff,

We need a different field name for the rowtime indicator, something like:

>   new Schema()
> .field("status", Types.STRING)
> .field("direction", Types.STRING)
> .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
> new
> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())


Furthermore, we should define another sink schema which contains no rowtime
definition, since time attributes and custom field mappings are currently not
supported for sinks.

> val sinkSchema =
>   new Schema()
> .field("status", Types.STRING)
> .field("direction", Types.STRING)
> .field("rowtime", Types.SQL_TIMESTAMP)


Btw, a unified API for sources and sinks is under discussion now. More
details here [1].

Best, Hequn

[1]
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#heading=h.41fd6rs7b3cf


On Wed, Nov 14, 2018 at 9:18 AM Jeff Zhang  wrote:

>
> Hi,
>
> I hit the following error when I try to use kafka connector in flink table
> api. There's very little document about how to use kafka connector in flink
> table api, could anyone help me on that ? Thanks
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field 'event_ts' could not be resolved by the field mapping.
> at
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
> at
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
> at
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
> at
> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
> at
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
> at
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
>
> And here's the source code:
>
>
>
>  case class Record(status: String, direction: String, var event_ts: Timestamp)
>
>
>   def main(args: Array[String]): Unit = {
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val data: DataStream[Record] = ...
> val tEnv = TableEnvironment.getTableEnvironment(senv)
> tEnv
>   // declare the external system to connect to
>   .connect(
>   new Kafka()
> .version("0.11")
> .topic("processed5.events")
> .startFromEarliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092"))
>   .withFormat(new Json()
> .failOnMissingField(false)
> .deriveSchema()
>   )
>   .withSchema(
> new Schema()
>   .field("status", Types.STRING)
>   .field("direction", Types.STRING)
>   .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
>   new 
> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>   )
>
>   // specify the update-mode for streaming tables
>   .inAppendMode()
>
>   // register as source, sink, or both and under a name
>   .registerTableSourceAndSink("MyUserTable");
>
> tEnv.fromDataStream(data).insertInto("MyUserTable")
>
>


Re: Confused window operation

2018-11-13 Thread Hequn Cheng
Hi Jeff,

The window is not a global window; it is scoped to each key. After flatMap()
and keyBy() you have 6 windows in total:
key: hello with 3 windows
key: world with 1 window
key: flink with 1 window
key: hadoop with 1 window

Best, Hequn


On Wed, Nov 14, 2018 at 10:31 AM Jeff Zhang  wrote:

> Hi all,
>
> I am a little confused with the following windows operation. Here's the
> code,
>
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
>
> data.flatMap(line => line.split("\\s"))
>   .map(w => (w, 1))
>   .keyBy(0)
>   .countWindow(2, 1)
>   .sum(1)
>   .print("**")
>
> senv.execute()
>
>
> And this is the output:
>
> **> (hello,1)
> **> (world,1)
> **> (hello,2)
> **> (flink,1)
> **> (hello,2)
> **> (hadoop,1)
>
>
> As my understanding, here we have 3 windows.
>
> window 1
>
> (hello, world)
>
> window 2
>
> (hello, world)
>
> (hello, flink)
>
> window 3
>
> (hello flink)
>
> (hello hadoop)
>
> So for the first window, we have output (hello, 1) (world, 1)
>
> for the second window we should output (hello, 2), (world,1 ), (flink, 1)
>
> for the third window we should have output (hello, 2), (flink, 1), (hadoop, 1)
>
>
> But as you can see, above I get a different result. Do I misunderstand
> the window? Could anyone help me to understand it? Thanks
>
>


Re: DataStream with one DataSource and two different Sinks with diff. schema.

2018-11-09 Thread Hequn Cheng
Hi Marke,

You can use split() and select(), as shown here [1].
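
For example, a rough sketch (the Event type, the parsing/format mappers, and
the sink classes below are placeholders, assuming env is already set up):

> import java.util.Arrays;
> import org.apache.flink.streaming.api.collector.selector.OutputSelector;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SplitStream;
>
> // one source, shared parsing/normalization map() functions
> DataStream<Event> normalized = env
>     .addSource(new MySource())
>     .map(new ParseAndNormalizeMapper());
>
> // tag every record for both outputs so the same data reaches both branches
> SplitStream<Event> split = normalized.split(new OutputSelector<Event>() {
>     @Override
>     public Iterable<String> select(Event value) {
>         return Arrays.asList("formatA", "formatB");
>     }
> });
>
> // each branch gets its own formatting map() and its own sink
> split.select("formatA").map(new FormatAMapper()).addSink(new SinkA());
> split.select("formatB").map(new FormatBMapper()).addSink(new SinkB());

Since the same data should go to both sinks, simply attaching the two
formatting map() + sink chains directly to `normalized` would work as well;
split()/select() mainly helps when the branches should see different subsets.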

Best, Hequn
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations


On Sat, Nov 10, 2018 at 12:23 AM Marke Builder 
wrote:

> Hi,
>
> what is the recommended way to implement the following use-case for
> DataStream:
> One data source, the same map() functions for parsing and normalization, and
> different map() functions for formatting, with two different sinks for the output?
>
> The (same) data must be stored in both sinks.
> And I prefer one job (sharing the same source and map functions).
>
> How can/should I use the split() function for this use-case?
>
> Thanks!
>


Re: ProcessFunction's Event Timer not firing

2018-11-08 Thread Hequn Cheng
Hi Fritz,

Watermarks are merged on stream shuffles. If one of the inputs' watermarks is
not progressing, it will hold back the event time at the downstream operators.
I think you should decrease the parallelism of the source and make sure there
is data in each of your source partitions.
Note that the Kafka source supports per-partition watermarking, which you
can read more about here[1].
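
For reference, a rough sketch of attaching a per-partition watermark assigner
to the Kafka consumer (the Event type, deserialization schema, topic name, and
the 10s bound are placeholders, assuming env and kafkaProps are already set up):

> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
>
> FlinkKafkaConsumer011<Event> consumer =
>     new FlinkKafkaConsumer011<>("my-topic", new EventDeserializationSchema(), kafkaProps);
>
> // watermarks are generated inside the source, per Kafka partition, and merged
> consumer.assignTimestampsAndWatermarks(
>     new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>         @Override
>         public long extractTimestamp(Event element) {
>             return element.getEventTimeMillis();
>         }
>     });
>
> // as suggested above, match the source parallelism to the partition count
> DataStream<Event> events = env.addSource(consumer).setParallelism(4);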

Best, Hequn
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition


On Fri, Nov 9, 2018 at 1:56 AM Fritz Budiyanto  wrote:

> Hi All,
>
> I noticed that if one slot's watermark is not progressing, it impacts the
> ProcessFunction timers of all slots and no timers fire at all.
>
> In my example, I have the source parallelism set to 8 and the Kafka partition
> count is 4. The next operator is a ProcessFunction with parallelism 8 plus an
> event timer. I can see from the debug log that one slot's watermark is not
> progressing. As a result, no slot's timer in the ProcessFunction is firing. Is
> this expected behavior or an issue? How do I prevent this condition?
>
> Thanks,
> Fritz


Re: Kinesis Connector

2018-11-02 Thread Hequn Cheng
Hi Steve,

I think we can check the following things step by step:
1. Confirm if the data source has data.
2. Take a look at the log of Taskmanager or Jobmanager and check if there
are exceptions.
3. Take a thread dump to see what was doing in the TaskManager.

Best, Hequn


On Fri, Nov 2, 2018 at 10:28 PM Steve Bistline 
wrote:

> I have tried just about everything to get a simple Flink application to
> consume from Kinesis. The application appears to connect (I think), with no
> complaints, but it never receives any data. Even with a very simple Java app
> (see attached).
>
> Any help would be very much appreciated.
>
> Thanks
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink SQL questions

2018-11-01 Thread Hequn Cheng
Hi Michael,

There are some test cases in the Flink git repository, such as [1], which I
think may help you.
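
In case it helps, a minimal Java sketch along the lines of that test case.
Note that fromDataStream/registerDataStream live on the Java-specific
org.apache.flink.table.api.java.StreamTableEnvironment, which (if I remember
correctly) is what TableEnvironment.getTableEnvironment(env) returns for a
Java StreamExecutionEnvironment:

> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
>
> DataStream<Tuple2<String, Integer>> orders =
>     env.fromElements(Tuple2.of("beer", 3), Tuple2.of("diaper", 4));
>
> tEnv.registerDataStream("Orders", orders, "product, amount");
> Table result = tEnv.sqlQuery(
>     "SELECT product, SUM(amount) AS total FROM Orders GROUP BY product");
>
> tEnv.toRetractStream(result, Row.class).print();
> env.execute();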

Best, Hequn
[1]
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java


On Fri, Nov 2, 2018 at 7:46 AM TechnoMage  wrote:

> I am trying to get a minimal Flink SQL program going to test it out.  I
> have a StreamExecutionEnvironment and from that created a
> StreamTableEnvironment.  The docs indicate there should be a fromDataStream
> method to create a Table, but none appears to exist according to Eclipse.
> The method registerDataStream is also missing, just
> registerDataStreamInternal.  The Internal suggests a private API to me, so I
> am asking if the docs are out of date, or I am missing some library or
> such.  I am using Java 8.0 not Scala.
>
> Michael Latta
>
>


Re: Parallelize an incoming stream into 5 streams with the same data

2018-11-01 Thread Hequn Cheng
Hi Vijay,

> I was hoping to groupBy(key._1, key._2) etc. and then do a tumblingWindow
> operation on the KeyedStream and then perform a group operation on the
> resultant set to get the total count etc.

From your description, I think you can perform a TumblingEventTimeWindow
first, something like:

> // tumbling event-time windows
> input
>     .keyBy(<key selector>)
>     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>     .<windowed transformation>(<window function>);

Then you can perform a windowAll() after the per-key tumbling window to get
the final total count, for example:
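
(A minimal sketch, assuming the input is a DataStream<Tuple2<String, Long>> of
(key, count) with timestamps and watermarks already assigned; the field
positions are just for illustration.)

> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> // step 1: per-key tumbling event-time window
> DataStream<Tuple2<String, Long>> perKey = input
>     .keyBy(0)
>     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>     .sum(1);
>
> // step 2: windowAll over the pre-aggregated stream for the total per window
> // (field 0 will just hold one of the keys; a custom window function could drop it)
> DataStream<Tuple2<String, Long>> total = perKey
>     .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
>     .sum(1);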

Best,
Hequn



On Fri, Nov 2, 2018 at 6:20 AM Vijay Balakrishnan 
wrote:

> Thanks,Hequn.
> If I have to do a TumblingWindow operation like:
>
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
> TimeUnit.SECONDS))
>
> I am not able to do that on the output of keyBy(..) which is a KeyedStream.
>
> I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow 
> operation on the KeyedStream
>
> and then perform group operation on the resultant set to get total count etc.
>
> I am only able to do only 1 of keyBy or timeWindowAll as follows:
>
>
> .keyBy(*d._1,d._2*)
> .process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))
>
> OR
>
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
> TimeUnit.SECONDS))
> .process(new WindowProcessing(FIVE_SECONDS))
>
>
> Doing this doesn't seem to be too helpful as the keyBy KeyedStream is lost in 
> the next step:
>
> .keyBy(*d._1,d._2*)
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
> TimeUnit.SECONDS))
> .process(new WindowProcessing(FIVE_SECONDS))
>
>
> TIA,
>
> Vijay
>
>
>
> On Thu, Oct 25, 2018 at 6:31 PM Hequn Cheng  wrote:
>
>> Hi Vijay,
>>
>> Option 1 is the right answer. `keyBy1` and `keyBy2` contain all data in
>> `inputStream`.
>> While option 2 replicates all data to each task and option 3 splits data
>> into smaller groups without duplication.
>>
>> Best, Hequn
>>
>> On Fri, Oct 26, 2018 at 7:01 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>> I need to broadcast/parallelize an incoming stream(inputStream) into 5
>>> streams with the same data. Each stream is keyed by different keys to do
>>> various grouping operations on the set.
>>>
>>> Do I just use inputStream.keyBy(5 diff keys) and then just use the
>>> DataStream to perform windowing/grouping operations ?
>>>
>>> *DataStream inputStream= ...*
>>> *DataStream  keyBy1 = inputStream.keyBy((d) -> d._1);*
>>> *DataStream  keyBy2 = inputStream.keyBy((d) -> d._2);*
>>>
>>> *DataStream out1Stream = keyBy1.flatMap(new Key1Function());// do
>>> windowing/grouping operations in this function*
>>> *DataStream out2Stream = keyBy2.flatMap(new Key2Function());// do
>>> windowing/grouping operations in this function*
>>>
>>> out1Stream.print();
>>> out2Stream.addSink(new Out2Sink());
>>>
>>> Will this work ?
>>>
>>> Or do I use the keyBy Stream with a broadcast function like this:
>>>
>>> *BroadcastStream broadCastStream = inputStream.broadcast(..);*
>>> *DataSTream out1Stream = keyBy1.connect(broadCastStream)*
>>> * .process(new KeyedBroadcastProcessFunction...)*
>>>
>>> *DataSTream out2Stream = keyBy2.connect(broadCastStream)*
>>> * .process(new KeyedBroadcastProcessFunction...)*
>>>
>>> Or do I need to use split:
>>>
>>> *SplitStream source = inputStream.split(new MyOutputSelector());*
>>> *source.select("").flatMap(new Key1Function()).addSink(out1Sink);*
>>> source.select("").flatMap(new Key2Function()).addSink(out2Sink);
>>>
>>>
>>> static final class MyOutputSelector implements OutputSelector {
>>> List outputs = new ArrayList();
>>> public Iterable select(Long value) {
>>> outputs.add("");
>>> return outputs;
>>> }
>>> }
>>> TIA,
>>>
>>


Re: Ask about counting elements per window

2018-10-31 Thread Hequn Cheng
Hi Rad,

You can take a look at the group windows[1] of SQL. I think they may help you.
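
For example, a sketch of such a query (the field names and the 1-minute window
are placeholders, assuming tEnv is already set up; COUNT(DISTINCT ...) needs
distinct aggregates to be supported in your Flink version, otherwise a custom
aggregate function would be needed):

> // assuming timestamps/watermarks are assigned on gpsStream and that it has
> // the fields (userId, lon, lat); "rowtime" is appended as the event-time attribute
> tEnv.registerDataStream("Gps", gpsStream, "userId, lon, lat, rowtime.rowtime");
>
> Table usersPerWindow = tEnv.sqlQuery(
>     "SELECT TUMBLE_START(rowtime, INTERVAL '1' MINUTE) AS wStart, " +
>     "       COUNT(DISTINCT userId) AS userCnt " +
>     "FROM Gps " +
>     "GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)");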

Best, Hequn
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#aggregations

On Thu, Nov 1, 2018 at 12:53 AM Rad Rad  wrote:

> Hi All,
>
> I have a GPS stream consumed by a FlinkKafkaConsumer which contains GPS
> points of different users. I need to count the number of users per specific
> window of this stream.
>
> Could anyone help me, a part of my code is below
>
>
> // read data from Kafka
> DataStream stream = env.addSource(
> new
> FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
> , new
> JSONDeserializationSchema(), parameterTool.getProperties()));
>
> DataStream Float,
> Timestamp>>  gpsStream  = stream.flatMap(gpsFlatMapFunc).
>
>
> Thanks in advance.
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Table API and AVG on dates

2018-10-30 Thread Hequn Cheng
Hi Flavio,

You are right, AVG on dates is not supported; it requires numeric types.
As a workaround, you can transform the datetime into a numeric type using
a UDF, for example:
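
A small scalar UDF along these lines (the table and field names are
placeholders, assuming tEnv and the `people` table already exist):

> import java.sql.Date;
> import java.sql.Timestamp;
> import org.apache.flink.table.functions.ScalarFunction;
>
> public static class ToMillis extends ScalarFunction {
>     public long eval(Timestamp ts) {
>         return ts.getTime();
>     }
>     // overload for SQL DATE columns
>     public long eval(Date d) {
>         return d.getTime();
>     }
> }
>
> // register the function and average over the converted value
> tEnv.registerFunction("toMillis", new ToMillis());
> Table result = people.select("toMillis(birthDate).avg as avgBirthMillis");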

Best, Hequn

On Wed, Oct 31, 2018 at 1:51 AM Flavio Pompermaier 
wrote:

> Hi to all,
> I'm using Flink 1.6.1 and it seems that average on dates is not
> supported... am I right?
> Is there any effort in implementing it?
>
> Best,
> Flavio
>


Re: How to tune Hadoop version in flink shaded jar to Hadoop version actually used?

2018-10-29 Thread Hequn Cheng
Hi Henry,

You can specify a specific Hadoop version to build against:

> mvn clean install -DskipTests -Dhadoop.version=2.6.1

More details here [1].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#hadoop-versions

On Tue, Oct 30, 2018 at 10:02 AM vino yang  wrote:

> Hi Henry,
>
> You just need to change the node of "hadoop.version" in the parent pom
> file.
>
> Thanks, vino.
>
> 徐涛  于2018年10月29日周一 下午11:23写道:
>
>> Hi Vino,
>> Because I build the project with Maven, maybe I cannot use the jars
>> downloaded directly from the web.
>> If built with Maven, how can I align the Hadoop version with the Hadoop
>> version really used?
>> Thanks a lot!!
>>
>> Best
>> Henry
>>
>> 在 2018年10月26日,上午10:02,vino yang  写道:
>>
>> Hi Henry,
>>
>> When running Flink on YARN, the system environment info is printed out
>> from ClusterEntrypoint.
>> One of the entries is "Hadoop version: 2.4.1”, which I think comes from the
>> flink-shaded-hadoop2 jar. But the actual system Hadoop version is 2.7.2.
>>
>> I want to know is it OK if the version is different?
>>
>> *> I don't think it is OK, because you will use a lower version of the
>> client to access the higher version of the server.*
>>
>> Is it a best practice to adjust flink Hadoop version to the Hadoop
>> version actually used?
>>
>> *> I personally recommend that you keep the two versions consistent to
>> eliminate the possibility of causing various potential problems. *
>> *In fact, Flink provides a bundle of Hadoop 2.7.x bundles for you to
>> download.[1]*
>>
>> [1]:
>> https://www.apache.org/dyn/closer.lua/flink/flink-1.6.1/flink-1.6.1-bin-hadoop27-scala_2.11.tgz
>>
>> Thanks, vino.
>>
>> 徐涛  于2018年10月26日周五 上午9:13写道:
>>
>>> Hi Experts
>>> When running flink on YARN, from ClusterEntrypoint the system
>>> environment info is print out.
>>> One of the info is "Hadoop version: 2.4.1”, I think it is from
>>> the flink-shaded-hadoop2 jar. But actually the system Hadoop version is
>>> 2.7.2.
>>> I want to know is it OK if the version is different? Is it a
>>> best practice to adjust flink Hadoop version to the Hadoop version actually
>>> used?
>>>
>>> Thanks a lot.
>>>
>>> Best
>>> Henry
>>
>>
>>


Re: Parallelize an incoming stream into 5 streams with the same data

2018-10-25 Thread Hequn Cheng
Hi Vijay,

Option 1 is the right answer: `keyBy1` and `keyBy2` each contain all of the
data in `inputStream`.
Option 2 replicates all data to each task, and option 3 splits the data into
smaller groups without duplication.

Best, Hequn

On Fri, Oct 26, 2018 at 7:01 AM Vijay Balakrishnan 
wrote:

> Hi,
> I need to broadcast/parallelize an incoming stream(inputStream) into 5
> streams with the same data. Each stream is keyed by different keys to do
> various grouping operations on the set.
>
> Do I just use inputStream.keyBy(5 diff keys) and then just use the
> DataStream to perform windowing/grouping operations ?
>
> *DataStream inputStream= ...*
> *DataStream  keyBy1 = inputStream.keyBy((d) -> d._1);*
> *DataStream  keyBy2 = inputStream.keyBy((d) -> d._2);*
>
> *DataStream out1Stream = keyBy1.flatMap(new Key1Function());// do
> windowing/grouping operations in this function*
> *DataStream out2Stream = keyBy2.flatMap(new Key2Function());// do
> windowing/grouping operations in this function*
>
> out1Stream.print();
> out2Stream.addSink(new Out2Sink());
>
> Will this work ?
>
> Or do I use the keyBy Stream with a broadcast function like this:
>
> *BroadcastStream broadCastStream = inputStream.broadcast(..);*
> *DataSTream out1Stream = keyBy1.connect(broadCastStream)*
> * .process(new KeyedBroadcastProcessFunction...)*
>
> *DataSTream out2Stream = keyBy2.connect(broadCastStream)*
> * .process(new KeyedBroadcastProcessFunction...)*
>
> Or do I need to use split:
>
> *SplitStream source = inputStream.split(new MyOutputSelector());*
> *source.select("").flatMap(new Key1Function()).addSink(out1Sink);*
> source.select("").flatMap(new Key2Function()).addSink(out2Sink);
>
>
> static final class MyOutputSelector implements OutputSelector {
> List outputs = new ArrayList();
> public Iterable select(Long value) {
> outputs.add("");
> return outputs;
> }
> }
> TIA,
>


Re: Accumulating a batch

2018-10-25 Thread Hequn Cheng
Hi Austin,

You can use a GroupBy window[1], such as a TUMBLE window. The size of the
window can be defined either as a time interval or a row-count interval. You
can also define your own user-defined aggregate function[2] to be used in the
window, for example:
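
A sketch with a row-count tumbling window (the 100-row batch size and the
field names are placeholders, assuming tEnv and the input stream already
exist; a processing-time attribute is appended as "proctime"):

> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.Tumble;
>
> Table input = tEnv.fromDataStream(stream, "id, amount, proctime.proctime");
>
> // the window fires once 100 rows (per key) have been collected
> Table batched = input
>     .window(Tumble.over("100.rows").on("proctime").as("w"))
>     .groupBy("w, id")
>     .select("id, amount.sum as totalAmount");

A user-defined AggregateFunction in the select(...) could collect and emit the
rows of each batch instead of summing them.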

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#tumble-tumbling-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#aggregation-functions

On Fri, Oct 26, 2018 at 5:08 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi there,
>
> Is it possible to use an AggregationFunction to accumulate n values in a
> buffer until a threshold is met, then stream down all records in the batch?
>
> Thank you!
> Austin Cawley-Edwards
>


Re: Checkpoint acknowledge takes too long

2018-10-25 Thread Hequn Cheng
Hi Henry,

Thanks for letting us know.

On Thu, Oct 25, 2018 at 7:34 PM 徐涛  wrote:

> Hi Hequn & Kien,
> Finally the problem is solved.
> It was due to slow sink writes. Because the job only has 2 tasks, I checked
> the backpressure and found that the source has high backpressure, so I
> improved the sink write performance. After that the end-to-end duration is
> below 1s and the checkpoint timeout is fixed.
>
> Best
> Henry
>
>
> 在 2018年10月24日,下午10:43,徐涛  写道:
>
> Hequn & Kien,
> Thanks a lot for your help, I will try it later.
>
> Best
> Henry
>
>
> 在 2018年10月24日,下午8:18,Hequn Cheng  写道:
>
> Hi Henry,
>
> @Kien is right. Take a thread dump to see what the TaskManager is doing.
> Also check whether GC happens frequently.
>
> Best, Hequn
>
>
> On Wed, Oct 24, 2018 at 5:03 PM 徐涛  wrote:
>
>> Hi
>> I am running a flink application with parallelism 64. I left the
>> checkpoint timeout at the default value, which is 10 minutes, the state size
>> is less than 1MB, and I am using the FsStateBackend.
>> The application triggers some checkpoints but all of them fail
>> due to "Checkpoint expired before completing”. I checked the checkpoint
>> history and found that 63 subtasks acknowledged, but one is left n/a, and
>> the alignment duration is quite long, about 5m27s.
>> I want to know why one subtask does not acknowledge. And since
>> the alignment duration is long, what influences the alignment
>> duration?
>> Thanks a lot.
>>
>> Best
>> Henry
>
>
>
>


Re: Checkpoint acknowledge takes too long

2018-10-24 Thread Hequn Cheng
Hi Henry,

@Kien is right. Take a thread dump to see what the TaskManager is doing.
Also check whether GC happens frequently.

Best, Hequn


On Wed, Oct 24, 2018 at 5:03 PM 徐涛  wrote:

> Hi
> I am running a flink application with parallelism 64. I left the
> checkpoint timeout at the default value, which is 10 minutes, the state size
> is less than 1MB, and I am using the FsStateBackend.
> The application triggers some checkpoints but all of them fail
> due to "Checkpoint expired before completing”. I checked the checkpoint
> history and found that 63 subtasks acknowledged, but one is left n/a, and
> the alignment duration is quite long, about 5m27s.
> I want to know why one subtask does not acknowledge. And since
> the alignment duration is long, what influences the alignment
> duration?
> Thanks a lot.
>
> Best
> Henry


Re: Dynamically Generated Classes - Cannot load user class

2018-10-22 Thread Hequn Cheng
Hi shkob

> i tried to use getRuntimeContext().getUserCodeClassLoader() as the loader
> to use for Byte Buddy - but it doesn't seem to be enough.
From the log, it seems that the user class cannot be found in the
classloader:

> Cannot load user class: commodel.MyGeneratedClass

Have you ever tried Thread.currentThread().getContextClassLoader()? It
should be the user-code ClassLoader, for example:
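
Something along these lines inside open() of the RichMapFunction (a sketch;
MySuperClass and the class name come from your description, and the INJECTION
strategy is just one way of loading the generated class into that classloader):

> import net.bytebuddy.ByteBuddy;
> import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
> import org.apache.flink.configuration.Configuration;
>
> @Override
> public void open(Configuration parameters) {
>     // in the task thread, the context classloader should be the user-code classloader
>     ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
>
>     Class<? extends MySuperClass> generated = new ByteBuddy()
>         .subclass(MySuperClass.class)
>         .name("com.model.MyGeneratedClass")
>         // ... define the fields derived from the yaml schema here ...
>         .make()
>         .load(userCodeClassLoader, ClassLoadingStrategy.Default.INJECTION)
>         .getLoaded();
>
>     // keep `generated` around for building instances / the produced TypeInformation
> }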

Best, Hequn

On Tue, Oct 23, 2018 at 5:47 AM shkob1  wrote:

> Hey,
>
> I'm trying to run a job which uses a dynamically generated class (through
> Byte Buddy).
> think of me having a complex schema as yaml text and generating a class
> from
> it. Throughout the job i am using an artificial super class (MySuperClass)
> of the generated class (as for example i need to specify the generic class
> to extend RichMapFunction).
>
>
>
> MyRichMapFunction extends RichMapFunction
> is
> introducing the dynamic class. It will take the yaml in the CTOR and:
> 1. open - takes the schema and converts it into a Pojo class which extends
> MySuperClass
> 2. getProducedType - does the same thing in order to correctly send the
> Pojo
> with all the right fields
>
> So basically my job is something like
>
> env.addSource([stream of pojos])
> .filter(...)
> ... (register table, running a query which generates Rows)
> .map(myRichMapFunction)
> .returns(myRichMapFunction.getProducedType)
> .addSink(...)
>
> My trouble now is that, when running on a cluster the classloader fails to
> load my generated class.
> i tried to use getRuntimeContext().getUserCodeClassLoader() as the loader
> to
> use for Byte Buddy - but doesnt seem to be enough.
>
> Was reading about it here:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
> Is there a hook maybe to get called when a job is loaded so i can load the
> class?
>
>
> Stacktrace:
>
> org.apache.flink.client.program.ProgramInvocationException:
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
> user class: commodel.MyGeneratedClass
> ClassLoader info: URL ClassLoader:
> file:
>
> '/var/folders/f7/c4pvjrf902b6c73_tbzkxnjwgn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0'
> (valid JAR)
> Class not resolvable through given classloader.
> at
>
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:264)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
> at
>
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at com.MainClass.main(MainClass.java:46)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
> at
>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot load user class: com.model.DynamicSchema
> ClassLoader info: URL ClassLoader:
> file:
>
> '/var/folders/f7/c4pvjrf902b6c73_tbzkxnjwgn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0'
> (valid JAR)
> Class not resolvable through given classloader.
> at
>
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:99)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:273)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Trigger Firing for Late Window Elements

2018-10-19 Thread Hequn Cheng
Hi Scott,

Yes, the window trigger fires for every single late element.

If you only want the window to be triggered once, you can:
- Remove the allowedLateness()
- Use a BoundedOutOfOrdernessTimestampExtractor to emit watermarks that
lag behind the elements.

The code(scala) looks like:

> class TimestampExtractor[T1, T2]
>   extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.hours(3)) {
>   override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
>     element._3.getTime
>   }
> }


Note that this will increase latency, since the window then fires only once,
when the delayed watermark passes the end of the window.

Best, Hequn

On Sat, Oct 20, 2018 at 1:15 AM Scott Kidder  wrote:

> I'm using event-time windows of 1 hour that have an allowed lateness of
> several hours. This supports the processing of access logs that can be
> delayed by several hours. The windows aggregate data over the 1 hour period
> and write to a database sink. Pretty straightforward.
>
> Will the event-time trigger lead to the window trigger firing for every
> single late element? Suppose thousands of late elements arrive
> simultaneously; I'd like to avoid having that lead to thousands of database
> updates in a short period of time. Ideally, I could batch up the late
> window changes and have it trigger when the window is finally closed or
> some processing-time duration passes (e.g. once per minute).
>
> For reference, here's what the aggregate window definition looks like with
> Flink 1.5.3:
>
> chunkSource.keyBy(record -> record.getRecord().getEnvironmentId())
> .timeWindow(Time.hours(1))
> .allowedLateness(Time.hours(3))
> .aggregate(new EnvironmentAggregateWatchTimeFunction())
> .uid("env-watchtime-stats")
> .name("Env Watch-Time Stats")
> .addSink(new EnvironmentWatchTimeDBSink());
>
>
> Thank you,
>
> --
> Scott Kidder
>

