Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-21 Thread Bowen Li
Congrats, Jingsong!

On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann  wrote:

> Congratulations Jingsong!
>
> Cheers,
> Till
>
> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao  wrote:
>
>>   Congratulations Jingsong!
>>
>>Best,
>>Yun
>>
>> --
>> From:Jingsong Li 
>> Send Time:2020 Feb. 21 (Fri.) 21:42
>> To:Hequn Cheng 
>> Cc:Yang Wang ; Zhijiang <
>> wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey he
>> ; dev ; user <
>> user@flink.apache.org>
>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>>
>> Thanks everyone~
>>
>> It's my pleasure to be part of the community. I hope I can make a better
>> contribution in future.
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng  wrote:
>> Congratulations Jingsong! Well deserved.
>>
>> Best,
>> Hequn
>>
>> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang  wrote:
>> Congratulations, Jingsong! Well deserved.
>>
>>
>> Best,
>> Yang
>>
>> Zhijiang  于2020年2月21日周五 下午1:18写道:
>> Congrats Jingsong! Welcome on board!
>>
>> Best,
>> Zhijiang
>>
>> --
>> From:Zhenghua Gao 
>> Send Time:2020 Feb. 21 (Fri.) 12:49
>> To:godfrey he 
>> Cc:dev ; user 
>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>>
>> Congrats Jingsong!
>>
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he  wrote:
>> Congrats Jingsong! Well deserved.
>>
>> Best,
>> godfrey
>>
>> Jeff Zhang  于2020年2月21日周五 上午11:49写道:
>> Congratulations, Jingsong! You deserve it.
>>
>> wenlong.lwl  于2020年2月21日周五 上午11:43写道:
>> Congrats Jingsong!
>>
>> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
>>
>> > Congrats Jingsong!
>> >
>> > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
>> > >
>> > > Congratulations Jingsong! Well deserved.
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
>> > >
>> > >> Congratulations! Jingsong
>> > >>
>> > >>
>> > >> Best,
>> > >> Dan Zou
>> > >>
>> >
>> >
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>>
>>


Re: Flink connect hive with hadoop HA

2020-02-10 Thread Bowen Li
Hi sunfulin,

Sounds like you didn't configure Hadoop HA correctly on the client side;
see [1]. Let us know if it helps resolve the issue.

[1]
https://stackoverflow.com/questions/25062788/namenode-ha-unknownhostexception-nameservice1
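As a hedged illustration of what the linked answer describes: the client needs the
HDFS HA settings for `nameservice1` on its classpath, typically by pointing
HADOOP_CONF_DIR at a directory containing the cluster's hdfs-site.xml. Expressed
programmatically, the same standard HA properties would look roughly like the sketch
below; the namenode host names are placeholders and must match your cluster.

    import org.apache.hadoop.conf.Configuration;

    public class HdfsHaClientConfig {
        // Sketch only: these are the standard HDFS HA client keys; in practice
        // they usually live in hdfs-site.xml rather than being set in code.
        public static Configuration create() {
            Configuration conf = new Configuration();
            conf.set("dfs.nameservices", "nameservice1");
            conf.set("dfs.ha.namenodes.nameservice1", "nn1,nn2");
            conf.set("dfs.namenode.rpc-address.nameservice1.nn1", "namenode1.example.com:8020"); // placeholder
            conf.set("dfs.namenode.rpc-address.nameservice1.nn2", "namenode2.example.com:8020"); // placeholder
            conf.set("dfs.client.failover.proxy.provider.nameservice1",
                    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
            return conf;
        }
    }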




On Mon, Feb 10, 2020 at 7:11 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> Could you please provide a full stacktrace?
>
> Regards,
> Roman
>
>
> On Mon, Feb 10, 2020 at 2:12 PM sunfulin  wrote:
>
>> Hi, guys
>> I am using Flink 1.10 and testing functional cases with Hive integration:
>> Hive 1.1.0-cdh5.3.0 with Hadoop HA enabled. Running the Flink job I can
>> see a successful connection to the Hive metastore, but cannot read table data
>> due to this exception:
>>
>> java.lang.IllegalArgumentException: java.net.UnknownHostException:
>> nameservice1
>> at
>> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
>> at
>> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>> at
>> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>>
>> I am running a standalone application. Looks like I am missing my Hadoop
>> conf files on my Flink job application classpath. Where should I configure this?
>>
>>
>>
>>
>


Re: Read data from Oracle using Flink SQL API

2020-02-05 Thread Bowen Li
Hi Flavio,

+1 for adding Oracle (and potentially more DBMSs like SQL Server, etc.) to
flink-jdbc. Would you mind opening a parent ticket and some subtasks, one
for each to-be-added DBMS you've thought of?


On Sun, Feb 2, 2020 at 10:11 PM Jingsong Li  wrote:

> Yes. And I think we should add OracleDialect, SqlServerDialect, and DB2Dialect
> support too.
>
> Best,
> Jingsong Lee
>
> On Sun, Feb 2, 2020 at 5:53 PM Flavio Pompermaier 
> wrote:
>
>> Ok, thanks for this info! Maybe this could be added to the
>> documentation. What do you think?
>>
>> Il Dom 2 Feb 2020, 08:37 Jingsong Li  ha scritto:
>>
>>> Hi Flavio,
>>>
>>> You can use `JDBCTableSource` and register it via
>>> TableEnvironment.registerTableSource. You need to provide
>>> an OracleDialect; implementing just `canHandle` and
>>> `defaultDriverName` should be OK.
>>>
>>> Best,
>>> Jingsong Lee
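A minimal sketch of such a dialect, assuming the JDBCDialect interface in the
flink-jdbc module at the time of this thread (the exact package and method set may
differ in later versions); the URL prefix and driver class name are the usual Oracle
ones but should be verified for your driver:

    import java.util.Optional;
    import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;

    // Sketch: only the two methods mentioned above are implemented; everything
    // else falls back to the interface defaults.
    public class OracleDialect implements JDBCDialect {

        @Override
        public boolean canHandle(String url) {
            // Claim any Oracle JDBC URL.
            return url.startsWith("jdbc:oracle:");
        }

        @Override
        public Optional<String> defaultDriverName() {
            return Optional.of("oracle.jdbc.OracleDriver");
        }
    }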
>>>
>>> On Sun, Feb 2, 2020 at 2:42 PM Jark Wu  wrote:
>>>
 Hi Flavio,

 If you want to adjust the writing statement for Oracle, you can
 implement the JDBCDialect for Oracle, and pass to the JDBCUpsertTableSink
 when constructing via `JDBCOptions.Builder#setDialect`. In this way, you
 don't need to recompile the source code of flink-jdbc.

 Best,
 Jark
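Building on the sketch above, wiring the dialect in through JDBCOptions could look
roughly like this; the connection URL and table name are placeholders, and the
builder method names should be checked against the flink-jdbc version you actually
use:

    import org.apache.flink.api.java.io.jdbc.JDBCOptions;

    public class OracleSinkOptions {
        // Sketch: the custom dialect lives entirely in the application JAR,
        // so no Flink source code needs to be modified or recompiled.
        public static JDBCOptions create() {
            return JDBCOptions.builder()
                    .setDBUrl("jdbc:oracle:thin:@//db-host:1521/ORCL") // placeholder URL
                    .setTableName("MY_TABLE")                          // placeholder table
                    .setDialect(new OracleDialect())
                    .build();
        }
    }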

 On Fri, 31 Jan 2020 at 19:28, Flavio Pompermaier 
 wrote:

> Hi to all,
> I was looking at the Flink SQL APIs and I discovered that only a few
> drivers are supported [1], i.e. MySQL, Postgres and Derby. You could have
> problems only on the writing side of the connector (TableSink) because you
> need to adjust the override statement, but for the read part you shouldn't
> have problems with dialects...am I wrong?
> And what am I supposed to do right now if I want to connect to Oracle
> using the Table API? Do I have to use the low level JDBCInputFormat? Is
> there an easy way to connect to Oracle using the Table API without the 
> need
> to modify and recompile the source code of Flink (just adding some
> interface implementation in the application JAR)?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbc-connector
>
> Best,
> Flavio
>

>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: batch job OOM

2020-01-24 Thread Bowen Li
Hi Fanbin,

You can install your own Flink build in AWS EMR, and it frees you from
EMR's release cycles.

On Thu, Jan 23, 2020 at 03:36 Jingsong Li  wrote:

> Fanbin,
>
> I have no idea now; can you create a JIRA to track it? You can describe
> the complete SQL and some information about the data.
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu  wrote:
>
>> Jingsong,
>>
>> Do you have any suggestions to debug the above mentioned
>> IndexOutOfBoundsException error?
>> Thanks,
>>
>> Fanbin
>>
>> On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu 
>> wrote:
>>
>>> I got the following error when running another job. any suggestions?
>>>
>>> Caused by: java.lang.IndexOutOfBoundsException
>>> at
>>> org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
>>> at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
>>> at HashWinAggWithKeys$538.endInput(Unknown Source)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu 
>>> wrote:
>>>
 Jingsong,

 I had set the config value too large. After I changed it to a smaller
 number it works now!
 Thank you for the help. Really appreciate it!

 Fanbin

 On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
 wrote:

> Fanbin,
>
> Looks like your config is wrong, can you show your config code?
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
> wrote:
>
>> Jingsong,
>>
>> Great, now i got a different error:
>>
>> java.lang.NullPointerException: Initial Segment may not be null
>>  at 
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:65)
>>  at 
>> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:190)
>>  at 
>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:149)
>>  at LocalHashWinAggWithKeys$292.open(Unknown Source)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>>
>> is there any other config i should add?
>>
>> thanks,
>>
>> Fanbin
>>
>>
>> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu 
>> wrote:
>>
>>> you beat me to it.
>>> let's me try that.
>>>
>>> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
>>> wrote:
>>>
 Fanbin,

 Document is here:
 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
 NOTE: you need to configure this in TableConfig.

 Best,
 Jingsong Lee
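A hedged sketch of putting such an option into TableConfig in Flink 1.9; the key and
value shown (the hash-aggregation memory option) are examples taken from the linked
config page and should be checked against the page for your version:

    import org.apache.flink.table.api.TableConfig;
    import org.apache.flink.table.api.TableEnvironment;

    public class ConfigureHashAggMemory {
        // Sketch: blink planner options are plain key/value strings set on the
        // TableConfig's underlying Configuration.
        public static void apply(TableEnvironment tableEnv) {
            TableConfig conf = tableEnv.getConfig();
            conf.getConfiguration().setString("table.exec.resource.hash-agg.memory", "256 mb");
        }
    }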

 On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
 wrote:

> Jingsong,
>
> Thank you for the response.
> Since I'm using Flink on EMR and the latest version there is 1.9 now,
> the second option is ruled out, but I will keep that in mind for a future
> upgrade.
>
> I'm going to try the first option. It's probably a good idea to
> add that in the doc for example:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> Thanks,
> Fanbin
>
> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <
> jingsongl...@gmail.com> wrote:
>
>> Hi Fanbin,
>>
>> Thanks for using blink batch mode.
>>
>> The OOM is caused by there not being enough managed memory for the hash
>> aggregation.
>>
>> There are three options you can choose from:
>>
>> 1.Is your version Flink 1.9? 1.9 still 

Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 Thread Bowen Li
Congrats!

On Thu, Jan 16, 2020 at 13:45 Peter Huang 
wrote:

> Congratulations, Dian!
>
>
> Best Regards
> Peter Huang
>
> On Thu, Jan 16, 2020 at 11:04 AM Yun Tang  wrote:
>
>> Congratulations, Dian!
>>
>> Best
>> Yun Tang
>> --
>> *From:* Benchao Li 
>> *Sent:* Thursday, January 16, 2020 22:27
>> *To:* Congxian Qiu 
>> *Cc:* d...@flink.apache.org ; Jingsong Li <
>> jingsongl...@gmail.com>; jincheng sun ; Shuo
>> Cheng ; Xingbo Huang ; Wei Zhong
>> ; Hequn Cheng ; Leonard Xu
>> ; Jeff Zhang ; user <
>> user@flink.apache.org>; user-zh 
>> *Subject:* Re: [ANNOUNCE] Dian Fu becomes a Flink committer
>>
>> Congratulations Dian.
>>
>> Congxian Qiu  于2020年1月16日周四 下午10:15写道:
>>
>> > Congratulations Dian Fu
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > Jark Wu  于2020年1月16日周四 下午7:44写道:
>> >
>> >> Congratulations Dian and welcome on board!
>> >>
>> >> Best,
>> >> Jark
>> >>
>> >> On Thu, 16 Jan 2020 at 19:32, Jingsong Li 
>> wrote:
>> >>
>> >> > Congratulations Dian Fu. Well deserved!
>> >> >
>> >> > Best,
>> >> > Jingsong Lee
>> >> >
>> >> > On Thu, Jan 16, 2020 at 6:26 PM jincheng sun <
>> sunjincheng...@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> Congrats Dian Fu and welcome on board!
>> >> >>
>> >> >> Best,
>> >> >> Jincheng
>> >> >>
>> >> >> Shuo Cheng  于2020年1月16日周四 下午6:22写道:
>> >> >>
>> >> >>> Congratulations!  Dian Fu
>> >> >>>
>> >> >>> > Xingbo Wei Zhong  于2020年1月16日周四 下午6:13写道: >> jincheng sun
>> >> >>> 于2020年1月16日周四 下午5:58写道:
>> >> >>>
>> >> >>
>> >> >
>> >> > --
>> >> > Best, Jingsong Lee
>> >> >
>> >>
>> >
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>


Re: [DISCUSS] What parts of the Python API should we focus on next ?

2019-12-19 Thread Bowen Li
- integrate PyFlink with Jupyter notebook
   - Description: users should be able to run PyFlink seamlessly in Jupyter
   - Benefits: Jupyter is the industry-standard notebook for data
scientists. I’ve talked to a few companies in North America; they think
Jupyter is the #1 way to empower internal DS with Flink


On Wed, Dec 18, 2019 at 19:05 jincheng sun  wrote:

> Also CC user-zh.
>
> Best,
> Jincheng
>
>
> jincheng sun  于2019年12月19日周四 上午10:20写道:
>
>> Hi folks,
>>
>> As release-1.10 is under feature-freeze(The stateless Python UDF is
>> already supported), it is time for us to plan the features of PyFlink for
>> the next release.
>>
>> To make sure the features supported in PyFlink are the most demanded
>> by the community, we'd like to get more people involved, i.e., it would be
>> better if all of the devs and users join in the discussion of which kind of
>> features are more important and urgent.
>>
>> We have already listed some features from different aspects which you can
>> find below, however it is not the ultimate plan. We appreciate any
>> suggestions from the community, either on the functionalities or
>> performance improvements, etc. Would be great to have the following
>> information if you want to suggest to add some features:
>>
>> -
>> - Feature description: 
>> - Benefits of the feature: 
>> - Use cases (optional): 
>> --
>>
>> Features in my mind
>>
>> 1. Integration with most popular Python libraries
>> - fromPandas/toPandas API
>>Description:
>>   Support to convert between Table and pandas.DataFrame.
>>Benefits:
>>   Users could switch between Flink and Pandas API, for example,
>> do some analysis using Flink and then perform analysis using the Pandas API
>> if the result data is small and could fit into the memory, and vice versa.
>>
>> - Support Scalar Pandas UDF
>>Description:
>>   Support scalar Pandas UDF in Python Table API & SQL. Both the
>> input and output of the UDF is pandas.Series.
>>Benefits:
>>   1) Scalar Pandas UDF performs better than row-at-a-time UDF,
>> ranging from 3x to over 100x (from pyspark)
>>   2) Users could use Pandas/Numpy API in the Python UDF
>> implementation if the input/output data type is pandas.Series
>>
>> - Support Pandas UDAF in batch GroupBy aggregation
>>Description:
>>Support Pandas UDAF in batch GroupBy aggregation of Python
>> Table API & SQL. Both the input and output of the UDF is pandas.DataFrame.
>>Benefits:
>>   1) Pandas UDAF performs better than row-at-a-time UDAF more
>> than 10x in certain scenarios
>>   2) Users could use Pandas/Numpy API in the Python UDAF
>> implementation if the input/output data type is pandas.DataFrame
>>
>> 2. Fully support  all kinds of Python UDF
>> - Support Python UDAF(stateful) in GroupBy aggregation (NOTE: Please
>> give us some use case if you want this feature to be contained in the next
>> release)
>>   Description:
>> Support UDAF in GroupBy aggregation.
>>   Benefits:
>> Users could define and use Python UDAF and use it in GroupBy
>> aggregation. Without it, users have to use Java/Scala UDAF.
>>
>> - Support Python UDTF
>>   Description:
>>Support  Python UDTF in Python Table API & SQL
>>   Benefits:
>> Users could define and use Python UDTF in Python Table API & SQL.
>> Without it, users have to use Java/Scala UDTF.
>>
>> 3. Debugging and Monitoring of Python UDF
>>- Support User-Defined Metrics
>>  Description:
>>Allow users to define user-defined metrics and global job
>> parameters with Python UDFs.
>>  Benefits:
>>UDF needs metrics to monitor some business or technical
>> indicators, which is also a requirement for UDFs.
>>
>>- Make the log level configurable
>>  Description:
>>Allow users to config the log level of Python UDF.
>>  Benefits:
>>Users could configure different log levels when debugging and
>> deploying.
>>
>> 4. Enrich the Python execution environment
>>- Docker Mode Support
>>  Description:
>>  Support running python UDF in docker workers.
>>  Benefits:
>>  Support various of deployments to meet more users' requirements.
>>
>> 5. Expand the usage scope of Python UDF
>>- Support to use Python UDF via SQL client
>>  Description:
>>  Support to register and use Python UDF via SQL client
>>  Benefits:
>>  SQL client is a very important interface for SQL users. This
>> feature allows SQL users to use Python UDFs via SQL client.
>>
>>- Integrate Python UDF with Notebooks
>>  Description:
>>  Such as Zeppelin, etc (Especially Python dependencies)
>>
>>- Support to register Python UDF into catalog
>>   Description:
>>   Support to register Python UDF into catalog
>>   Benefits:
>>   1)Catalog is the centralized pl

Re: [DISCUSS] have separate Flink distributions with built-in Hive dependencies

2019-12-13 Thread Bowen Li
cc user ML in case anyone want to chime in

On Fri, Dec 13, 2019 at 00:44 Bowen Li  wrote:

> Hi all,
>
> I want to propose to have a couple separate Flink distributions with Hive
> dependencies on specific Hive versions (2.3.4 and 1.2.1). The distributions
> will be provided to users on Flink download page [1].
>
> A few reasons to do this:
>
> 1) Flink-Hive integration is important to many many Flink and Hive users
> in two dimensions:
>  a) for Flink metadata: HiveCatalog is the only persistent catalog to
> manage Flink tables. With Flink 1.10 supporting more DDL, the persistent
> catalog would be playing even more critical role in users' workflow
>  b) for Flink data: Hive data connector (source/sink) helps both Flink
> and Hive users to unlock new use cases in streaming, near-realtime/realtime
> data warehouse, backfill, etc.
>
> 2) currently users have to go through a *really* tedious process to get
> started, because it requires lots of extra jars (see [2]) that are absent
> in Flink's lean distribution. We've had so many users from the public mailing
> list, private email, and DingTalk groups who got frustrated spending lots of
> time figuring out the jars themselves. They would rather have a more "right
> out of box" quickstart experience, and play with the catalog and
> source/sink without hassle.
>
> 3) it's easier for users to replace those Hive dependencies for their own
> Hive versions - just replace those jars with the right versions and no need
> to find the doc.
>
> * Hive 2.3.4 and 1.2.1 are two versions that represent a large part of the user base
> out there, and that's why we are using them as examples for dependencies in
> [1] even though we've supported almost all Hive versions [3] now.
>
> I want to hear what the community thinks about this, and how to achieve it
> if we believe that's the way to go.
>
> Cheers,
> Bowen
>
> [1] https://flink.apache.org/downloads.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#dependencies
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#supported-hive-versions
>


Re: [ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink Ecosystem

2019-11-19 Thread Bowen Li
Great work, glad to see this finally happening!

On Tue, Nov 19, 2019 at 6:26 AM Robert Metzger  wrote:

> Thanks.
>
> I added a ticket for this nice idea:
> https://github.com/ververica/flink-ecosystem/issues/84
>
> On Tue, Nov 19, 2019 at 11:29 AM orips  wrote:
>
>> This is great.
>>
>> Can we have RSS feed for this?
>>
>> Thanks
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: [ANNOUNCE] Weekly Community Update 2019/45

2019-11-10 Thread Bowen Li
FYI,

I'll be at the ScaleByTheBay conference in Oakland on 11/14-15 to give a
Flink talk (https://sched.co/RoSY). Shoot me an email if anyone is also
there and would like to meet in person and talk about Flink, data, AI/ML,
etc.



On Sun, Nov 10, 2019 at 8:17 AM Konstantin Knauf 
wrote:

> Dear community,
>
> happy to share this week's community digest with updates on stateful
> functions & Flink 1.8.3, a discussion on a connector for
> Hortonworks/Cloudera's schema registry, a couple of meetups and a bit more.
>
> Flink Development
> ==
>
> * [stateful functions] After the successful vote to accept *Stateful
> Functions* into Flink, Igal has started a thread to discuss a few details
> of the contribution like repository name, mailing lists, component name,
> etc. [1]
>
> * [releases] Jingcheng has started a conversation about the release of *Flink
> 1.8.3*. Probably still waiting for a few fixes to come in, but looks like
> there could be a first RC soon. [2]
>
> * [connectors] Őrhidi Mátyás and Gyula propose to contribute a connector
> for *Hortonworks/Cloudera Schema Registry*, which can be used during
> de-/serialization in Flink's Kafka Connector. [3]
>
> * [python] Apache Flink will start Python processes to execute *Python
> UDFs* in the Table API, planned for 1.10. Dian Fu has published a
> proposal how to integrate the resource requirements of these Python
> processes into the unified memory configuration framework, which is
> currently introduced in FLIP-49. [4]
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Stateful-Functions-Contribution-Details-tp34737.html
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-8-3-tp34811.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Avro-Cloudera-Registry-FLINK-14577-tp34647.html
> [4]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-PyFlink-User-Defined-Function-Resource-Management-tp34631.html
>
> Notable Bugs
> ==
>
> * [FLINK-14382] [1.9.1] [1.8.2] [yarn] In Flink 1.9+ filesystem
> dependencies can be loaded via a plugin mechanism, each with its own
> classloader. This currently does not work on YARN, where the plugin
> directory is directly added to the classpath instead. [5]
>
> [5] https://issues.apache.org/jira/browse/FLINK-14382
>
> Events, Blog Posts, Misc
> ===
>
> * *Jark Wu *is now a member of the Apache Flink PMC. Congratulations! [6]
> * This blog post by *Sreekanth Krishnavajjala & Vinod Kataria (AWS)*
> includes a hands-on introduction to Apache Flink on AWS EMR. [7]
> * Upcoming Meetups
> * At the next Athens Big Data Group on the 14th of November *Chaoran
> Yu *of Lightbend will talk about Flink and Spark on Kubernetes. [8]
> * *Bowen Li* will speak about "The Rise of Apache and Stream
> Processing" at the next Big Data Bellevue in Seattle on the 20th of
> November. [9]
> * The next edition of the Bay Area Apache Flink meetup will happen on
> the 20th of November with talks by *Gyula Fora (Cloudera)* and *Lakshmi
> Rao (Lyft)*.[10]
> * We will have our next Apache Flink Meetup in Munich on November 27th
> with talks by *Heiko Udluft & Giuseppe Sirigu*, Airbus, and *Konstantin
> Knauf* (on Stateful Functions). [11]
>
> [6]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Jark-Wu-is-now-part-of-the-Flink-PMC-tp34768.html
> [7]
> https://idk.dev/extract-oracle-oltp-data-in-real-time-with-goldengate-and-query-from-amazon-athena/
> [8] https://www.meetup.com/Athens-Big-Data/events/265957761/
> [9] https://www.meetup.com/Big-Data-Bellevue-BDB/events/fxbnllyzpbbc/
> [10] https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/266226960/
> [11] https://www.meetup.com/Apache-Flink-Meetup-Munich/events/266072196/
>
> Cheers,
>
> Konstantin (@snntrable)
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
>
> Follow us @VervericaData Ververica <https://www.ververica.com/>
>
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Tony) Cheng
>


Re: Streaming write to Hive

2019-09-05 Thread Bowen Li
Hi,

I'm not sure if there's one yet. Feel free to create one if not.

On Wed, Sep 4, 2019 at 11:28 PM Qi Luo  wrote:

> Hi Bowen,
>
> Thank you for the information! Streaming write to Hive is a very common
> use case for our users. Is there any open issue for this to which we can
> try contributing?
>
> +Yufei and Chang who are also interested in this.
>
> Thanks,
> Qi
>
> On Thu, Sep 5, 2019 at 12:16 PM Bowen Li  wrote:
>
>> Hi Qi,
>>
>> With 1.9 out of the box, I'm afraid not. You can make HiveTableSink
>> implement AppendStreamTableSink (an empty interface for now) so it can be
>> picked up in a streaming job. Also, streaming requires checkpointing, and
>> the Hive sink doesn't do that yet. There might be other tweaks you need to make.
>>
>> It's on our list for 1.10, not high priority though.
>>
>> Bowen
>>
>> On Wed, Sep 4, 2019 at 2:23 AM Qi Luo  wrote:
>>
>>> Hi guys,
>>>
>>> In Flink 1.9 HiveTableSink is added to support writing to Hive, but it
>>> only supports batch mode. StreamingFileSink can write to HDFS in streaming
>>> mode, but it has no Hive related functionality (e.g. adding Hive partition).
>>>
>>> Is there any easy way we can streaming write to Hive (with exactly-once
>>> guarantee)?
>>>
>>> Thanks,
>>> Qi
>>>
>>


Re: Streaming write to Hive

2019-09-04 Thread Bowen Li
Hi Qi,

With 1.9 out of the box, I'm afraid not. You can make HiveTableSink
implement AppendStreamTableSink (an empty interface for now) so it can be
picked up in a streaming job. Also, streaming requires checkpointing, and
the Hive sink doesn't do that yet. There might be other tweaks you need to make.

It's on our list for 1.10, not high priority though.

Bowen

On Wed, Sep 4, 2019 at 2:23 AM Qi Luo  wrote:

> Hi guys,
>
> In Flink 1.9 HiveTableSink is added to support writing to Hive, but it
> only supports batch mode. StreamingFileSink can write to HDFS in streaming
> mode, but it has no Hive related functionality (e.g. adding Hive partition).
>
> Is there any easy way we can streaming write to Hive (with exactly-once
> guarantee)?
>
> Thanks,
> Qi
>


Re: kinesis table connector support

2019-09-02 Thread Bowen Li
@Fanbin, I don't think there's one yet. Feel free to create a ticket and
submit a PR for it

On Mon, Sep 2, 2019 at 8:13 AM Biao Liu  wrote:

> Hi Fanbin,
>
> I'm not familiar with table module. Maybe someone else could help.
>
> @jincheng sun 
> Do you know there is any plan for kinesis table connector?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Sat, 24 Aug 2019 at 02:26, Fanbin Bu  wrote:
>
>> Hi,
>>
>> Looks like Flink table connectors do not include `kinesis` (only
>> FileSystem, Kafka, ES); see
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#table-connectors
>> .
>> I also found some examples for Kafka:
>> https://eventador.io/blog/flink_table_and_sql_api_with_apache_flink_16/.
>> I'm wondering is there such a thing for kinesis also.
>>
>> Is there any plan to support this in the future? Otherwise, what needs to
>> be done if we want to implement it on our own?
>>
>> Basically, I have a Kinesis stream that emits JSON string data and I
>> would like to use the Flink Table/SQL API to do the streaming/batch processing.
>> Currently, I'm using the DataStream API, which is not as flexible.
>>
>> Any help would be appreciated.
>>
>> Thanks,
>> Fanbin
>>
>


[ANNOUNCE] Kinesis connector becomes part of Flink releases

2019-08-30 Thread Bowen Li
Hi all,

I'm glad to announce that, as #9494
was merged today,
flink-connector-kinesis is officially under the Apache 2.0 license now in the master
branch, and its artifact will be deployed to Maven central as part of Flink
releases starting from Flink 1.10.0. Users can then use the artifact out of
the box and no longer have to build and maintain it on their own.

It brings a much better user experience to our large AWS customer base by
making their work simpler, smoother, and more productive!

Thanks everyone who participated in coding and review to drive this
initiative forward.

Cheers,
Bowen


Re: Hive version in Flink

2019-08-26 Thread Bowen Li
Hi,

You can read the Hive-related documentation [1] first. It should be OK to just
specify the Hive version as 1.2.1 for your Cloudera Hive 1.1.0 deployment.

Hive 1.1 will be officially supported in Flink 1.10.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/hive/index.html#supported-hive-versions
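A hedged sketch of pinning the version when registering the catalog in code, assuming
the Flink 1.9 HiveCatalog constructor that takes (name, default database, hive conf
dir, hive version); the catalog name, database, and conf path are placeholders:

    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class RegisterHiveCatalog {
        // Sketch: declare the catalog with Hive version "1.2.1" even though the
        // cluster runs Hive 1.1.0, as suggested above.
        public static void register(TableEnvironment tableEnv) {
            HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf", "1.2.1");
            tableEnv.registerCatalog("myhive", hive);
            tableEnv.useCatalog("myhive");
        }
    }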

On Fri, Aug 23, 2019 at 10:50 AM Yebgenya Lazarkhosrouabadi <
lazarkhosrouab...@integration-factory.de> wrote:

> Hello,
>
>
>
> I’m using Flink on Cloudera-quickstart-vm-5.13 and need to access the
> Hive-Tables. The version of hive on Cloudera is 1.1.0 , but in order to
> access the data of the Hive-Tables, a higher version of hive is needed.
> Unfortunately it is not possible to easily change the version of Hive on
> Cloudera. I have installed Hive 2.1.0 on Cloudera , and want to specify it
> on Flink 10, so that Flink uses the new version of Hive and not the 1.1.0
> one. How can I do that?
>
>
>
> Regards
>
> Yebgenya Lazar
>


Re: Error while using catalog in .yaml file

2019-08-26 Thread Bowen Li
Put the flink-connector-hive jar (plus the Hive dependency jars listed in the Hive integration docs) on the classpath, e.g. by dropping them into Flink's lib/ folder.



On Sun, Aug 25, 2019 at 9:14 AM Yebgenya Lazarkhosrouabadi <
lazarkhosrouab...@integration-factory.de> wrote:

> Hello,
>
>
>
> I’m trying to use HiveCatalog in Flink 1.9. I modified the YAML file like
> this:
>
>
>
>
>
> catalogs:
>
>   - name: mynewhive
>
> type: hive
>
> hive-conf-dir: /home/user/Downloads/apache-hive-1.2.2-bin/conf
>
> default-database: myhive
>
>
>
>
>
> But when I try to run *./sql-client.sh embedded * I get this error:
>
>
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: The configured
> environment is invalid. Please check your environment files again.
>
>at
> org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
>
>at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
>
>at
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
>
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
>
>at
> org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:553)
>
>at
> org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:373)
>
>at
> org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
>
>... 2 more
>
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.CatalogFactory' in
>
> the classpath.
>
>
>
> Reason: No context matches.
>
>
>
> The following properties are requested:
>
> default-database=myhive
>
> hive-conf-dir=/home/bernadette/Downloads/apache-hive-1.2.2-bin/conf
>
> type=hive
>
>
>
> The following factories have been considered:
>
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>
> org.apache.flink.table.planner.StreamPlannerFactory
>
> org.apache.flink.table.executor.StreamExecutorFactory
>
> org.apache.flink.table.planner.delegation.BlinkPlannerFactory
>
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
>
>at
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
>
>at
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
>
>at
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
>
>at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
>
>at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:258)
>
>at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:136)
>
>at java.util.HashMap.forEach(HashMap.java:1289)
>
>at
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:135)
>
>at
> org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:549)
>
>... 4 more
>
>
>
>
>
>
>
> How can I get rid of this error?
>
>
>
> Best regards
>
> Yebgenya Lazar
>


Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

2019-08-14 Thread Bowen Li
Congratulations Andrey!

On Wed, Aug 14, 2019 at 10:18 PM Rong Rong  wrote:

> Congratulations Andrey!
>
> On Wed, Aug 14, 2019 at 10:14 PM chaojianok  wrote:
>
> > Congratulations Andrey!
> > At 2019-08-14 21:26:37, "Till Rohrmann"  wrote:
> > >Hi everyone,
> > >
> > >I'm very happy to announce that Andrey Zagrebin accepted the offer of
> the
> > >Flink PMC to become a committer of the Flink project.
> > >
> > >Andrey has been an active community member for more than 15 months. He
> has
> > >helped shaping numerous features such as State TTL, FRocksDB release,
> > >Shuffle service abstraction, FLIP-1, result partition management and
> > >various fixes/improvements. He's also frequently helping out on the
> > >user@f.a.o mailing lists.
> > >
> > >Congratulations Andrey!
> > >
> > >Best, Till
> > >(on behalf of the Flink PMC)
> >
>


[ANNOUNCE] Seattle Flink Meetup at Uber on 8/22

2019-08-12 Thread Bowen Li
Hi All !

Join our next Seattle Flink Meetup at Uber Seattle, featuring talks of
[Flink + Kappa+ @ Uber] and [Flink + Pulsar for streaming-first, unified
data processing].

- TALK #1: Moving from Lambda and Kappa Architectures to Kappa+ with Flink
at Uber
- TALK #2: When Apache Pulsar meets Apache Flink

Checkout event details and RSVP at
https://www.meetup.com/seattle-flink/events/263782233/ . See you soon!

Bowen


Re: Status of the Integration of Flink with Hive

2019-08-12 Thread Bowen Li
Hi David,

Check out Hive related documentations:

-
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/catalog.html
-
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive_integration.html
-
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive_integration_example.html

Note
- I just merged a PR restructuring Hive related docs today, changes should
reflect on the website in a day or so
- I didn't find release-1.9-snapshot's doc, so just reference
release-1.10-snapshot's doc for now. 1.9 rc2 has been released, official
1.9 should be out soon
- Hive features are in beta in 1.9

Feel free to open tickets if you have feature requests.


On Fri, Aug 9, 2019 at 8:00 AM David Morin 
wrote:

> Hi,
>
> I want to connect my Flink streaming job to Hive.
> At the moment, what is the best way to connect to Hive.
> Some features seems to be in development.
> Some really cool features have been described here:
> https://fr.slideshare.net/BowenLi9/integrating-flink-with-hive-xuefu-zhang-and-bowen-li-seattle-flink-meetup-feb-2019
> My first need is to read and update Hive metadata.
> Concerning the Hive data I can store them directly in HDFS (as Orc format)
> in a first step.
> thx.
>
> David
>
>


Re: Cannot access the data from Hive-Tables in Blink

2019-07-17 Thread Bowen Li
Hi Yebgenya,

This is caused by a Hive version mismatch: you are either not using the right
Hive version (double-check that your Hive version is supported by Blink), or not
specifying the right version in the YAML config (e.g. you use 2.3.4 but specify
it as 1.2.1).

Bowen

On Tue, Jul 16, 2019 at 11:22 AM Yebgenya Lazarkhosrouabadi <
lazarkhosrouab...@integration-factory.de> wrote:

> Hello,
>
>
>
> I’m trying to use BigBench queries on Blink in Cloudera. I have defined a
> catalog in YAML-file and can see my Hive-tables in SQL-client.
>
> But I can’t see the data of the tables, or run any other SQL-Query. I get
> this error:
>
>
>
> [ERROR] Could not execute SQL statement. Reason:
>
> org.apache.flink.hive.shaded.org.apache.thrift.TApplicationException:
> Invalid method name: 'get_table_req'
>
>
>
>
>
> How can I get rid of this error?
>
>
>
> Regards
>
> Yebgenya
>


Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Bowen Li
Congrats, Rong!


On Thu, Jul 11, 2019 at 10:48 AM Oytun Tez  wrote:

> Congratulations Rong!
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Thu, Jul 11, 2019 at 1:44 PM Peter Huang 
> wrote:
>
>> Congrats Rong!
>>
>> On Thu, Jul 11, 2019 at 10:40 AM Becket Qin  wrote:
>>
>>> Congrats, Rong!
>>>
>>> On Fri, Jul 12, 2019 at 1:13 AM Xingcan Cui  wrote:
>>>
 Congrats Rong!

 Best,
 Xingcan

 On Jul 11, 2019, at 1:08 PM, Shuyi Chen  wrote:

 Congratulations, Rong!

 On Thu, Jul 11, 2019 at 8:26 AM Yu Li  wrote:

> Congratulations Rong!
>
> Best Regards,
> Yu
>
>
> On Thu, 11 Jul 2019 at 22:54, zhijiang 
> wrote:
>
>> Congratulations Rong!
>>
>> Best,
>> Zhijiang
>>
>> --
>> From:Kurt Young 
>> Send Time:2019年7月11日(星期四) 22:54
>> To:Kostas Kloudas 
>> Cc:Jark Wu ; Fabian Hueske ;
>> dev ; user 
>> Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer
>>
>> Congratulations Rong!
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas 
>> wrote:
>> Congratulations Rong!
>>
>> On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:
>> Congratulations Rong Rong!
>> Welcome on board!
>>
>> On Thu, 11 Jul 2019 at 22:25, Fabian Hueske 
>> wrote:
>> Hi everyone,
>>
>> I'm very happy to announce that Rong Rong accepted the offer of the
>> Flink PMC to become a committer of the Flink project.
>>
>> Rong has been contributing to Flink for many years, mainly working on
>> SQL and Yarn security features. He's also frequently helping out on the
>> user@f.a.o mailing lists.
>>
>> Congratulations Rong!
>>
>> Best, Fabian
>> (on behalf of the Flink PMC)
>>
>>
>>



Re: Hive in sql-client

2019-07-08 Thread Bowen Li
Hi Yebgenya,

To use Blink's integration with Hive in the SQL CLI, you can refer to Blink's
documentation at [1], [2], and [3].

Note that Hive integration is actually available in the **Flink master branch**
now and will be released soon as part of Flink 1.9.0. The end-to-end
integration should be feature complete by this week or so. To use Flink's
HiveCatalog and run SQL queries on Hive data, please read and follow the
documentation at [4] and [5]. Early feedback is more than welcome!

[1] https://github.com/apache/flink/blob/blink/docs/dev/table/catalog.md
[2] https://github.com/apache/flink/blob/blink/docs/dev/table/sqlClient.md
[3]
https://github.com/apache/flink/blob/blink/docs/dev/table/hive_compatibility.md

[4]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#catalogs
[5] https://github.com/apache/flink/pull/8976/files  I expect this PR to be
merged very soon



On Mon, Jul 8, 2019 at 7:43 AM Yebgenya Lazarkhosrouabadi <
lazarkhosrouab...@integration-factory.de> wrote:

> Hello,
>
>
>
> I’m trying to use Hive tables in sql-client. How can I do this?
>
> I have downloaded  Blink from Github to be able to use catalogs in the
> YAML file, but I can’t run its sql-client using *./sql-client.sh embedded*
> .
>
>
>
> Can you please help me?
>
>
>
> Regards
>
> Bernadette Lazar
>
>
>


Re:

2019-07-08 Thread Bowen Li
Hi Xuchen,

Every email asking questions on our ML **MUST** have a valid subject, to
facilitate archive search in the future and save people's time in deciding
whether they can help answer your question just by glancing at the
subject in their email clients.

Though your question itself is well written, I don't think it's acceptable
to not have a well-written subject. Note that this is brought up not
specifically about you as a person, but about a common practice everyone in
the community should follow.

Bowen

On Sun, Jul 7, 2019 at 1:19 PM Konstantin Knauf 
wrote:

> Hi Wang,
>
> you guessed correctly: the events are not replayed from Kafka, but are
> part of the state of the AsyncWaitOperator, and the requests are resubmitted
> by the AsyncWaitOperator in its open() method.
>
> Cheers,
>
> Konstantin
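For context, a hedged sketch of the orderedWait pattern discussed in this thread;
MyAsyncFunction, the timeout, and the capacity value are placeholders. The capacity
bounds the number of in-flight elements, and those in-flight elements are the state
that gets checkpointed and re-issued on recovery:

    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    public class AsyncEnrichment {

        // Placeholder async function standing in for the real external call.
        public static class MyAsyncFunction extends RichAsyncFunction<String, String> {
            @Override
            public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
                // A real implementation would issue a non-blocking client call here
                // and complete the future from its callback.
                resultFuture.complete(Collections.singletonList(input + "-enriched"));
            }
        }

        // Order-preserving async I/O over a Kafka-sourced stream.
        public static DataStream<String> apply(DataStream<String> kafkaInput) {
            return AsyncDataStream.orderedWait(
                    kafkaInput,
                    new MyAsyncFunction(),
                    30, TimeUnit.SECONDS,   // timeout per element
                    1000);                  // capacity: max in-flight elements before back pressure
        }
    }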
>
>
>
> On Mon, Jul 1, 2019 at 9:39 PM wang xuchen  wrote:
>
>> Hi Flink experts,
>>
>> I am prototyping a real-time system that reads from a Kafka source with
>> Flink and calls out to an external system as part of the event processing.
>> One of the most important requirements is that reads from Kafka should NEVER
>> stall, even in the face of slowness in some async external calls while holding
>> certain Kafka offsets. At-least-once processing is good enough.
>>
>> Currently, I am using AsyncIO with a thread pool of size 20. My
>> understanding is if I use orderedwait with a large 'capacity', consumption
>> from Kafka should continue even if some external calls experience slowness
>> (holding the offsets) as long as the capacity is not exhausted.
>>
>> (From my own reading of Flink source code, the capacity of the
>> orderedwait function translate to the size of the OrderedStreamElementQueue
>> size.)
>>
>> However, I expect that while the external calls are stuck, the stream source
>> should keep pumping out from Kafka as long as there is still capacity, but
>> offsets after the stuck record should NOT be committed back to Kafka
>> (and the checkpoint should also stall to accommodate the stalled offsets?)
>>
>> My observation is, if I set the capacity large enough (max_int / 100 for
>> instance), the consumption was not stalled (which is good), but the offsets
>> were all committed back to Kafka AFTER the stalled records, all
>> checkpoints succeeded, and no back pressure was incurred.
>>
>> In this case, if some machines crash, how does Flink recover the stalled
>> offsets? Which checkpoint does Flink roll back to? I understand that
>> committing offsets back to Kafka is merely to show progress to external
>> monitoring tools, but I hope Flink does bookkeeping somewhere to journal that
>> async call xyz has not returned and should be retried during recovery.
>>
>> ==
>>
>> I've done some more experiments; it looks like Flink is able to recover
>> the record for which I called completeExceptionally, even if I use 'unorderedwait'
>> on the async stream.
>>
>> Which leads to Fabian's earlier comment: 'Flink does not rely on the Kafka
>> consumer offsets to recover; committing offsets to Kafka is merely to show
>> progress to external monitoring tools'.
>>
>> I couldn't pinpoint the code that Flink uses to achieve this; maybe
>> in-flight async invocations in 'UnorderedStreamElementQueue' are part of
>> the checkpoint and Flink saves the actual payload for later replay?
>>
>> Can anyone shed some light?
>>
>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
>
> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>


Re: Source Kafka and Sink Hive managed tables via Flink Job

2019-07-04 Thread Bowen Li
Thanks Youssef. The context makes more sense to me now.

Just from your description, I suspect it might be because of the upsert - the
sink's throughput in step 1 is high but may get stuck in step 2. AFAIK, Hive
ACID/UPSERT is not really scalable; it's OK for rare, occasional usage but
cannot scale well to massive workloads.

I'd suggest you do a few tests:
1) find out what percentage of your data is upserts, and research what
percentage fits a Hive ACID/upsert use case
2) try changing step 2 from upsert to plain append and see if the back
pressure goes away
3) confirm whether it's really the sink causing the backpressure (easy to
check from the Flink UI), and debug your sink (via logging, Java remote
debugging, etc.) to see where the bottleneck is

I think you can find the root cause with the above steps; please report back
whether the inference is valid or not so we can help more users. In case you find
that Hive ACID is not the problem, please share some high-level code of
your job, so we can take another look.

Bowen


On Thu, Jul 4, 2019 at 6:50 AM Youssef Achbany 
wrote:

> Thank you Li for your answer and sorry for the dev mistake :).
>
> *To be more clear:*
>
> We write multiple events, assigned via a Flink tumbling window, to Hive in
> one JDBC INSERT statement. We wrote a Hive sink function for that, using
> only JDBC. We do not use partitions yet, but the table is clustered into
> buckets stored as ORC.
>
> We run the Flink job with parallelism 1 because Hive does not support
> multiple INSERT statements in parallel.
>
> We observe that the first instance of the tumbling window easily inserts
> 10k records into Hive, but following windows only 100s, probably because
> backpressure kicks in then.
>
> In addition, we have answered your questions in our mail in yellow.
>
> Thank you
>
> Kind regards
>
>  -Original Message-
>
> From: Bowen Li [mailto:bowenl...@gmail.com]
>
> Sent: Wednesday, July 03, 2019 9:34 PM
>
> To: dev; youssef.achb...@euranova.eu
>
> Subject: Re: Source Kafka and Sink Hive managed tables via Flink Job
>
>  Hi Youssef,
>
>  You need to provide more background context:
>
> - Which Hive sink are you using? We are working on the official Hive sink
>
> for community and will be released in 1.9. So did you develop yours in
>
> house?
>
> JDBC
>
>  - What do you mean by 1st, 2nd, 3rd window? You mean the parallel
> instances
>
> of the same operator, or do you have you have 3 windowing operations
>
> chained?
>
> No parallel instances; I was referring to the tumbling window
>
>  - What does your Hive table look like? E.g. is it partitioned or
>
> non-partitioned? If partitioned, how many partitions do you have? is it
>
> writing in static partition or dynamic partition mode? what format? how
>
> large?
>
>  No partitioning done because low volumes (<100K records)
>
> Format: ORC
>
> Batches of 20K records are processed in the first windows
>
>  - What does your sink do - is each parallelism writing to multiple
>
> partitions or a single partition/table? Is it only appending data or
>
> upserting?
>
>  Single partition table, in 2 steps: (1) writing to temporary table
> (append), (2) execute SQL to upsert historical table with temporary table
>
> On Wed, 3 Jul 2019 at 21:39, Bowen Li  wrote:
>
>> BTW,  I'm adding user@ mailing list since this is a user question and
>> should be asked there.
>>
>> dev@ mailing list is only for discussions of Flink development. Please
>> see https://flink.apache.org/community.html#mailing-lists
>>
>> On Wed, Jul 3, 2019 at 12:34 PM Bowen Li  wrote:
>>
>>> Hi Youssef,
>>>
>>> You need to provide more background context:
>>>
>>> - Which Hive sink are you using? We are working on the official Hive
>>> sink for community and will be released in 1.9. So did you develop yours in
>>> house?
>>> - What do you mean by 1st, 2nd, 3rd window? You mean the parallel
>>> instances of the same operator, or do you have you have 3 windowing
>>> operations chained?
>>> - What does your Hive table look like? E.g. is it partitioned or
>>> non-partitioned? If partitioned, how many partitions do you have? is it
>>> writing in static partition or dynamic partition mode? what format? how
>>> large?
>>> - What does your sink do - is each parallelism writing to multiple
>>> partitions or a single partition/table? Is it only appending data or
>>> upserting?
>>>
>>> On Wed, Jul 3, 2019 at 1:38 AM Youssef Achbany <
>>> youssef.achb...@euranova.eu> wrote:
>>>
>>>> Dear all,
>>>>

Re: Source Kafka and Sink Hive managed tables via Flink Job

2019-07-03 Thread Bowen Li
BTW,  I'm adding user@ mailing list since this is a user question and
should be asked there.

dev@ mailing list is only for discussions of Flink development. Please see
https://flink.apache.org/community.html#mailing-lists

On Wed, Jul 3, 2019 at 12:34 PM Bowen Li  wrote:

> Hi Youssef,
>
> You need to provide more background context:
>
> - Which Hive sink are you using? We are working on the official Hive sink
> for community and will be released in 1.9. So did you develop yours in
> house?
> - What do you mean by 1st, 2nd, 3rd window? You mean the parallel
> instances of the same operator, or do you have you have 3 windowing
> operations chained?
> - What does your Hive table look like? E.g. is it partitioned or
> non-partitioned? If partitioned, how many partitions do you have? is it
> writing in static partition or dynamic partition mode? what format? how
> large?
> - What does your sink do - is each parallelism writing to multiple
> partitions or a single partition/table? Is it only appending data or
> upserting?
>
> On Wed, Jul 3, 2019 at 1:38 AM Youssef Achbany <
> youssef.achb...@euranova.eu> wrote:
>
>> Dear all,
>>
>> I'm working on a big project and one of the challenges is to read Kafka
>> topics and copy them via Hive commands into Hive managed tables in order to
>> enable ACID Hive properties.
>>
>> I tried it but I have an issue with back pressure:
>> - The first window read 20,000 events and wrote them to Hive tables
>> - The second, third, ... send only 100 events because the write to Hive
>> takes more time than the read of a Kafka topic. But writing 100 events or
>> 50,000 events takes +/- the same time for Hive.
>>
>> Has someone already done this kind of source and sink? Could you help on this?
>> Or do you have some tips?
>> It seems that defining a window size based on the number of events instead of
>> time is not possible. Is that true?
>>
>> Thank you for your help
>>
>> Youssef
>>
>> --
>> ♻ Be green, keep it on the screen
>>
>


Re: [External] Flink 1.7.1 on EMR metrics

2019-06-01 Thread Bowen Li
To answer your question on your debugging code, your reporter has a bug:

log.info("STATSD SENDING: ", name, value);

should be:

log.info("STATSD SENDING: {} {}", name, value);

Without the '{}' placeholders in the original call, the name and value arguments
are never interpolated into the message, which is why your log lines show an
empty metric name and value.


-




On Sat, Jun 1, 2019 at 7:30 PM Padarn Wilson  wrote:

> Thanks both: using the built-in Slf4j reporter is a great idea, I will
> do this.
>
> @Peter.Groesbeck - appreciate  the config. This looks very similar to what
> I had, but if it is working for you perhaps there is something else missing
> from our EMR setup. Will go back and double check the connectivity from all
> the instances.
>
>
>
> On Thu, May 30, 2019 at 9:42 PM Peter Groesbeck 
> wrote:
>
>> Hi Padarn for what it's worth I am using DataDog metrics on EMR with
>> Flink 1.7.1 and this here my flink-conf configuration:
>>
>> - Classification: flink-conf
>>   ConfigurationProperties:
>> metrics.reporter.dghttp.class: 
>> org.apache.flink.metrics.datadog.DatadogHttpReporter
>> metrics.reporter.dghttp.apikey: 
>> metrics.reporter.dghttp.tags: 
>> 'service:myservice,env:prod,region:us-east-1'
>> metrics.scope.jm: 'jobmanager'
>> metrics.scope.jm.job: 'jobmanager'
>> metrics.scope.operator: 'taskmanager..'
>> metrics.scope.task: 'taskmanager..'
>> metrics.scope.tm: 'taskmanager'
>> metrics.scope.tm.job: 'taskmanager'
>>   Configurations: []
>>
>>
>> On Thu, May 30, 2019 at 6:46 AM Yun Tang  wrote:
>>
>>> Hi Padarn
>>>
>>> If you want to verify why no metrics are being sent out, how about using the
>>> built-in Slf4j reporter [1], which records metrics in the logs?
>>> If you can see the metrics after enabling the slf4j reporter, you can
>>> then compare the configurations.
>>>
>>> Best
>>> Yun Tang
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
>>>
>>> --
>>> *From:* Padarn Wilson 
>>> *Sent:* Thursday, May 30, 2019 18:20
>>> *To:* user
>>> *Subject:* [External] Flink 1.7.1 on EMR metrics
>>>
>>> Hello all,
>>>
>>> I am trying to run Flink 1.7.1 on EMR and having some trouble with
>>> metric reporting.
>>>
>>> I was using the DataDogHttpReporter, but have also tried the
>>> StatsDReporter, but with both was seeing no metrics being collected.
>>>
>>> To debug this I implemented my own reporter (based on StatsDReporter)
>>> and logged the name of the metric being sent:
>>>
>>> private void send(final String name, final String value) {
>>>log.info("STATSD SENDING: ", name, value);
>>>try {
>>>   String formatted = String.format("%s:%s|g", name, value);
>>>   byte[] data = formatted.getBytes(StandardCharsets.UTF_8);
>>>   socket.send(new DatagramPacket(data, data.length, this.address));
>>>}
>>>catch (IOException e) {
>>>   LOG.error("unable to send packet to statsd at '{}:{}'", 
>>> address.getHostName(), address.getPort());
>>>}
>>> }
>>>
>>>
>>> This code is certainly reached, because in my log I see a lot of this:
>>>
>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
>>> - STATSD SENDING:
>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
>>> - STATSD SENDING:
>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
>>> - STATSD SENDING:
>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
>>> - STATSD SENDING:
>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
>>> - STATSD SENDING:
>>> 2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter
>>> - STATSD SENDING:
>>> 2019-05-30 10:18:40,352 INFO  com.grab.statsd.StatsDReporter
>>> - STATSD SENDING:
>>>
>>> As you can see, the name and value for the metric being reported is empty.
>>>
>>>
>>> And the logs show everything initialized fine with no error:
>>>
>>> 2019-05-30 10:18:30,342 INFO  
>>> org.apache.flink.runtime.metrics.MetricRegistryImpl   - Configuring 
>>> stsd with {port=8125, host=127.0.0.1, class=com.grab.statsd.StatsDReporter}.
>>> 2019-05-30 10:18:30,344 INFO  com.grab.statsd.StatsDReporter
>>> - Configured StatsDReporter with {host:127.0.0.1, port:8125}
>>> 2019-05-30 10:18:30,344 INFO  
>>> org.apache.flink.runtime.metrics.MetricRegistryImpl   - 
>>> Periodically reporting metrics in intervals of 10 SECONDS for reporter stsd 
>>> of type com.grab.statsd.StatsDReporter.
>>>
>>>
>>>
>>>
>>> Has anyone else tried to work with Flink and metrics on EMR with Flink 1.7.1 (the latest
>>> version on EMR)? If so, any pointers as to what could be set up incorrectly?
>>>
>>>
>>>
>>>
>>> *Grab is hiring. Learn more at https://grab.careers
>>> *
>>>

[ANNOUNCE] Seattle Flink Meetup at AWS on May 30

2019-05-20 Thread Bowen Li
Hi Greater Seattle folks!

We are hosting our next meetup with AWS Kinesis Analytics team on May 30
next Thursday in downtown Seattle.

We feature two talks this time:

   1. *"AWS Kinesis Analytics: running Flink serverless in multi-tenant
   environment"* by Kinesis Analytics team on:
  - How to run Apache Flink applications using Amazon Kinesis Data
  Analytics service
  - Challenges they faced building a multi-tenant serverless Flink
  service
   2. *"How to contribute to Flink: a hands-on guide to start your Apache
   open-source journey"* by me to get developers onboard given the
   considerably rising interest of contributing to Flink from local tech
   community

Please find details and RSVP at
https://www.meetup.com/seattle-flink/events/260865206/  See you next week!

Cheers,
Bowen


Re: [DISCUSS] Drop Elasticssearch 1 connector

2019-04-05 Thread Bowen Li
+1 for dropping elasticsearch 1 connector.

On Wed, Apr 3, 2019 at 5:10 AM Chesnay Schepler  wrote:

> Hello everyone,
>
> I'm proposing to remove the connector for elasticsearch 1.
>
> The connector is used significantly less than more recent versions (2&5
> are downloaded 4-5x more), and hasn't seen any development for over a
> year, yet still incurs maintenance overhead due to licensing and testing.
>
>
>


Re: [PROGRESS UPDATE] [DISCUSS] Flink-Hive Integration and Catalogs

2019-03-20 Thread Bowen Li
Thanks, Shaoxuan! I've sent a Chinese version to user-zh at the same time
yesterday.

From the feedback we have received so far, supporting multiple older Hive
versions is definitely one of our next focuses.

*More feedbacks are welcome from our community!*


On Tue, Mar 19, 2019 at 8:44 PM Shaoxuan Wang  wrote:

> Hi Bowen,
> Thanks for driving this. I am CCing this email/survey to user-zh@
> flink.apache.org as well.
> I heard there is a lot of interest in Flink-Hive from the field. One of
> the biggest requests the Hive users have raised is "the support of
> out-of-date Hive versions". A large number of users are still working on
> clusters with CDH/HDP installed with old Hive versions, say 1.2.1/2.1.1. We
> need to ensure the support of these Hive versions when planning the work on
> Flink-Hive integration.
>
> *@all. "We want to get your feedbacks on Flink-Hive integration." *
>
> Regards,
> Shaoxuan
>
> On Wed, Mar 20, 2019 at 7:16 AM Bowen Li  wrote:
>
>> Hi Flink users and devs,
>>
>> We want to get your feedbacks on integrating Flink with Hive.
>>
>> Background: In Flink Forward in Beijing last December, the community
>> announced to initiate efforts on integrating Flink and Hive. On Feb 21 
>> Seattle
>> Flink Meetup <https://www.meetup.com/seattle-flink/events/258723322/>,
>> We presented Integrating Flink with Hive
>> <https://www.slideshare.net/BowenLi9/integrating-flink-with-hive-xuefu-zhang-and-bowen-li-seattle-flink-meetup-feb-2019>
>>  with
>> a live demo to local community and got great response. As of mid March now,
>> we have internally finished building Flink's brand-new catalog
>> infrastructure, metadata integration with Hive, and most common cases of
>> Flink reading/writing against Hive, and will start to submit more design
>> docs/FLIP and contribute code back to community. The reason for doing it
>> internally first and then in community is to ensure our proposed solutions
>> are fully validated and tested, gain hands-on experience and not miss
>> anything in design. You are very welcome to join this effort, from
>> design/code review, to development and testing.
>>
>> *The most important thing we believe you, our Flink users/devs, can help
>> with RIGHT NOW is to share your Hive use cases and give us feedback for this
>> project. As we start to go deeper on specific areas of integration, your
>> feedback and suggestions will help us to refine our backlogs and
>> prioritize our work, and you can get the features you want sooner! *Just
>> for example, if most users are mainly only reading Hive data, then we can
>> prioritize tuning read performance over implementing write capability.
>> A quick review of what we've finished building internally and is ready to
>> contribute back to community:
>>
>>- Flink/Hive Metadata Integration
>>   - Unified, pluggable catalog infra that manages meta-objects,
>>   including catalogs, databases, tables, views, functions, partitions,
>>   table/partition stats
>>   - Three catalog impls - A in-memory catalog, HiveCatalog for
>>   embracing Hive ecosystem, GenericHiveMetastoreCatalog for persisting
>>   Flink's streaming/batch metadata in Hive metastore
>>   - Hierarchical metadata reference as catalog.database.table in SQL and
>>   Table API
>>   - Unified function catalog based on new catalog infra, also
>>   support Hive simple UDF
>>- Flink/Hive Data Integration
>>   - Hive data connector that reads partitioned/non-partitioned Hive
>>   tables, and supports partition pruning, both Hive simple and complex 
>> data
>>   types, and basic write
>>- More powerful SQL Client fully integrated with the above features
>>and more Hive-compatible SQL syntax for better end-to-end SQL experience
>>
>> *Given above info, we want to learn from you on: How do you use Hive
>> currently? How can we solve your pain points? What features do you expect
>> from Flink-Hive integration? Those can be details like:*
>>
>>- *Which Hive version are you using? Do you plan to upgrade Hive?*
>>- *Are you planning to switch Hive engine? What timeline are you
>>looking at? Until what capabilities Flink has will you consider using 
>> Flink
>>with Hive?*
>>- *What's your motivation to try Flink-Hive? Maintain only one data
>>processing system across your teams for simplicity and maintainability?
>>Better performance of Flink over Hive itself?*
>>- *What are your Hive use cases? How large is your Hive data size? Do
>>you 

[PROGRESS UPDATE] [DISCUSS] Flink-Hive Integration and Catalogs

2019-03-19 Thread Bowen Li
Hi Flink users and devs,

We want to get your feedbacks on integrating Flink with Hive.

Background: In Flink Forward in Beijing last December, the community
announced to initiate efforts on integrating Flink and Hive. On Feb 21 Seattle
Flink Meetup <https://www.meetup.com/seattle-flink/events/258723322/>, We
presented Integrating Flink with Hive
<https://www.slideshare.net/BowenLi9/integrating-flink-with-hive-xuefu-zhang-and-bowen-li-seattle-flink-meetup-feb-2019>
with
a live demo to local community and got great response. As of mid March now,
we have internally finished building Flink's brand-new catalog
infrastructure, metadata integration with Hive, and most common cases of
Flink reading/writing against Hive, and will start to submit more design
docs/FLIP and contribute code back to community. The reason for doing it
internally first and then in community is to ensure our proposed solutions
are fully validated and tested, gain hands-on experience and not miss
anything in design. You are very welcome to join this effort, from
design/code review, to development and testing.

*The most important thing we believe you, our Flink users/devs, can help
with RIGHT NOW is to share your Hive use cases and give us feedback for this
project. As we start to go deeper on specific areas of integration, your
feedback and suggestions will help us to refine our backlogs and
prioritize our work, and you can get the features you want sooner! *Just
for example, if most users are mainly only reading Hive data, then we can
prioritize tuning read performance over implementing write capability.
A quick review of what we've finished building internally and is ready to
contribute back to community:

   - Flink/Hive Metadata Integration
  - Unified, pluggable catalog infra that manages meta-objects,
  including catalogs, databases, tables, views, functions, partitions,
  table/partition stats
  - Three catalog impls - A in-memory catalog, HiveCatalog for
  embracing Hive ecosystem, GenericHiveMetastoreCatalog for persisting
  Flink's streaming/batch metadata in Hive metastore
   - Hierarchical metadata reference as catalog.database.table in SQL and
   Table API (see the SQL sketch after this list)
  - Unified function catalog based on new catalog infra, also support
  Hive simple UDF
   - Flink/Hive Data Integration
  - Hive data connector that reads partitioned/non-partitioned Hive
  tables, and supports partition pruning, both Hive simple and complex data
  types, and basic write
   - More powerful SQL Client fully integrated with the above features and
   more Hive-compatible SQL syntax for better end-to-end SQL experience
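
For illustration, a sketch of what the hierarchical reference mentioned in the
list above could look like in SQL once this lands (the catalog, database, and
table names are made up):

-- 'myhive' would be a HiveCatalog instance and 'sales' a database inside it
SELECT item_id, COUNT(*) AS cnt
FROM myhive.sales.orders
GROUP BY item_id;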

*Given above info, we want to learn from you on: How do you use Hive
currently? How can we solve your pain points? What features do you expect
from Flink-Hive integration? Those can be details like:*

   - *Which Hive version are you using? Do you plan to upgrade Hive?*
   - *Are you planning to switch Hive engine? What timeline are you looking
   at? Until what capabilities Flink has will you consider using Flink with
   Hive?*
   - *What's your motivation to try Flink-Hive? Maintain only one data
   processing system across your teams for simplicity and maintainability?
   Better performance of Flink over Hive itself?*
   - *What are your Hive use cases? How large is your Hive data size? Do
   you mainly do reading, or both reading and writing?*
   - *How many Hive user defined functions do you have? Are they mostly
   UDF, GenericUDF, or UDTF, or UDAF?*
   - any questions or suggestions you have? or as simple as how you feel
   about the project

Again, your input will be really valuable to us, and we hope, with all of
us working together, the project can benefit our end users. Please feel
free to either reply to this thread or just to me. I'm also working on
creating a questionnaire to better gather your feedback, so watch the
mailing list in the next couple of days.

Thanks,
Bowen


Re: [DISCUSS] Create a Flink ecosystem website

2019-03-08 Thread Bowen Li
Confluent Hub for Kafka is another good
example of this kind. I personally like it better than the Spark site. It may
be worth checking it out with the Kafka folks.

On Thu, Mar 7, 2019 at 6:06 AM Becket Qin  wrote:

> Absolutely! Thanks for the pointer. I'll submit a PR to update the
> ecosystem page and the navigation.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Mar 7, 2019 at 8:47 PM Robert Metzger  wrote:
>
> > Okay. I will reach out to spark-packages.org and see if they are willing
> > to share.
> >
> > Do you want to raise a PR to update the ecosystem page (maybe sync with
> > the "Software Projects" listed here:
> > https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink) and
> > link it in the navigation?
> >
> > Best,
> > Robert
> >
> >
> > On Thu, Mar 7, 2019 at 10:13 AM Becket Qin  wrote:
> >
> >> Hi Robert,
> >>
> >> I think it at least worths checking if spark-packages.org owners are
> >> willing to share. Thanks for volunteering to write the requirement
> >> descriptions! In any case, that will be very helpful.
> >>
> >> Since a static page has almost no cost, and we will need it to redirect
> >> to the dynamic site anyways, how about we first do that while working on
> >> the dynamic website?
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Thu, Mar 7, 2019 at 4:59 AM Ufuk Celebi  wrote:
> >>
> >>> I like Shaoxuan's idea to keep this a static site first. We could then
> >>> iterate on this and make it a dynamic thing. Of course, if we have the
> >>> resources in the community to quickly start with a dynamic site, I'm
> >>> not opposed.
> >>>
> >>> – Ufuk
> >>>
> >>> On Wed, Mar 6, 2019 at 2:31 PM Robert Metzger 
> >>> wrote:
> >>> >
> >>> > Awesome! Thanks a lot for looking into this Becket! The VMs hosted by
> >>> Infra
> >>> > look suitable.
> >>> >
> >>> > @Shaoxuan: There is actually already a static page. It used to be
> >>> linked,
> >>> > but has been removed from the navigation bar for some reason. This is
> >>> the
> >>> > page: https://flink.apache.org/ecosystem.html
> >>> > We could update the page and add it back to the navigation bar for
> the
> >>> > coming weeks. What do you think?
> >>> >
> >>> > I would actually like to push for a dynamic page right away.
> >>> >
> >>> > I know it's kind of a bold move, but how do you feel about sending
> the
> >>> > owners of spark-packages.org a short note, if they are interested in
> >>> > sharing the source? We could maintain the code together in a public
> >>> repo.
> >>> > If they are not interested in sharing, or we decide not to ask in the
> >>> first
> >>> > place, I'm happy to write down a short description of the
> requirements,
> >>> > maybe some mockups. We could then see if we find somebody here in the
> >>> > community who's willing to implement it.
> >>> > Given the number of people who are eager to contribute, I believe we
> >>> will
> >>> > be able to find somebody pretty soon.
> >>> >
> >>> >
> >>> > On Wed, Mar 6, 2019 at 3:49 AM Becket Qin 
> >>> wrote:
> >>> >
> >>> > > Forgot to provide the link...
> >>> > >
> >>> > > [1] https://www.apache.org/dev/services.html#blogs (Apache infra
> >>> services)
> >>> > > [2] https://www.apache.org/dev/freebsd-jails (FreeBSD Jail
> provided
> >>> by
> >>> > > Apache Infra)
> >>> > >
> >>> > > On Wed, Mar 6, 2019 at 10:46 AM Becket Qin 
> >>> wrote:
> >>> > >
> >>> > >> Hi Robert,
> >>> > >>
> >>> > >> Thanks for the feedback. These are good points. We should
> absolutely
> >>> > >> shoot for a dynamic website to support more interactions in the
> >>> community.
> >>> > >> There might be a few things to solve:
> >>> > >> 1. The website code itself. An open source solution would be
> great.
> >>> TBH,
> >>> > >> I do not have much experience on building a website. It'll be
> great
> >>> if
> >>> > >> someone could help comment on the solution here.
> >>> > >> 2. The hardware to host the website. Apache Infra provides a few
> >>> > >> services[1] that Apache projects can leverage. I did not see
> >>> database
> >>> > >> service, but maybe we can run a simple MySQL db in FreeBSD
> jail[2].
> >>> > >>
> >>> > >> @Bowen & vino, thanks for the positive feedback!
> >>> > >>
> >>> > >> @Shaoxuan Wang 
> >>> > >> Thanks for the suggestion. That sounds reasonable to me. We
> >>> probably need
> >>> > >> a page in the Flink official site anyways, even just provide links
> >>> it to
> >>> > >> the ecosystem website. So listing the connectors in that static
> >>> page seems
> >>> > >> something we could start with while we are working on the dynamic
> >>> pages.
> >>> > >>
> >>> > >> Thanks,
> >>> > >>
> >>> > >> Jiangjie (Becket) Qin
> >>> > >>
> >>> > >> On Wed, Mar 6, 2019 at 10:40 AM Shaoxuan Wang <
> wshaox...@gmail.com>
> >>> > >> wrote:
> >>> > >>
> >>> > >>> Hi Becket and Robert,
> >>> > >>>
> >>> > >>> I like this idea!  Let us roll this out with Flink connectors at
> >>> the
> >>> > >>> first beginning. We can start with a static page, and upg

Re: [DISCUSS] Create a Flink ecosystem website

2019-03-05 Thread Bowen Li
Thanks for bring it up, Becket. That sounds very good to me. Spark also has
such a page for ecosystem project
https://spark.apache.org/third-party-projects.html and a hosted website
https://spark-packages.org/ with metadata, categories/tags and stats
mentioned in the doc.

Bowen

On Tue, Mar 5, 2019 at 9:36 AM Robert Metzger  wrote:

> Hey Becket,
>
> This is a great idea!
> For this to be successful, we need to make sure the page is placed
> prominently so that the people submitting something will get attention for
> their contributions.
> I think a dynamic site would probably be better, if we want features such
> as up and downvoting or comments.
> I would also like this to be hosted on Apache infra, and endorsed by the
> community.
>
> Does anybody here know any existing software that we could use?
> The only think I was able to find is AUR: https://aur.archlinux.org/
> (which is a community packages site for Arch Linux. The source code of this
> portal is open source, but the layout and structure is not an ideal fit for
> our requirements)
>
> Best,
> Robert
>
>
>
> On Tue, Mar 5, 2019 at 12:03 PM Becket Qin  wrote:
>
>> Hi folks,
>>
>> I would like to start a discussion thread about creating a Flink
>> ecosystem website. The website aims to help contributors who have developed
>> projects around Flink share their work with the community.
>>
>> Please see the following doc for more details.
>>
>> https://docs.google.com/document/d/12oCItoLbKrLGuwEUFcCfigezIR2hW3925j1hh3kGp4A/edit#
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>


Re: TimeZone shift problem in Flink SQL

2019-01-24 Thread Bowen Li
Hi,

Did you consider the timezone in the conversion in your UDF?
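
For reference, here is a minimal sketch of a conversion that makes the zone
explicit instead of relying on the JVM default (the class name and the
Asia/Shanghai zone are only illustrative, not the actual UDFs from the quoted
question):

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

import org.apache.flink.table.functions.ScalarFunction;

public class FromUnixtimeWithZone extends ScalarFunction {
    // format an epoch-millisecond timestamp in an explicitly chosen zone
    public String eval(long epochMillis, String format) {
        return DateTimeFormatter.ofPattern(format)
                .withZone(ZoneId.of("Asia/Shanghai"))
                .format(Instant.ofEpochMilli(epochMillis));
    }
}

Making the zone explicit on both ends at least rules out the JVM default
timezone as the source of the 8-hour shift.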


On Tue, Jan 22, 2019 at 5:29 AM 徐涛  wrote:

> Hi Experts,
> I have the following two UDFs,
> unix_timestamp:   transform from string to Timestamp, with the
> arguments (value:String, format:String), return Timestamp
>from_unixtime:transform from Timestamp to String, with the
> arguments (ts:Long, format:String), return String
>
>
> select
>  number,
>  ts,
>  from_unixtime(unix_timestamp(LAST_UPDATE_TIME, 'EEE MMM dd
> HH:mm:Ss z yyyy'),'yyyy-MM-dd')  as dt
>   from
>  test;
>
>  when the LAST_UPDATE_TIME value is "Tue Jan 22 21:03:12 CST 2019”,
> the unix_timestamp UDF returns a Timestamp with value 1548162182001,
>   but when from_unixtime is invoked, the timestamp passed in has the
> value 1548190982001, so there is an 8-hour shift between them.
>   May I know why there is an 8-hour shift between them, and how I can
> get the timestamp that was originally produced by the first UDF without
> changing the code?
>   Thanks very much.
>
> Best
> Henry
>


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-24 Thread Bowen Li
+1 for leaner distribution and a better 'download' webpage.

+1 for a full distribution if we can automate it besides supporting the
leaner one. If we support both, I'd imagine release managers should be able
to package the two distributions with a single change of parameter instead of
manually packaging the full distribution. How to achieve that needs to be
evaluated and discussed; it could probably be something like 'mvn clean install
-Dfull/-Dlean', I'm not sure yet.


On Wed, Jan 23, 2019 at 10:11 AM Thomas Weise  wrote:

> +1 for trimming the size by default and offering the fat distribution as
> alternative download
>
>
> On Wed, Jan 23, 2019 at 8:35 AM Till Rohrmann 
> wrote:
>
>> Ufuk's proposal (having a lean default release and a user convenience
>> tarball) sounds good to me. That way advanced users won't be bothered by
>> an
>> unnecessarily large release and new users can benefit from having many
>> useful extensions bundled in one tarball.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jan 23, 2019 at 3:42 PM Ufuk Celebi  wrote:
>>
>> > On Wed, Jan 23, 2019 at 11:01 AM Timo Walther 
>> wrote:
>> > > I think what is more important than a big dist bundle is a helpful
>> > > "Downloads" page where users can easily find available filesystems,
>> > > connectors, metric reporters. Not everyone checks Maven central for
>> > > available JAR files. I just saw that we added an "Optional components"
>> > > section recently [1], we just need to make it more prominent. This is
>> > > also done for the SQL connectors and formats [2].
>> >
>> > +1 I fully agree with the importance of the Downloads page. We
>> > definitely need to make any optional dependencies that users need to
>> > download easy to find.
>> >
>>
>


Re: [ANNOUNCE] Apache Flink 1.5.2 released

2018-07-31 Thread Bowen Li
Congratulations, community!

On Tue, Jul 31, 2018 at 1:44 AM Chesnay Schepler  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.5.2, which is the second bugfix release for the Apache Flink 1.5
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2018/07/31/release-1.5.2.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12343588
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Chesnay
>


Re: Is KeyedProcessFunction available in Flink 1.4?

2018-07-19 Thread Bowen Li
Hi Anna,

KeyedProcessFunction is only available starting from Flink 1.5; see the
ProcessFunction page of the Flink 1.5 documentation. It extends
ProcessFunction and shares the same functionality, except that it gives more
access to the timer's key, so you can refer to the ProcessFunction examples
in that document.
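
A minimal sketch of the 1.5 API (the types and the timer logic are only for
illustration, and event-time timestamps are assumed to be assigned):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MyKeyedFunction extends KeyedProcessFunction<String, Long, String> {

    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
        // register an event-time timer one minute after this element's timestamp
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // unlike ProcessFunction, the current key is directly accessible here
        out.collect("timer fired for key " + ctx.getCurrentKey());
    }
}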

Bowen


On Thu, Jul 19, 2018 at 3:26 PM anna stax  wrote:

> Hello all,
> I am using Flink 1.4 because thats the version provided by the latest AWS
> EMR.
> Is KeyedProcessFunction available in Flink 1.4?
>
> Also please share any links to good examples on using KeyedProcessFunction
> .
>
> Thanks
>


Re: Is Flink using even-odd versioning system

2018-07-10 Thread Bowen Li
Hi Alexander,

AFAIK, Flink releases don't do that. The community has done its best to
ensure every release is at its best state.

Thanks,
Bowen


On Tue, Jul 10, 2018 at 4:54 AM Alexander Smirnov <
alexander.smirn...@gmail.com> wrote:

> to denote development and stable releases?
>


Re: Flink and AWS S3 integration: java.lang.NullPointerException: null uri host

2018-05-30 Thread Bowen Li
Did you run Flink on AWS EMR or somewhere else? Have you read and followed
instructions on
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#amazon-web-services-aws
?
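
One detail worth double-checking independently of the EMR instructions: "null
uri host" usually means the bucket name is missing from the URI. In the
checkpoint settings quoted below, "s3:///mybucket/" has three slashes, so the
URI host (which is where the bucket name belongs) is empty, and that is what
the requireNonNull in S3xLoginHelper.buildFSURI rejects. Assuming the bucket
is really called "mybucket", the settings would look roughly like this (the
path after the bucket is up to you):

state.backend=filesystem
state.backend.fs.checkpointdir=s3://mybucket/checkpoints
state.checkpoints.dir=s3://mybucket/checkpoints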



On Wed, May 30, 2018 at 7:08 AM, Fabian Wollert  wrote:

> Hi, I'm trying to set up Checkpoints for Flink Jobs with S3 as a
> filesystem backend. I configured the following:
>
> state.backend=filesystem
> state.backend.fs.checkpointdir=s3:///mybucket/
> state.checkpoints.dir=s3:///mybucket/
> state.checkpoints.num-retained=3
>
> I also copied the flink-s3-fs-hadoop-1.5.0.jar into the lib folder.
>
> I get now though the following error message:
>
> Caused by: java.lang.NullPointerException: null uri host.
> at java.util.Objects.requireNonNull(Objects.java:228)
> at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.
> s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:65)
> at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.
> s3a.S3AFileSystem.initialize(S3AFileSystem.java:165)
> at org.apache.flink.fs.s3hadoop.S3FileSystemFactory.create(
> S3FileSystemFactory.java:133)
>
> I tried to dig deeper into the source code, but struggled to find
>
>- what is meant with this URI
>- where to configure it
>
> Can anybody give some advice how to set up the S3 Backend with the new
> shaded lib jar?
>
> Thanks in advance
> --
>
>
> *Fabian WollertZalando SE*
>
> E-Mail: fabian.woll...@zalando.de
>
> Tamara-Danz-Straße 1
> 
> 10243 Berlin
> 
> Fax: +49 (0)30 2759 46 93
> E-mail: legalnot...@zalando.co.uk
> Notifications of major holdings (Sec. 33, 38, 39 WpHG):  +49 (0)30
> 2000889349
>
> Management Board:
> Robert Gentz, David Schneider, Rubin Ritter
>
> Chairman of the Supervisory Board:
> Lothar Lanz
>
> Person responsible for providing the contents of Zalando SE acc. to Art.
> 55 RStV [Interstate Broadcasting Agreement]: Rubin Ritter
> Registered at the Local Court Charlottenburg Berlin, HRB 158855 B
> VAT registration number: DE 260543043
>


Re: [ANNOUNCE] Apache Flink 1.5.0 release

2018-05-28 Thread Bowen Li
Congratulations, everyone!

On Mon, May 28, 2018 at 1:15 AM, Fabian Hueske  wrote:

> Thank you Till for serving as a release manager for Flink 1.5!
>
> 2018-05-25 19:46 GMT+02:00 Till Rohrmann :
>
> > Quick update: I had to update the date of the release blog post which
> also
> > changed the URL. It can now be found here:
> >
> > http://flink.apache.org/news/2018/05/25/release-1.5.0.html
> >
> > Cheers,
> > Till
> >
> > On Fri, May 25, 2018 at 7:03 PM, Hao Sun  wrote:
> >
> > > This is great. Thanks for the effort to get this out!
> > >
> > > On Fri, May 25, 2018 at 9:47 AM Till Rohrmann 
> > > wrote:
> > >
> > >> The Apache Flink community is very happy to announce the release of
> > >> Apache Flink 1.5.0.
> > >>
> > >> Apache Flink® is an open-source stream processing framework for
> > >> distributed, high-performing, always-available, and accurate data
> > streaming
> > >> applications.
> > >>
> > >> The release is available for download at:
> > >>
> > >> https://flink.apache.org/downloads.html
> > >>
> > >> Please check out the release blog post for an overview of the new
> > >> features and improvements and the list of contributors:
> > >>
> > >> http://flink.apache.org/news/2018/05/18/release-1.5.0.html
> > >>
> > >> The full release notes are available in Jira:
> > >>
> > >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?
> > >> projectId=12315522&version=12341764
> > >>
> > >> I would like to thank all contributors for working very hard on making
> > >> this release a success!
> > >>
> > >> Best,
> > >> Till
> > >>
> > >
> >
>


Re: Clarification in TumblingProcessing TimeWindow Documentation

2018-05-27 Thread Bowen Li
Hi Dhruv,

I can see it's confusing, and it does seem the comment should be improved.
You can find a concrete explanation of tumbling windows and the relevant
arguments at
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#tumbling-windows
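
For a quick side-by-side comparison (a sketch only; the sizes and offsets are
arbitrary, and `keyed` is a KeyedStream as in the javadoc example):

// tumbling: 1-hour windows whose boundaries are shifted by 15 minutes
// (00:15-01:15, 01:15-02:15, ...); the second argument is an offset
keyed.window(TumblingProcessingTimeWindows.of(Time.hours(1), Time.minutes(15)));

// sliding: 1-minute windows with a new window starting every 10 seconds,
// so consecutive windows overlap; the second argument is the slide
keyed.window(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10)));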

Feel free to open a PR with a better comment.

Thanks,
Bowen



On Sun, May 27, 2018 at 12:43 PM, Dhruv Kumar  wrote:

> Hi
>
> I was looking at TumblingProcessingTimeWindows.java and
> was a bit confused with the documentation at the start of this class. It
> says the following:
>
> /**
> * A {@link WindowAssigner} that windows elements into windows based on
> the current
> * system time of the machine the operation is running on. Windows cannot
> overlap.
> *
> * For example, in order to window into windows of 1 minute, every 10
> seconds:
> *  {@code
> * DataStream> in = ...;
> * KeyedStream> keyed = in.keyBy(...);
> * WindowedStream, String, TimeWindows> windowed =
> * keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES),
> Time.of(10, SECONDS));
> * } 
> */
> It says one can have tumbling windows of 1 minute, every 10 seconds.
> Doesn’t this become a sliding window then? The SlidingProcessingTimeWindows.java has
> the exact same documentation with just one tiny change (“Windows can
> possibly overlap”). It seems to me that in the above documentation, the
> second Time argument of 10 seconds is for providing the window offset (as
> confirmed here)
> and not for starting the tumbling window every 10 seconds.
>
> Thanks
>
>
> --
> *Dhruv Kumar*
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>
>


Re: Missing MapState when Timer fires after restored state

2018-05-14 Thread Bowen Li
Hi Juho,

You are right, there's no transactional guarantee on timers and state in
processElement(). They may end up with inconsistency if your job was
cancelled in the middle of processing an element.

To avoid the situation, the best programming practice is to always check if
the state you're trying to get is null or not.
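
Applied to the simplified snippet quoted below, a defensive onTimer would look
roughly like this (a sketch only):

public void onTimer(long timestamp, OnTimerContext ctx, ..) {
    String lastUpdatedStr = mapState.get("lastUpdated");
    if (lastUpdatedStr == null) {
        // state for this key was never written or has already been cleared
        return;
    }
    if (timestamp >= Long.parseLong(lastUpdatedStr) + stateRetentionMillis) {
        mapState.clear();
    }
}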

I've also created https://issues.apache.org/jira/browse/FLINK-9362 to
document this.

Thanks
Bowen



On Mon, May 14, 2018 at 4:00 AM, Juho Autio  wrote:

> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
> state. After restoring state from a checkpoint, it seems like a timer had
> been restored, but not the data that was expected to be in a related
> MapState if such timer has been added.
>
> The way I see this is that there's a bug, either of these:
> - The writing of timers & map states to Flink state is not synchronized
> (or maybe there are no such guarantees by design?)
> - Flink may restore a checkpoint that is actually corrupted/incomplete
>
> Our code (simplified):
>
> private MapState mapState;
>
> public void processElement(..) {
> mapState.put("lastUpdated", ctx.timestamp().toString());
> ctx.timerService().registerEventTimeTimer(ctx.timestamp() +
> stateRetentionMillis);
> }
>
> public void onTimer(long timestamp, OnTimerContext ctx, ..) {
> long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
> if (timestamp >= lastUpdated + stateRetentionMillis) {
> mapState.clear();
> }
> }
>
> Normally this "just works". As you can see, it shouldn't be possible that
> "lastUpdated" doesn't exist in state if timer was registered and onTimer
> gets called.
>
> However, after restoring state from a checkpoint, the job kept failing
> with this error:
>
> Caused by: java.lang.NumberFormatException: null
> at java.lang.Long.parseLong(Long.java:552)
> at java.lang.Long.parseLong(Long.java:631)
> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.
> java:136)
> ..
>
> So apparently onTimer was called but lastUpdated wasn't found in the
> MapState.
>
> The background for restoring state in this case is not entirely clean.
> There was an OS level issue "Too many open files" after running a job for
> ~11 days. To fix that, we replaced the cluster with a new one and launched
> the Flink job again. State was successfully restored from the latest
> checkpoint that had been created by the "problematic execution". Now, I'm
> assuming that if the state wouldn't have been created successfully,
> restoring wouldn't succeed either – correct? This is just to rule out that
> the issue with state didn't happen because the checkpoint files were
> somehow corrupted due to the Too many open files problem.
>
> Thank you all for your continued support!
>
> P.S. I would be very much interested to hear if there's some cleaner way
> to achieve this kind of TTL for keyed state in Flink.
>


Re: Recommended books

2018-05-09 Thread Bowen Li
I'd recommend this book, *Stream Processing with Apache Flink:
Fundamentals, Implementation, and Operation of Streaming Applications.*
It's probably the most authentic book about Flink on the market. You can
buy and read the early release on O'Reilly,
http://shop.oreilly.com/product/0636920057321.do

On Wed, May 9, 2018 at 4:33 AM, Georgi Stoyanov  wrote:

> Hi Esa,
>
>
>
> Afaik, there’s no other resource about using Scala in Flink except the
> documentation & the blog - https://ci.apache.org/
> projects/flink/flink-docs-release-1.4/dev/scala_api_extensions.html
> https://flink.apache.org/blog/
>
>
>
> If you want something only for scala - https://scala-lang.org/
> documentation/learn.html. First two are kind of “essential”.
>
> Also you could check Fabian Hueske’s book that will be released later
> this year - https://www.amazon.com/Stream-Processing-Apache-Flink-
> Implementation/dp/149197429X/ref=sr_1_1?ie=UTF8&qid=
> 1525865019&sr=8-1&keywords=flink (there is some free chapters in Safari
> books)
>
>
>
>
>
> Regards,
>
> Georgi
>
>
> --
> *From:* Esa Heikkinen 
> *Sent:* Wednesday, May 9, 2018 2:19:01 PM
> *To:* user@flink.apache.org
> *Subject:* Recommended books
>
>
> Hi
>
>
>
> Could you recommend some Flink books to learn Scala programming and basics
> in Flink ?
>
>
>
> Best, Esa
>
>
>


Re: 1.4.3 release/roadmap

2018-04-19 Thread Bowen Li
To find bug fixes that are going into 1.4.x, say 1.4.3, you can filter
JIRA tickets with the 'Fix Version/s' field set to '1.4.3'.
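
For example, a JIRA filter along these lines (a sketch):

project = FLINK AND fixVersion = "1.4.3" ORDER BY priority DESC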

On Thu, Apr 19, 2018 at 1:36 AM, Daniel Harper 
wrote:

> Hi there,
>
> There are some bug fixes that are in the 1.4 branch that we would like to
> be made available for us to use.
>
> Is there a roadmap from the project when the next stable 1.4.x release
> will be cut? Any blockers?
>


Re: [ANNOUNCE] Apache Flink 1.4.1 released

2018-02-15 Thread Bowen Li
Congratulations everyone!

On Thu, Feb 15, 2018 at 10:04 AM, Tzu-Li (Gordon) Tai 
wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.4.1, which is the first bugfix release for the Apache Flink 1.4
> series.
>
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
>
> The release is available for download at:
>
> https://flink.apache.org/downloads.html
>
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
>
> https://flink.apache.org/news/2018/02/15/release-1.4.1.html
>
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?
> projectId=12315522&version=12342212
>
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
>
> Cheers,
>
> Gordon
>
>


[SEATTLE MEETUP] Announcing First Seattle Apache Flink Meetup

2018-01-04 Thread Bowen Li
Hi Flinkers,

After months of preparation, we are excited to announce our first Seattle
Apache Flink meetup, and invite you to join us!

*Please RSVP
at https://www.meetup.com/seattle-apache-flink/events/246458117
<https://www.meetup.com/seattle-apache-flink/events/246458117>. *Food and
drinks will be provided. Plenty of onsite parking spots.

DATE: Jan 17th, 2018, Wednesday

TALKS:

- Haitao Wang, Senior Staff Engineer at Alibaba, will give a presentation
on large-scale streaming processing with Flink and Flink SQL at Alibaba and
several internal use cases.

- Bowen Li will talk about details of future meetup planning and logistics,
also how they use Flink at OfferUp.

AGENDA

- 5:30pm - 6pm Food and networking
- 6pm - 7:30pm Presentations and Q&As

We can't wait to see you then!

Thanks,
Bowen, on behalf of the meetup organizing team


Re: Could not flush and close the file system output stream to s3a, is this fixed?

2017-12-14 Thread Bowen Li
Hi,

The problem reported in FLINK-7590 only happened one time on our end. And,
as you can see from its comments,  we suspected it's caused by AWS-SDK or
Hadoop's s3a implementation, which we have no control over.

Flink 1.4.0 has its own S3 implementations. I haven't tried it yet.


On Thu, Dec 14, 2017 at 2:05 AM, Fabian Hueske  wrote:

> Bowen Li (in CC) closed the issue but there is no fix (or at least it is
> not linked in the JIRA).
> Maybe it was resolved in another issue or can be differently resolved.
>
> @Bowen, can you comment on how to fix this problem? Will it work in Flink
> 1.4.0?
>
> Thank you,
> Fabian
>
> 2017-12-13 5:28 GMT+01:00 Hao Sun :
>
>> https://issues.apache.org/jira/browse/FLINK-7590
>>
>> I have a similar situation with Flink 1.3.2 on K8S
>>
>> =
>> 2017-12-13 00:57:12,403 INFO 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>> - Source: KafkaSource(maxwell.tickets) -> 
>> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3) 
>> (6ad009755a6009975d197e75afa05e14)
>> switched from RUNNING to FAILED. AsynchronousException{java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3).} at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:970) at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception:
>> Could not materialize checkpoint 803 for operator Source:
>> KafkaSource(maxwell.tickets) -> MaxwellFilter->Maxwell(maxwell.tickets)
>> -> FixedDelayWatermark(maxwell.tickets) -> 
>> MaxwellFPSEvent->InfluxDBData(maxwell.tickets)
>> -> Sink: influxdbSink(maxwell.tickets) (1/3). ... 6 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at 
>> java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:906) ... 5 more Suppressed:
>> java.lang.Exception: Could not properly cancel managed operator state
>> future. at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:98) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.cleanup(StreamTask.java:1023) at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:961) ... 5 more Caused by:
>> java.util.concurrent.ExecutionException: java.io.IOException: Could not
>> flush and close the file system output stream to
>> s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at 
>> java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at 
>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>> at org.apache.flink.streaming.api.operators.OperatorSnapshotRes
>> ult.cancel(OperatorSnapshotResult.java:96) ... 7 more Caused by:
>> java.io.IOException: Could not flush and close the file system output
>> stream to s3a://zendesk-euc1-fraud-prevention-production/checkpoints/d
>> 5a8b2ab61625cf0aa1e66360b7ad0af/chk-803/4f485204-3ec5-402a-a57d-fab13e068cbc
>> in order to obtain the stream state handle at org.apache.flink.runtime.st

Re: [DISCUSS] Dropping Scala 2.10

2017-09-19 Thread Bowen Li
+1 for dropping support for Scala 2.10

On Tue, Sep 19, 2017 at 3:29 AM, Sean Owen  wrote:

> For the curious, here's the overall task in Spark:
>
> https://issues.apache.org/jira/browse/SPARK-14220
>
> and  most of the code-related changes:
>
> https://github.com/apache/spark/pull/18645
>
> and where it's stuck at the moment:
>
> https://mail-archives.apache.org/mod_mbox/spark-dev/201709.mbox/%
> 3CCAMAsSdKe7Os80mX7jYaD2vNWLGWioBgCb4GG55eaN_iotFxZvw%40mail.gmail.com%3E
>
>
>
> On Tue, Sep 19, 2017 at 11:07 AM Márton Balassi 
> wrote:
>
>> Hi Aljoscha,
>>
>> I am in favor of the change. No concerns on my side, just one remark that
>> I have talked to Sean last week (ccd) and he mentioned that he has faced
>> some technical issues while driving the transition from 2.10 to 2.12 for
>> Spark. It had to do with changes in the scope of implicits. You might end
>> up hitting the same.
>>
>> Best,
>> Marton
>>
>> On Tue, Sep 19, 2017 at 11:56 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> Talking to some people I get the impression that Scala 2.10 is quite
>>> outdated by now. I would like to drop support for Scala 2.10 and my main
>>> motivation is that this would allow us to drop our custom Flakka build of
>>> Akka that we use because newer Akka versions only support Scala 2.11/2.12
>>> and we need a backported feature.
>>>
>>> Are there any concerns about this?
>>>
>>> Best,
>>> Aljoscha
>>
>>
>>


Re: Does RocksDB need a dedicated CPU?

2017-09-05 Thread Bowen Li
Thank you, Kien!

On Tue, Sep 5, 2017 at 8:01 AM, Kien Truong  wrote:

> Hi,
>
> In my experience, RocksDB uses very little CPU, and doesn't need a
> dedicated CPU.
>
> However, it's quite disk intensive. You'd need fast, ideally dedicated
> SSDs to achieve the best performance.
>
> Regards,
>
> Kien
>
>
>
> On 9/5/2017 1:15 PM, Bowen Li wrote:
>
>> Hi guys,
>>
>> Does RocksDB need a dedicated CPU? Do we need to allocate one CPU for
>> each RocksDB while deploying Flink cluster with RocksDB state backend?
>>
>> I think there's probably no need since RocksDB is a native 'library', but
>> I want to confirm it with Flink community.
>>
>> Thanks,
>> Bowen
>>
>


Does RocksDB need a dedicated CPU?

2017-09-04 Thread Bowen Li
Hi guys,

Does RocksDB need a dedicated CPU? Do we need to allocate one CPU for each
RocksDB while deploying Flink cluster with RocksDB state backend?

I think there's probably no need since RocksDB is a native 'library', but I
want to confirm it with Flink community.

Thanks,
Bowen


Re: Even out the number of generated windows

2017-08-28 Thread Bowen Li
That's exactly what I found yesterday! Thank you Aljoscha for confirming it!
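
For anyone reading this thread later, here is a rough sketch of the
timer-based cleanup being discussed (the Event type, its fields, and the
retention period are made up for illustration; it assumes a keyed stream with
event-time timestamps assigned):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class EmitOnChangeWithTtl extends ProcessFunction<Event, Event> {

    private static final long RETENTION_MS = 24 * 60 * 60 * 1000L;

    private transient ValueState<Long> lastResult;   // last emitted count per key
    private transient ValueState<Long> lastUpdated;  // when it was last refreshed

    @Override
    public void open(Configuration parameters) {
        lastResult = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastResult", Long.class));
        lastUpdated = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastUpdated", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        Long previous = lastResult.value();
        if (previous == null || previous.longValue() != event.count) {
            out.collect(event);                      // only forward changed results
            lastResult.update(event.count);
        }
        lastUpdated.update(ctx.timestamp());
        // arm a cleanup timer; later elements simply push the expiration check out
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + RETENTION_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        Long updated = lastUpdated.value();
        // drop the state only if no newer element refreshed it in the meantime
        if (updated != null && timestamp >= updated + RETENTION_MS) {
            lastResult.clear();
            lastUpdated.clear();
        }
    }
}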

On Mon, Aug 28, 2017 at 2:57 AM, Aljoscha Krettek 
wrote:

> Hi Bowen,
>
> There is not built-in TTL but you can use a ProcessFunction to set a timer
> that clears state.
>
> ProcessFunction docs: https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/stream/process_function.html
>
> Best,
> Aljoscha
>
> On 27. Aug 2017, at 19:19, Bowen Li  wrote:
>
> Hi Robert,
> Thank you for the suggestion, I'll try that.
>
> On a second thought, I can actually reduce the amount of generated
> output so there aren't that many records being sent to Kinesis.
>
> What I want to do is to use Flink's state to keep track of the last
> computation result of a window by each key. If the latest computation
> result is the same as the last one, my Flink job shouldn't emit a new
> record. However, that requires some expiration functionality so that the
> state won't grow indefinitely, as explained in https://issues.apache.org/
> jira/browse/FLINK-3089. Are there anyway to expire keyed state by time?
>
> Thanks,
> Bowen
>
>
>
> On Sun, Aug 27, 2017 at 5:41 AM, Robert Metzger 
> wrote:
>
>> Hi Bowen,
>>
>> I don't know what kind of relationship your company has to AWS, maybe
>> they are willing to look into the issue from their side.
>>
>> To throttle a stream, I would recommend just doing a map operation that
>> is calling  "Thread.sleep()" every n records.
>>
>> On Sat, Aug 26, 2017 at 4:11 AM, Bowen Li 
>> wrote:
>>
>>> Hi Robert,
>>> We use kinesis sink (FlinkKinesisProducer). The main pain is the Kinesis
>>> Producer Library (KPL) that FlinkKinesisProducer uses.
>>>
>>> KPL is basically a java wrapper with a c++ core. It's slow, unstable,
>>> easy to crash, memory-and-CPU-consuming (it sends traffic via HTTP), and
>>> can't handle high workload like a few million records at a short period of
>>> time. Well, in order to write to Kinesis, there's no other options except
>>> KPL (AWS Kinesis SDK is even slower), so I'm not blaming Flink chose KPL.
>>>
>>> Are there any recommended ways to "artificially throttle down the
>>> stream before the sink"? How to add the throttling into Flink's fluent
>>> API?
>>>
>>> Thanks,
>>> Bowen
>>>
>>>
>>> On Fri, Aug 25, 2017 at 2:31 PM, Robert Metzger 
>>> wrote:
>>>
>>>> Hi Bowen,
>>>>
>>>> (very nice graphics :) )
>>>>
>>>> I don't think you can do anything about the windows itself (unless you
>>>> are able to build the windows yourself using the ProcessFunction, playing
>>>> some tricks because you know your data), so I should focus on reducing the
>>>> pain in "burning down your sink".
>>>> Are there any issues with the Sink by the spikes? (What's the
>>>> downstream system?)
>>>> Does it make sense for you to artificially throttle down the stream
>>>> before the sink, so that the records per second get limited to a certain
>>>> rate. Since you are using Event time, the window results will always be
>>>> correct & consistent. From a business perspective, this will of course
>>>> introduce additional latency (= results come in later).
>>>>
>>>>
>>>> On Fri, Aug 25, 2017 at 6:23 AM, Bowen Li 
>>>> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> I do have a question for how Flink generates windows.
>>>>>
>>>>> We are using a 1-day sized sliding window with 1-hour slide to count
>>>>> some features of items based on event time. We have about 20million items.
>>>>> We observed that Flink only emit results on a fixed time in an hour (e.g.
>>>>> 1am, 2am, 3am,  or 1:15am, 2:15am, 3:15am with a 15min offset). That's
>>>>> means 20million windows/records are generated at the same time every hour,
>>>>> which burns down our sink. But nothing is generated in the rest of that
>>>>> hour. The pattern is like this:
>>>>>
>>>>> # generated windows
>>>>> |
>>>>> |/\  /\
>>>>> |   /  \/  \
>>>>> |_/__\___/__\_
>>>>>  time
>>>>>
>>>>> Is there any way to even out the number of generated windows/records
>>>>> in an hour? Can we have evenly distributed generated load like this?
>>>>>
>>>>> # generated windows
>>>>> |
>>>>> |
>>>>> | 
>>>>> |___
>>>>>  time
>>>>>
>>>>> Thanks,
>>>>> Bowen
>>>>>
>>>>>
>>>>
>>>
>>
>
>


Re: Even out the number of generated windows

2017-08-27 Thread Bowen Li
Hi Robert,
Thank you for the suggestion, I'll try that.
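
For reference, this is the kind of throttling map I have in mind based on that
suggestion (a sketch; the Record type, batch size, and sleep interval are
arbitrary):

// crude rate limiting: pause briefly every N records so the sink sees a bounded rate
stream.map(new MapFunction<Record, Record>() {
    private long count = 0;

    @Override
    public Record map(Record value) throws Exception {
        if (++count % 1000 == 0) {
            Thread.sleep(50);   // every 1000 records, sleep 50 ms
        }
        return value;
    }
});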

On a second thought, I can actually reduce the amount of generated
output so there aren't that many records being sent to Kinesis.

What I want to do is to use Flink's state to keep track of the last
computation result of a window by each key. If the latest computation
result is the same as the last one, my Flink job shouldn't emit a new
record. However, that requires some expiration functionality so that the
state won't grow indefinitely, as explained in
https://issues.apache.org/jira/browse/FLINK-3089. Is there any way to
expire keyed state by time?

Thanks,
Bowen



On Sun, Aug 27, 2017 at 5:41 AM, Robert Metzger  wrote:

> Hi Bowen,
>
> I don't know what kind of relationship your company has to AWS, maybe they
> are willing to look into the issue from their side.
>
> To throttle a stream, I would recommend just doing a map operation that is
> calling  "Thread.sleep()" every n records.
>
> On Sat, Aug 26, 2017 at 4:11 AM, Bowen Li  wrote:
>
>> Hi Robert,
>> We use kinesis sink (FlinkKinesisProducer). The main pain is the Kinesis
>> Producer Library (KPL) that FlinkKinesisProducer uses.
>>
>> KPL is basically a java wrapper with a c++ core. It's slow, unstable,
>> easy to crash, memory-and-CPU-consuming (it sends traffic via HTTP), and
>> can't handle high workload like a few million records at a short period of
>> time. Well, in order to write to Kinesis, there's no other options except
>> KPL (AWS Kinesis SDK is even slower), so I'm not blaming Flink chose KPL.
>>
>> Are there any recommended ways to "artificially throttle down the stream
>> before the sink"? How to add the throttling into Flink's fluent API?
>>
>> Thanks,
>> Bowen
>>
>>
>> On Fri, Aug 25, 2017 at 2:31 PM, Robert Metzger 
>> wrote:
>>
>>> Hi Bowen,
>>>
>>> (very nice graphics :) )
>>>
>>> I don't think you can do anything about the windows itself (unless you
>>> are able to build the windows yourself using the ProcessFunction, playing
>>> some tricks because you know your data), so I should focus on reducing the
>>> pain in "burning down your sink".
>>> Are there any issues with the Sink by the spikes? (What's the downstream
>>> system?)
>>> Does it make sense for you to artificially throttle down the stream
>>> before the sink, so that the records per second get limited to a certain
>>> rate. Since you are using Event time, the window results will always be
>>> correct & consistent. From a business perspective, this will of course
>>> introduce additional latency (= results come in later).
>>>
>>>
>>> On Fri, Aug 25, 2017 at 6:23 AM, Bowen Li 
>>> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I do have a question for how Flink generates windows.
>>>>
>>>> We are using a 1-day sized sliding window with 1-hour slide to count
>>>> some features of items based on event time. We have about 20million items.
>>>> We observed that Flink only emit results on a fixed time in an hour (e.g.
>>>> 1am, 2am, 3am,  or 1:15am, 2:15am, 3:15am with a 15min offset). That's
>>>> means 20million windows/records are generated at the same time every hour,
>>>> which burns down our sink. But nothing is generated in the rest of that
>>>> hour. The pattern is like this:
>>>>
>>>> # generated windows
>>>> |
>>>> |/\  /\
>>>> |   /  \/  \
>>>> |_/__\___/__\_
>>>>  time
>>>>
>>>> Is there any way to even out the number of generated windows/records in
>>>> an hour? Can we have evenly distributed generated load like this?
>>>>
>>>> # generated windows
>>>> |
>>>> |
>>>> | 
>>>> |___
>>>>  time
>>>>
>>>> Thanks,
>>>> Bowen
>>>>
>>>>
>>>
>>
>


Re: Even out the number of generated windows

2017-08-25 Thread Bowen Li
Hi Robert,
We use kinesis sink (FlinkKinesisProducer). The main pain is the Kinesis
Producer Library (KPL) that FlinkKinesisProducer uses.

KPL is basically a Java wrapper with a C++ core. It's slow, unstable, easy
to crash, memory-and-CPU-consuming (it sends traffic via HTTP), and can't
handle a high workload like a few million records in a short period of time.
Well, in order to write to Kinesis, there are no other options except KPL
(the AWS Kinesis SDK is even slower), so I'm not blaming Flink for choosing KPL.

Are there any recommended ways to "artificially throttle down the stream
before the sink"? How to add the throttling into Flink's fluent API?

Thanks,
Bowen


On Fri, Aug 25, 2017 at 2:31 PM, Robert Metzger  wrote:

> Hi Bowen,
>
> (very nice graphics :) )
>
> I don't think you can do anything about the windows itself (unless you are
> able to build the windows yourself using the ProcessFunction, playing some
> tricks because you know your data), so I should focus on reducing the pain
> in "burning down your sink".
> Are there any issues with the Sink by the spikes? (What's the downstream
> system?)
> Does it make sense for you to artificially throttle down the stream before
> the sink, so that the records per second get limited to a certain rate.
> Since you are using Event time, the window results will always be correct &
> consistent. From a business perspective, this will of course introduce
> additional latency (= results come in later).
>
>
> On Fri, Aug 25, 2017 at 6:23 AM, Bowen Li  wrote:
>
>> Hi guys,
>>
>> I do have a question for how Flink generates windows.
>>
>> We are using a 1-day sized sliding window with 1-hour slide to count some
>> features of items based on event time. We have about 20million items. We
>> observed that Flink only emit results on a fixed time in an hour (e.g. 1am,
>> 2am, 3am,  or 1:15am, 2:15am, 3:15am with a 15min offset). That's means
>> 20million windows/records are generated at the same time every hour, which
>> burns down our sink. But nothing is generated in the rest of that hour. The
>> pattern is like this:
>>
>> # generated windows
>> |
>> |/\  /\
>> |   /  \/  \
>> |_/__\___/__\_
>>  time
>>
>> Is there any way to even out the number of generated windows/records in
>> an hour? Can we have evenly distributed generated load like this?
>>
>> # generated windows
>> |
>> |
>> | 
>> |___
>>  time
>>
>> Thanks,
>> Bowen
>>
>>
>


Re: Which window function to use to start a window at anytime

2017-08-25 Thread Bowen Li
Hi Aljoscha,
Thank you very much!

We imagined it's going to be very expensive to achieve that, and your
answer verified our understanding of how Flink works.
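
For anyone curious, a rough sketch of the stateful stamping step described
above (Event/StampedEvent and the window size are made up; a custom
WindowAssigner reading windowStart would still be needed downstream, plus
cleanup of the per-key state as suggested):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Stamps every element with the start of the 1-hour window it belongs to,
// where the first element seen for a key defines that key's window alignment.
// Must be applied on a KeyedStream so the state is scoped per key.
public class PerKeyWindowStamper extends RichFlatMapFunction<Event, StampedEvent> {

    private static final long WINDOW_SIZE_MS = 60 * 60 * 1000L;
    private transient ValueState<Long> firstSeen;

    @Override
    public void open(Configuration parameters) {
        firstSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("firstSeen", Long.class));
    }

    @Override
    public void flatMap(Event event, Collector<StampedEvent> out) throws Exception {
        Long start = firstSeen.value();
        if (start == null) {
            start = event.timestamp;   // the first element for this key fixes the alignment
            firstSeen.update(start);
        }
        long windowStart = start + ((event.timestamp - start) / WINDOW_SIZE_MS) * WINDOW_SIZE_MS;
        out.collect(new StampedEvent(event, windowStart));
    }
}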

Regards,
Bowen



On Fri, Aug 25, 2017 at 8:18 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> I'm afraid this is not possible right now because it would require keeping
> state in the WindowAssigner (per key) about what the start timestamp for a
> specific key is.
>
> I think you could emulate that behaviour by having a stateful FlatMap that
> keeps track of all keys and their respective timestamp and assigns windows
> based on that. For this, you would emit a custom data type that has the
> original data along with the assigned window. This window would then be
> "extracted" in the WindowAssigner. The downside of this is that you will
> have a lot of state so you would need a way to clean that up. You could do
> this by using a ProcessFunction where you set a cleanup timer for the
> per-key window-start state.
>
> Best,
> Aljoscha
>
> On 16. Aug 2017, at 06:37, Bowen Li  wrote:
>
> Hi guys,
>
> We are trying to use Flink to count millions of keyed items of an hour window
> hourly as `time(SlidingEventTimeWindows.of(1hour, 1hour))`. According to
> the sliding window doc
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#sliding-windows>,
> all windows are aligned with epoch and always start at integral hours like
> 1:00:00.000 - 1:59:59.999.
>
> But we actually want to start an hour window whenever an new item arrives.
> For example,
> - for item A, the first event arrives at 1:12:24.123, so the window would
> be 1:12:24.123 - 2:12:24.122, and the next window would be 2:12:24.123 -
> 3:12:24.122, and so on
> - for item B, the first event arrives at 1:10:20:321, so the window would
> be 1:10:20:321 - 2:10:20:320, and the next window would be 2:10:20:321 -
> 3:10:20:320, and so on.
>
> Do you have any insights on how to achieve it? Thanks!
>
> Bowen
>
>
>


Re: Flink doesn't free YARN slots after restarting

2017-08-25 Thread Bowen Li
Hi Till,
What I mean is: can the sliding windows for different items have different
start times?

Here's an example of what we want:
- for item A: its first event arrives at 2017/8/24-01:*12:24*, so the 1st
window should be 2017/8/24-01:*12:24* - 2017/8/25-01:*12:23*, the 2nd
window would be 2017/8/24-02:*12:24* - 2017/8/25-02:*12:23*, and so on
- for item B: its first event arrives at 2017/8/24-01:*10:20*, so the 1st
window should be 2017/8/24-01:*10:20* - 2017/8/25-01:*10:19*, the 2nd
window would be 2017/8/24-02:*10:20* - 2017/8/25-02:*10:19*, and so on.

But we observed that what Flink does is: for both A and B, their own unique
time offsets within an hour (*12:24 and 10:20*) are eliminated by Flink, and
windows are unified to be like 2017/8/24-01:*00:00* - 2017/8/25-01:*00:00*,
2017/8/24-02:*00:00* - 2017/8/25-02:*00:00*, and so on.

Unifying the starting time of windows for all items brings us trouble. It
means 20 million windows are triggered and fired at the same time, and the
downstream Kinesis sink cannot handle the amount of output. We actually
want windows for different items to be triggered and fired at different
times within an hour, so we can even out the amount of output to the downstream
Kinesis sink, as my ASCII charts demonstrated.

Does my example make sense?

Thanks,
Bowen

On Fri, Aug 25, 2017 at 12:01 AM, Till Rohrmann 
wrote:

> Hi Bowen,
>
> having a sliding window of one day with a slide of one hour basically
> means that each window is closed after 24 hours and the next closing
> happens one hour later. Only when the window is closed/triggered, you
> compute the window function which generates the window output. That's why
> you see the spikes in your load and it's basically caused by the program
> semantics.
>
> What do you mean by burning down the underlying KPL? If KPL has a max
> throughput, then the FlinkKinesisProducer should ideally respect that.
>
> nice ASCII art btw :-)
>
> Cheers,
> Till
>
> On Fri, Aug 25, 2017 at 6:20 AM, Bowen Li  wrote:
>
>> Hi Till,
>>
>> Thank you very much for looking into it! According to our investigation,
>> this is indeed a Kinesis issue. Flink (FlinkKinesisProducer) uses
>> KPL(Kinesis Producer Library), but hasn't tune it up yet. I have identified
>> a bunch of issues, opened the following Flink tickets, and are working on
>> them.
>>
>>
>>- [FLINK-7367][kinesis connector] Parameterize more configs for
>>FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections,
>>RequestTimeout, etc)
>>- [FLINK-7366][kinesis connector] Upgrade kinesis producer library in
>>flink-connector-kinesis
>>- [FLINK-7508] switch FlinkKinesisProducer to use KPL's ThreadingMode
>>to ThreadedPool mode rather than Per_Request mode
>>
>>
>> I do have a question for Flink performance. We are using a 1-day
>> sized sliding window with 1-hour slide to count some features of items
>> based on event time. We have about 20million items. We observed that Flink
>> only emit results on a fixed time in an hour (e.g. 1am, 2am, 3am,  or
>> 1:15am, 2:15am, 3:15am with a 15min offset). That's means 20million
>> windows/records are generated at the same time every hour, which burns down
>> FlinkKinesisProducer and the underlying KPL, but nothing is generated in
>> the rest of that hour. The pattern is like this:
>>
>> load
>> |
>> |/\  /\
>> |   /  \/  \
>> |_/_  \___/__\_
>>  time
>>
>>  Is there any way to even out the number of generated windows/records in
>> an hour? Can we have evenly distributed generated load like this?
>>
>>  load
>> |
>> |
>> | 
>> |___
>>  time
>>
>>
>> Thanks,
>> Bowen
>>
>> On Tue, Aug 22, 2017 at 2:56 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Bowen,
>>>
>>> sorry for my late answer. I dug through some of the logs and it seems
>>> that you have the following problem:
>>>
>>>1.
>>>
>>>Once in a while the Kinesis producer fails with a
>>>UserRecordFailedException saying “Expired while waiting in HttpClient 
>>> queue
>>>Record has reached expiration”. This seems to be a problem on the Kinesis
>>>side. This will trigger the task failure and the cancellation of all 
>>> other
>>>tasks as well.
>>>2.
>>>
>>>Somehow Flink does not manage to cancel all tasks within a period of
>>>180 seconds. This value is con

Even out the number of generated windows

2017-08-24 Thread Bowen Li
Hi guys,

I do have a question about how Flink generates windows.

We are using a 1-day sized sliding window with a 1-hour slide to count some
features of items based on event time. We have about 20 million items. We
observed that Flink only emits results at a fixed time in an hour (e.g. 1am,
2am, 3am, or 1:15am, 2:15am, 3:15am with a 15min offset). That means
20 million windows/records are generated at the same time every hour, which
burns down our sink. But nothing is generated in the rest of that hour. The
pattern is like this:

# generated windows
|
|/\  /\
|   /  \/  \
|_/__\___/__\_
 time

Is there any way to even out the number of generated windows/records in an
hour? Can we have evenly distributed generated load like this?

# generated windows
|
|
| 
|___
 time

Thanks,
Bowen
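A possible way to spread the load, sketched below rather than taken from this thread, is a
custom window assigner that shifts each item's window boundaries by an offset derived from
its key, so different items' 1-day/1-hour windows close at different times within the hour.
The class name and the hash-based offset are illustrative assumptions; the structure mirrors
Flink's built-in SlidingEventTimeWindows.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Sliding event-time windows whose boundaries are staggered per element by a derived offset. */
public class StaggeredSlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;   // window size in ms, e.g. 24 * 60 * 60 * 1000
    private final long slide;  // slide in ms, e.g. 60 * 60 * 1000

    public StaggeredSlidingEventTimeWindows(long size, long slide) {
        this.size = size;
        this.slide = slide;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // Stable offset in [0, slide). In a real job this must be derived from the grouping key
        // (e.g. the item id) so that all records of one item land in the same staggered windows;
        // element.hashCode() is only a placeholder here.
        long offset = (element.hashCode() & Integer.MAX_VALUE) % slide;
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = timestamp - (timestamp - offset + slide) % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
```

A keyed stream would then use it as `stream.keyBy(...).window(new StaggeredSlidingEventTimeWindows(24 * 3600_000L, 3600_000L))`;
every item still gets full 1-day windows, they just end at item-specific points within the hour,
which flattens the output spikes.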


Re: Flink doesn't free YARN slots after restarting

2017-08-24 Thread Bowen Li
Hi Till,

Thank you very much for looking into it! According to our investigation,
this is indeed a Kinesis issue. Flink (FlinkKinesisProducer) uses
KPL (Kinesis Producer Library), but hasn't tuned it up yet. I have identified
a bunch of issues, opened the following Flink tickets, and am working on
them.


   - [FLINK-7367][kinesis connector] Parameterize more configs for
   FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections,
   RequestTimeout, etc)
   - [FLINK-7366][kinesis connector] Upgrade kinesis producer library in
   flink-connector-kinesis
   - [FLINK-7508] switch FlinkKinesisProducer to use KPL's ThreadingMode to
   ThreadedPool mode rather than Per_Request mode


I do have a question about Flink performance. We are using a 1-day sized
sliding window with a 1-hour slide to count some features of items based on
event time. We have about 20 million items. We observed that Flink only emits
results at a fixed time in an hour (e.g. 1am, 2am, 3am, or 1:15am, 2:15am,
3:15am with a 15min offset). That means 20 million windows/records are
generated at the same time every hour, which burns down
FlinkKinesisProducer and the underlying KPL, but nothing is generated in
the rest of that hour. The pattern is like this:

load
|
|/\  /\
|   /  \/  \
|_/_  \___/__\_
 time

 Is there any way to even out the number of generated windows/records in an
hour? Can we have evenly distributed generated load like this?

 load
|
|
| 
|___
 time


Thanks,
Bowen

On Tue, Aug 22, 2017 at 2:56 AM, Till Rohrmann  wrote:

> Hi Bowen,
>
> sorry for my late answer. I dug through some of the logs and it seems that
> you have the following problem:
>
>1.
>
>Once in a while the Kinesis producer fails with a
>UserRecordFailedException saying “Expired while waiting in HttpClient queue
>Record has reached expiration”. This seems to be a problem on the Kinesis
>side. This will trigger the task failure and the cancellation of all other
>tasks as well.
>2.
>
>Somehow Flink does not manage to cancel all tasks within a period of
>180 seconds. This value is configurable via task.cancellation.timeout
>(unit ms) via the Flink configuration. It looks a bit like you have a lot
>    of logging going on, because the code is waiting for example on
>    Category.java:204 and other log4j methods. This could, however, also cover
>the true issue. What you could do is to try out a different logging backend
>such as logback [1], for example.
>3.
>
>The failing cancellation is a fatal error which leads to the
>termination of the TaskManager. This will be notified by the
>YarnResourceManager and it will restart the container. This goes on until
>it reaches the number of maximum failed containers. This value can be
>configured via yarn.maximum-failed-containers. Per default it is the
>number of initial containers you requested. If you set this value to -1,
>then it will never fail and always restart failed containers. Once the
>maximum is reached, Flink terminates the Yarn application.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_
> practices.html#using-logback-instead-of-log4j
>
> In order to further debug the problem, which version of Flink are you
> using and maybe you could provide us with the debug log level logs of the
> TaskManagers.
>
> Cheers,
> Till
> ​
>
> On Fri, Aug 11, 2017 at 5:37 AM, Bowen Li  wrote:
>
>> Hi Till,
>> Any idea why it happened? I've tried different configurations for
>> configuring our Flink cluster, but the cluster always fails after 4 or 5
>> hours.
>>
>> According to the log, it looks like the total number of slots becomes 0
>> at the end, and YarnClusterClient shuts down the application master as a
>> result. Why are the slots not released? Or have they actually crashed and
>> thus are no longer available?
>>
>> I'm trying to deploy the first Flink cluster within our company. And this
>> issue is slowing us down in proving that Flink actually works for us.
>> We'd appreciate your help on it!
>>
>> Thanks,
>> Bowen
>>
>> On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li  wrote:
>>
>>> Hi Till,
>>> Thanks for taking this issue.
>>>
>>> We are not comfortable sending logs to an email list which is this
>>> open. I'll send logs to you.
>>>
>>> Thanks,
>>> Bowen
>>>
>>>
>>> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Bowen,
>>>>
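For reference, the two knobs mentioned above are plain flink-conf.yaml entries. The values
below are only examples of the syntax, not recommendations from this thread:

```yaml
# flink-conf.yaml (example values only)
# Time in ms to wait for task cancellation before it is treated as fatal for the TaskManager.
task.cancellation.timeout: 180000
# -1 makes Flink keep restarting failed containers instead of terminating the Yarn application.
yarn.maximum-failed-containers: -1
```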

Re: akka timeout

2017-08-23 Thread Bowen Li
Hi Steven,
Yes, GC is a big overhead; it may cause your CPU utilization to reach
100%, and every process stops working. We ran into this a while ago too.

How much memory did you assign to the TaskManager? What was your CPU
utilization when your taskmanager was considered 'killed'?

Bowen



On Wed, Aug 23, 2017 at 10:01 AM, Steven Wu  wrote:

> Till,
>
> Once our job was restarted for some reason (e.g. a taskmanager container got
> killed), it can get stuck in a continuous restart loop for hours. Right now, I
> suspect it is caused by GC pauses during restart; our job has very high
> memory allocation in steady state. High GC pauses then caused akka timeouts,
> which then caused the jobmanager to think taskmanager containers are
> unhealthy/dead and kill them. And the cycle repeats...
>
> But I haven't been able to prove or disprove it yet. When I was asking the
> question, I was still sifting through metrics and error logs.
>
> Thanks,
> Steven
>
>
> On Tue, Aug 22, 2017 at 1:21 AM, Till Rohrmann 
> wrote:
>
>> Hi Steven,
>>
>> quick correction for Flink 1.2. Indeed the MetricFetcher does not pick up
>> the right timeout value from the configuration. Instead it uses a hardcoded
>> 10s timeout. This has only been changed recently and is already committed
>> in the master. So with the next release 1.4 it will properly pick up the
>> right timeout settings.
>>
>> Just out of curiosity, what's the instability issue you're observing?
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 18, 2017 at 7:07 PM, Steven Wu  wrote:
>>
>>> Till/Chesnay, thanks for the answers. Look like this is a result/symptom
>>> of underline stability issue that I am trying to track down.
>>>
>>> It is Flink 1.2.
>>>
>>> On Fri, Aug 18, 2017 at 12:24 AM, Chesnay Schepler 
>>> wrote:
>>>
 The MetricFetcher always uses the default akka timeout value.


 On 18.08.2017 09:07, Till Rohrmann wrote:

 Hi Steven,

 I thought that the MetricFetcher picks up the right timeout from the
 configuration. Which version of Flink are you using?

 The timeout is not a critical problem for the job health.

 Cheers,
 Till

 On Fri, Aug 18, 2017 at 7:22 AM, Steven Wu 
 wrote:

>
> We have set akka.ask.timeout to 60 s in yaml file. I also confirmed
> the setting in Flink UI. But I saw akka timeout of 10 s for metric query
> service. two questions
> 1) why doesn't metric query use the 60 s value configured in yaml
> file? does it always use default 10 s value?
> 2) could this cause heartbeat failure between task manager and job
> manager? or is this just a non-critical failure that won't affect job health?
>
> Thanks,
> Steven
>
> 2017-08-17 23:34:33,421 WARN 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher
> - Fetching metrics failed. akka.pattern.AskTimeoutException: Ask
> timed out on [Actor[akka.tcp://flink@1.2.3.4
> :39139/user/MetricQueryService_23cd9db754bb7d123d80e6b1c0be21d6]]
> after [1 ms] at akka.pattern.PromiseActorRef$$
> anonfun$1.apply$mcV$sp(AskSupport.scala:334) at
> akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:748)
>



>>>
>>
>


Re: [Survey] How many people use Flink with AWS Kinesis sink

2017-08-21 Thread Bowen Li
Hi Stephan,

It's just Kinesis Producer in KPL (Kinesis Producer Library) causing LOTS
of trouble. flink-connector-kinesis uses Kinesis Producer to write output
results to Kinesis. On the other hand, Kinesis Consumer (KCL) is fine.

If there are any successful use cases of Flink + KPL, I'd love to learn 1)
which KPL configuration values (rate limit, record_ttl, etc.) work best
for Flink, and 2) which KPL deployment strategy (parallelism, any
dedicated cores or memory?) works best with Flink. Thanks!

Bowen


On Mon, Aug 21, 2017 at 10:55 AM, Stephan Ewen  wrote:

> Hi!
>
> I cannot speak for the full survey, only from observation on the mailing
> list and some users I have chatted to directly.
> I do not really know about the Kinesis Producer (don't know a specific
> case there), but the Kinesis Consumer seems to be used quite a bit.
>
> Do your observations pertain to Kinesis Consumer as well, or mainly to the
> Kinesis Producer?
>
> Best,
> Stephan
>
>
> On Mon, Aug 21, 2017 at 8:29 AM, Bowen Li  wrote:
>
>> Hi guys,
>> We want to have a more accurate idea of how many people are writing
>> Flink's computation result to AWS Kinesis, and how many people had
>> successful Flink deployment against Kinesis?
>>
>> The reason I ask for the survey is because we have been trying to
>> make our Flink jobs and Kinesis sink work together for a long time but
>> haven't succeeded yet. We discovered quite a few issues with not only
>> Flink's flink-kinesis-connector but, most importantly, KPL (Kinesis
>> Producer Library) itself. Kinesis/KPL is poorly designed, we hate Kinesis,
>> and we are currently evaluating how much further effort it requires to make
>> Flink work with Kinesis.
>>
>> If not many Flink users had good experience with Kinesis, we'll
>> probably need to look for some alternatives.
>>
>> I really appreciate your time and your insight! Thank you very much!
>>
>> Bowen
>>
>>
>>
>


[Survey] How many people use Flink with AWS Kinesis sink

2017-08-20 Thread Bowen Li
Hi guys,
We want to have a more accurate idea of how many people are writing
Flink's computation result to AWS Kinesis, and how many people had
successful Flink deployment against Kinesis?

The reason I ask for the survey is because we have been trying to make
our Flink jobs and Kinesis sink work together for a long time but haven't
succeeded yet. We discovered quite a few issues with not only Flink's
flink-kinesis-connector but, most importantly, KPL (Kinesis Producer
Library) itself. Kinesis/KPL is poorly designed, we hate Kinesis, and we
are currently evaluating how much further effort it requires to make Flink
work with Kinesis.

If not many Flink users had good experience with Kinesis, we'll
probably need to look for some alternatives.

I really appreciate your time and your insight! Thank you very much!

Bowen


Which window function to use to start a window at anytime

2017-08-15 Thread Bowen Li
Hi guys,

We are trying to use Flink to count millions of keyed items over an hour window,
hourly, as `time(SlidingEventTimeWindows.of(1hour, 1hour))`. According to
the sliding window doc,
all windows are aligned with the epoch and always start at integral hours like
1:00:00.000 - 1:59:59.999.

But we actually want to start an hour window whenever a new item arrives.
For example,
- for item A, the first event arrives at 1:12:24.123, so the window would
be 1:12:24.123 - 2:12:24.122, and the next window would be 2:12:24.123 -
3:12:24.122, and so on
- for item B, the first event arrives at 1:10:20:321, so the window would
be 1:10:20:321 - 2:10:20:320, and the next window would be 2:10:20:321 -
3:10:20:320, and so on.

Do you have any insights on how to achieve it? Thanks!

Bowen
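One possible approach, sketched below as an assumption rather than an answer from this thread,
is to skip the window API and drive per-key timers from a ProcessFunction on the keyed stream,
so each item's one-hour window starts at its first event and fires exactly one hour later. The
class name, state names, and the simple event count are illustrative only.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Emits, per key, the number of events seen in a 1-hour window starting at the key's first event. */
public class FirstEventHourCount<IN> extends ProcessFunction<IN, Long> {

    private static final long HOUR = 60 * 60 * 1000L;

    private transient ValueState<Long> count;      // events seen in the currently open window
    private transient ValueState<Long> windowEnd;  // event time at which the open window fires

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        windowEnd = getRuntimeContext().getState(new ValueStateDescriptor<>("windowEnd", Long.class));
    }

    @Override
    public void processElement(IN value, Context ctx, Collector<Long> out) throws Exception {
        if (windowEnd.value() == null) {
            // First event for this key: the window starts at this event and closes one hour later.
            long end = ctx.timestamp() + HOUR;
            windowEnd.update(end);
            ctx.timerService().registerEventTimeTimer(end);
        }
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        out.collect(count.value());  // result for the hour that started at this key's first event
        count.clear();
        windowEnd.clear();           // the next event for this key opens a fresh window
    }
}
```

It would be applied as `stream.keyBy(itemId).process(new FirstEventHourCount<>())` with event-time
timestamps and watermarks assigned upstream. If back-to-back windows are needed even when the next
event arrives later, onTimer could register a follow-up timer at `timestamp + HOUR` instead of
clearing the state.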


Re: Flink doesn't free YARN slots after restarting

2017-08-10 Thread Bowen Li
Hi Till,
Any idea why it happened? I've tried different configurations for
our Flink cluster, but the cluster always fails after 4 or 5
hours.

According to the log, it looks like the total number of slots becomes 0 at
the end, and YarnClusterClient shuts down the application master as a result. Why
are the slots not released? Or have they actually crashed and are thus no longer
available?

I'm trying to deploy the first Flink cluster within our company. And this
issue is slowing us down in proving that Flink actually works for us.
We'd appreciate your help on it!

Thanks,
Bowen

On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li  wrote:

> Hi Till,
> Thanks for taking this issue.
>
> We are not comfortable sending logs to an email list which is this
> open. I'll send logs to you.
>
> Thanks,
> Bowen
>
>
> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann 
> wrote:
>
>> Hi Bowen,
>>
>> if I'm not mistaken, then Flink's current Yarn implementation does not
>> actively release containers. The `YarnFlinkResourceManager` is started
>> with a fixed number of containers it always tries to acquire. If a
>> container should die, then it will request a new one.
>>
>> In case of a failure all slots should be freed and then they should be
>> subject to rescheduling the new tasks. Thus, it is not necessarily the case
>> that 12 new slots will be used unless the old slots are no longer available
>> (failure of a TM). Therefore, it sounds like a bug what you are describing.
>> Could you share the logs with us?
>>
>> Cheers,
>> Till
>>
>> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li  wrote:
>>
>>> Hi guys,
>>> I was running a Flink job (12 parallelism) on an EMR cluster with 48
>>> YARN slots. When the job starts, I can see from Flink UI that the job took
>>> 12 slots, and 36 slots were left available.
>>>
>>> I would expect that when the job fails, it would restart from
>>> checkpointing by taking another 12 slots and freeing the original 12 slots. 
>>> *Well,
>>> I observed that the job took new slots but never free original slots. The
>>> Flink job ended up killed by YARN because there's no available slots
>>> anymore.*
>>>
>>>  Here's the command I ran Flink job:
>>>
>>>  ```
>>>  flink run -m yarn-cluster -yn 6 -ys 8 -ytm 4  xxx.jar
>>>  ```
>>>
>>>  Does anyone know what's going wrong?
>>>
>>> Thanks,
>>> Bowen
>>>
>>
>>
>


Re: Flink doesn't free YARN slots after restarting

2017-08-09 Thread Bowen Li
Hi Till,
Thanks for taking this issue.

We are not comfortable sending logs to an email list which is this open.
I'll send logs to you.

Thanks,
Bowen


On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann  wrote:

> Hi Bowen,
>
> if I'm not mistaken, then Flink's current Yarn implementation does not
> actively release containers. The `YarnFlinkResourceManager` is started
> with a fixed number of containers it always tries to acquire. If a
> container should die, then it will request a new one.
>
> In case of a failure all slots should be freed and then they should be
> subject to rescheduling the new tasks. Thus, it is not necessarily the case
> that 12 new slots will be used unless the old slots are no longer available
> (failure of a TM). Therefore, it sounds like a bug what you are describing.
> Could you share the logs with us?
>
> Cheers,
> Till
>
> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li  wrote:
>
>> Hi guys,
>> I was running a Flink job (12 parallelism) on an EMR cluster with 48
>> YARN slots. When the job starts, I can see from Flink UI that the job took
>> 12 slots, and 36 slots were left available.
>>
>> I would expect that when the job fails, it would restart from
>> checkpointing by taking another 12 slots and freeing the original 12 slots. 
>> *Well,
>> I observed that the job took new slots but never free original slots. The
>> Flink job ended up killed by YARN because there's no available slots
>> anymore.*
>>
>>  Here's the command I ran Flink job:
>>
>>  ```
>>  flink run -m yarn-cluster -yn 6 -ys 8 -ytm 4  xxx.jar
>>  ```
>>
>>  Does anyone know what's going wrong?
>>
>> Thanks,
>> Bowen
>>
>
>


Flink doesn't free YARN slots after restarting

2017-08-09 Thread Bowen Li
Hi guys,
I was running a Flink job (12 parallelism) on an EMR cluster with 48
YARN slots. When the job starts, I can see from Flink UI that the job took
12 slots, and 36 slots were left available.

I would expect that when the job fails, it would restart from the
checkpoint by taking another 12 slots and freeing the original 12
slots. *Well,
I observed that the job took new slots but never freed the original slots. The
Flink job ended up killed by YARN because there were no available slots
anymore.*

 Here's the command I ran Flink job:

 ```
 flink run -m yarn-cluster -yn 6 -ys 8 -ytm 4  xxx.jar
 ```

 Does anyone know what's going wrong?

Thanks,
Bowen


Re: S3 recovery and checkpoint directories exhibit explosive growth

2017-07-25 Thread Bowen Li
Hi Stephan,
Making Flink's S3 integration independent of Hadoop is great. We've
been running into a lot of Hadoop configuration trouble when trying to
enable Flink checkpointing with S3 on AWS EMR.

Is there any concrete plan or tickets created yet for tracking?

Thanks,
Bowen


On Mon, Jul 24, 2017 at 11:12 AM, Stephan Ewen  wrote:

> Hi Prashant!
>
> Flink's S3 integration currently goes through Hadoop's S3 file system (as
> you probably noticed).
>
> It seems that Hadoop's S3 file system is not really well suited for
> what we want to do, and we are looking to drop it and replace it by
> something direct (independent of Hadoop) in the coming release...
>
> One essential thing to make sure is to not have the "trash" activated in
> the configuration, as it adds very high overhead to the delete operations.
>
> Best,
> Stephan
>
>
> On Mon, Jul 24, 2017 at 7:56 PM, Stephan Ewen  wrote:
>
>> Hi Prashant!
>>
>> I assume you are using Flink 1.3.0 or 1.3.1?
>>
>> Here are some things you can do:
>>
>>   - I would try and disable the incremental checkpointing for a start
>> and see what happens then. That should reduce the number of files already.
>>
>>   - Is it possible for you to run a patched version of Flink? If yes, can
>> you try to do the following: In the class "FileStateHandle", in the method
>> "discardState()", remove the code around "FileUtils.deletePathIfEmpty(...)"
>> - this is probably not working well when hitting too many S3 files.
>>
>>   -  You can delete old "completedCheckpointXXXYYY" files, but please do
>> not delete the other two types, they are needed for HA recovery.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Mon, Jul 24, 2017 at 3:46 AM, prashantnayak <
>> prash...@intellifylearning.com> wrote:
>>
>>> Hi Xiaogang and Stephan
>>>
>>> We're continuing to test and have now set up the cluster to disable
>>> incremental RocksDB checkpointing as well as increasing the checkpoint
>>> interval from 30s to 120s  (not ideal really :-( )
>>>
>>> We'll run it with a large number of jobs and report back if this setup
>>> shows
>>> improvement.
>>>
>>> Appreciate any another insights you might have around this problem.
>>>
>>> Thanks
>>> Prashant
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-flink-user-maili
>>> ng-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkp
>>> oint-directories-exhibit-explosive-growth-tp14270p14392.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive at Nabble.com.
>>>
>>
>>
>
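A per-job way to apply the first suggestion (turning incremental checkpoints off while
debugging), assuming the two-argument RocksDBStateBackend constructor available since Flink
1.3; the S3 path and class name are placeholders:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NonIncrementalCheckpointJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Second argument is enableIncrementalCheckpointing; false disables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink/checkpoints", false));
        env.enableCheckpointing(120_000L); // e.g. the 120s interval tried above
        // ... build the actual pipeline here, then call env.execute(...) ...
    }
}
```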


Re: [POLL] Who still uses Java 7 with Flink ?

2017-07-12 Thread Bowen Li
shading / dependency issues (Akka 2.4 will
> >> > remove
> >> > > > Akka's protobuf dependency [2][3]) and generally Java 8's new
> >> language
> >> > > > features all speak for dropping Java 7.
> >> > > > >
> >> > > > > Java 8 has been released in March, 2014. Java 7 is unsupported
> >> since
> >> > > > June 2016.
> >> > > > >
> >> > > > > So what's the feeling in the community regarding the step?
> >> > > > >
> >> > > > >
> >> > > > > [1] https://issues.apache.org/jira/browse/FLINK-5005# <
> >> > > > https://issues.apache.org/jira/browse/FLINK-5005#>
> >> > > > > [2] https://issues.apache.org/jira/browse/FLINK-5989 <
> >> > > > https://issues.apache.org/jira/browse/FLINK-5989>
> >> > > > > [3]
> >> > > > https://issues.apache.org/jira/browse/FLINK-3211?
> >> > > focusedCommentId=15274018&page=com.atlassian.jira.
> >> > > plugin.system.issuetabpanels:comment-tabpanel#comment-15274018
> >> > > > <
> >> > > > https://issues.apache.org/jira/browse/FLINK-3211?
> >> > > focusedCommentId=15274018&page=com.atlassian.jira.
> >> > > plugin.system.issuetabpanels:comment-tabpanel#comment-15274018
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > On Thu, Mar 23, 2017 at 2:42 PM, Theodore Vasiloudis <
> >> > > > theodoros.vasilou...@gmail.com  >> theodoros.vasilou...@gmail.com
> >> > >>
> >> > > > wrote:
> >> > > > > Hello all,
> >> > > > >
> >> > > > > I'm sure you've considered this already, but what this data does
> >> not
> >> > > > include is all the potential future users,
> >> > > > > i.e. slower moving organizations (banks etc.) which could be on
> >> Java
> >> > 7
> >> > > > still.
> >> > > > >
> >> > > > > Whether those are relevant is up for debate.
> >> > > > >
> >> > > > > Cheers,
> >> > > > > Theo
> >> > > > >
> >> > > > > On Thu, Mar 23, 2017 at 12:14 PM, Robert Metzger <
> >> > rmetz...@apache.org
> >> > > > <mailto:rmetz...@apache.org>> wrote:
> >> > > > > Yeah, you are right :)
> >> > > > > I'll put something in my calendar for end of May.
> >> > > > >
> >> > > > > On Thu, Mar 23, 2017 at 12:12 PM, Greg Hogan <
> c...@greghogan.com
> >> > > > <mailto:c...@greghogan.com>> wrote:
> >> > > > > Robert,
> >> > > > >
> >> > > > > Thanks for the report. Shouldn’t we be revisiting this decision
> at
> >> > the
> >> > > > beginning of the new release cycle rather than near the end? There
> >> is
> >> > > > currently little cost to staying with Java 7 since no Flink code
> or
> >> > pull
> >> > > > requests have been written for Java 8.
> >> > > > >
> >> > > > > Greg
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >> On Mar 23, 2017, at 6:37 AM, Robert Metzger <
> rmetz...@apache.org
> >> > > > <mailto:rmetz...@apache.org>> wrote:
> >> > > > >>
> >> > > > >> Looks like 9% on twitter and 24% on the mailing list are still
> >> using
> >> > > > Java 7.
> >> > > > >>
> >> > > > >> I would vote to keep supporting Java 7 for Flink 1.3 and then
> >> > revisit
> >> > > > once we are approaching 1.4 in September.
> >> > > > >>
> >> > > > >> On Thu, Mar 16, 2017 at 8:00 AM, Bowen Li <
> >> bowen...@offerupnow.com
> >> > > > <mailto:bowen...@offerupnow.com>> wrote:
> >> > > > >> There's always a tradeoff we need to make. I'm in favor of
> >> upgrading
> >> > > to
> >> > > > Java 8 to bring in all new Java features.
> >> > > > >>
> >> > > > >> The common way I've seen (and I agree) other software upgrading
> >> > major
> >> > > > things like this is 1) upgrade for next big release without
> backward
> >> > > > compatibility and notify everyone 2) maintain and patch current,
> >> > old-tech
> >> > > > compatible version at a reasonably limited scope. Building
> backward
> >> > > > compatibility is too much for an open sourced project
> >> > > > >>
> >> > > > >>
> >> > > > >>
> >> > > > >> On Wed, Mar 15, 2017 at 7:10 AM, Robert Metzger <
> >> > rmetz...@apache.org
> >> > > > <mailto:rmetz...@apache.org>> wrote:
> >> > > > >> I've put it also on our Twitter account:
> >> > > > >> https://twitter.com/ApacheFlink/status/842015062667755521 <
> >> > > > https://twitter.com/ApacheFlink/status/842015062667755521>
> >> > > > >>
> >> > > > >> On Wed, Mar 15, 2017 at 2:19 PM, Martin Neumann <
> >> > martin.neum...@ri.se
> >> > > > <mailto:martin.neum...@ri.se>>
> >> > > > >> wrote:
> >> > > > >>
> >> > > > >> > I think this easier done in a straw poll than in an email
> >> > > > conversation.
> >> > > > >> > I created one at: http://www.strawpoll.me/12535073 <
> >> > > > http://www.strawpoll.me/12535073>
> >> > > > >> > (Note that you have multiple choices.)
> >> > > > >> >
> >> > > > >> >
> >> > > > >> > Though I prefer Java 8 most of the time I have to work on
> Java
> >> 7.
> >> > A
> >> > > > lot of
> >> > > > >> > the infrastructure I work on still runs Java 7, one of the
> >> > > companies I
> >> > > > >> > build a prototype for a while back just updated to Java 7 2
> >> years
> >> > > > ago. I
> >> > > > >> > doubt we can ditch Java 7 support any time soon if we want to
> >> make
> >> > > it
> >> > > > easy
> >> > > > >> > for companies to use Flink.
> >> > > > >> >
> >> > > > >> > cheers Martin
> >> > > > >> >
> >> > > > >> > //PS sorry if this gets sent twice, we just migrated to a new
> >> mail
> >> > > > system
> >> > > > >> > and a lot of things are broken
> >> > > > >> >
> >> > > > >> > 
> >> > > > >> > From: Stephan Ewen  se...@apache.org
> >> >>
> >> > > > >> > Sent: Wednesday, March 15, 2017 12:30:24 PM
> >> > > > >> > To: user@flink.apache.org <mailto:user@flink.apache.org>;
> >> > > > d...@flink.apache.org <mailto:d...@flink.apache.org>
> >> > > > >> > Subject: [POLL] Who still uses Java 7 with Flink ?
> >> > > > >> >
> >> > > > >> > Hi all!
> >> > > > >> >
> >> > > > >> > I would like to get a feeling how much Java 7 is still being
> >> used
> >> > > > among
> >> > > > >> > Flink users.
> >> > > > >> >
> >> > > > >> > At some point, it would be great to drop Java 7 support and
> >> make
> >> > use
> >> > > > of
> >> > > > >> > Java 8's new features, but first we would need to get a
> feeling
> >> > how
> >> > > > much
> >> > > > >> > Java 7 is still used.
> >> > > > >> >
> >> > > > >> > Would be happy if users on Java 7 respond here, or even users
> >> that
> >> > > > have
> >> > > > >> > some insights into how widespread they think Java 7 still is.
> >> > > > >> >
> >> > > > >> > Thanks,
> >> > > > >> > Stephan
> >> > > > >> >
> >> > > > >> >
> >> > > > >> >
> >> > > > >> >
> >> > > > >> >
> >> > > > >>
> >> > > > >>
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>


Re: confusing RocksDBStateBackend parameters

2017-06-18 Thread Bowen Li
Thanks for your clarification, Ziyad! I will try it out.

On Sat, Jun 17, 2017 at 3:45 PM, Ziyad Muhammed  wrote:

> Hi,
>
> To set the rocksdb state, you have two options:
>
> 1. Set the default state of the flink cluster, using the below parameters
> in flink-conf.yaml file
>
> state.backend: rocksdb
>
> state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
>
>
> 2. Set a per job state backend (which overrides the default setting)
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.setStateBackend(new 
> RocksDBStateBackend(parameterTool.getRequired("checkpointDataUri")));
>
> Here you specify the path (for example, an HDFS directory like
> hdfs://namenode:40010/flink/checkpoints) as the checkpointDataURI.
>
> Hope that helps.
>
> Best
> Ziyad
>
> Best Regards
> *Ziyad Muhammed Mohiyudheen *
> 407, Internationales Studienzentrum Berlin
> Theodor-Heuss-Platz 5
> 14052 Berlin
> *Ph: +49 176 6587 3343 <%2B49%20176%206587%203343>*
> *Mail to*: *mmzi...@gmail.com *
>
> On Fri, Jun 16, 2017 at 8:20 PM, Bowen Li  wrote:
>
>> Hello guys,
>>   I've been trying to figure out differences among several parameters
>> of RocksDBStateBackend. The confusing parameters are:
>>
>>   In flink-conf.yaml:
>>   1. state.backend.fs.checkpointdir
>>   2. state.backend.rocksdb.checkpointdir
>>   3. state.checkpoints.dir
>>
>>   and
>>4. the param *'**checkpointDataUri**'* you pass in to
>> RocksDBStateBackend
>> constructor in`public RocksDBStateBackend(*URI checkpointDataUri*)`
>>
>> This email thread
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clarification-on-state-backend-parameters-td11419.html>
>> explained the first three well. But what's the 4th one for? What's its
>> difference from others? I'd appreciate your clarification.
>>
>>Thanks very much!
>> Bowen
>>
>>
>>
>
>


confusing RocksDBStateBackend parameters

2017-06-16 Thread Bowen Li
Hello guys,
  I've been trying to figure out differences among several parameters
of RocksDBStateBackend. The confusing parameters are:

  In flink-conf.yaml:
  1. state.backend.fs.checkpointdir
  2. state.backend.rocksdb.checkpointdir
  3. state.checkpoints.dir

  and
   4. the param *'**checkpointDataUri**'* you pass in to
RocksDBStateBackend
constructor in`public RocksDBStateBackend(*URI checkpointDataUri*)`

This email thread

explained the first three well. But what's the 4th one for? What's its
difference from others? I'd appreciate your clarification.

   Thanks very much!
Bowen


Re: Clarification on state backend parameters

2017-06-14 Thread Bowen Li
FYI,
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clarification-on-state-backend-parameters-td11419.html
here's the context that discussed differences among:

state.backend.fs.checkpointdir
state.backend.rocksdb.checkpointdir
state.checkpoints.dir

On Wed, Jun 14, 2017 at 12:20 PM, bowen.li  wrote:

> Hi guys,
> This is great clarification!
>
> An extended question from me is, what's the difference between
> `state.checkpoints.dir` and the param you pass in to RocksDBStateBackend
> constructor in`public RocksDBStateBackend(URI checkpointDataUri) throws
> IOException`? They are really confusing.
>
> I specified checkpointDataUri but got an error of `CheckpointConfig says
> to
> persist periodic checkpoints, but no checkpoint directory has been
> configured. You can configure configure one via key
> 'state.checkpoints.dir'.`.
>
> Thanks,
> Bowen
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Clarification-
> on-state-backend-parameters-tp11419p13744.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>
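For what it's worth, that error typically appears when externalized checkpoints are enabled
without `state.checkpoints.dir` being set: the constructor URI tells the RocksDB backend where
to write checkpoint data (the per-job counterpart of `state.backend.fs.checkpointdir`), while
`state.checkpoints.dir` is only consulted for externalized checkpoint metadata. A minimal
sketch, with placeholder paths and class name:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointDirsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);

        // Per-job equivalent of state.backend.fs.checkpointdir: where RocksDB snapshots its state.
        env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

        // Externalized (retained) checkpoints additionally require state.checkpoints.dir in
        // flink-conf.yaml; enabling them without it produces the error quoted above.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... define the pipeline and call env.execute(...) ...
    }
}
```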


Re: [POLL] Who still uses Java 7 with Flink ?

2017-03-16 Thread Bowen Li
There's always a tradeoff we need to make. I'm in favor of upgrading to
Java 8 to bring in all new Java features.

The common way I've seen (and agree with) other software handle major upgrades
like this is to 1) upgrade in the next big release without backward
compatibility and notify everyone, and 2) maintain and patch the current,
old-tech-compatible version at a reasonably limited scope. Building backward
compatibility is too much for an open-source project.



On Wed, Mar 15, 2017 at 7:10 AM, Robert Metzger  wrote:

> I've put it also on our Twitter account:
> https://twitter.com/ApacheFlink/status/842015062667755521
>
> On Wed, Mar 15, 2017 at 2:19 PM, Martin Neumann 
> wrote:
>
> > I think this easier done in a straw poll than in an email conversation.
> > I created one at: http://www.strawpoll.me/12535073
> > (Note that you have multiple choices.)
> >
> >
> > Though I prefer Java 8 most of the time I have to work on Java 7. A lot
> of
> > the infrastructure I work on still runs Java 7, one of the companies I
> > build a prototype for a while back just updated to Java 7 2 years ago. I
> > doubt we can ditch Java 7 support any time soon if we want to make it
> easy
> > for companies to use Flink.
> >
> > cheers Martin
> >
> > //PS sorry if this gets sent twice, we just migrated to a new mail system
> > and a lot of things are broken
> >
> > 
> > From: Stephan Ewen 
> > Sent: Wednesday, March 15, 2017 12:30:24 PM
> > To: user@flink.apache.org; d...@flink.apache.org
> > Subject: [POLL] Who still uses Java 7 with Flink ?
> >
> > Hi all!
> >
> > I would like to get a feeling how much Java 7 is still being used among
> > Flink users.
> >
> > At some point, it would be great to drop Java 7 support and make use of
> > Java 8's new features, but first we would need to get a feeling how much
> > Java 7 is still used.
> >
> > Would be happy if users on Java 7 respond here, or even users that have
> > some insights into how widespread they think Java 7 still is.
> >
> > Thanks,
> > Stephan
> >
> >
> >
> >
> >
>


How to register and send custom metrics from Flink?

2017-03-13 Thread Bowen Li
Hi guys,
I'm retrying to send some app related custom metrics from Flink to
Datadog via StatsD.

I followed https://ci.apache.org/projects/flink/flink-docs-
release-1.2/monitoring/metrics.html to set up flink-conf.yaml and test code
like this

// flink-conf.yaml

metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

metrics.scope.jm: bowen.jobmanager
metrics.scope.jm.job: bowen.jobmanager.bowen.job_name
metrics.scope.tm: bowen.taskmanager.<tm_id>
metrics.scope.tm.job: bowen.taskmanager.<tm_id>.<job_name>
metrics.scope.task: bowen.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: bowen.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>


  // test code, by modifying WordCount example


public static final class Tokenizer extends
      RichFlatMapFunction<String, Tuple2<String, Integer>> {
   private static final long serialVersionUID = 1L;

   private Counter counter;

   @Override
   public void open(Configuration config) {
      getRuntimeContext()
         .getMetricGroup()
         .addGroup("bowen.test")
         .gauge("bowen.test.flink", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
               return 100;
            }
         });  // test custom metrics

      counter = getRuntimeContext()
         .getMetricGroup()
         .addGroup("bowen.test")
         .counter("bowen.flink.metric"); // test custom metrics
      counter.inc(100);
   }

   @Override
   public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
         throws Exception {
      // normalize and split the line
      String[] tokens = value.toLowerCase().split("\\W+");

      // emit the pairs
      for (String token : tokens) {
         if (token.length() > 0) {
            out.collect(new Tuple2<String, Integer>(token, 1));
         }
      }

      counter.inc(100);
   }
}

  I found my Datadog received all system scope metrics, but none of my
custom metrics. I researched all night but made no progress. What did I do
wrong? Flink is able to handle custom metrics, right? I'd really appreciate
some guidance on sending custom metrics!

 Thank you very much!
Bowen
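For comparison, a minimal custom-metric registration looks like the sketch below; the class,
group, and metric names are made up for illustration. The reporter receives the metric under
the configured operator scope followed by the group and the metric name, so keeping those last
components free of dots makes it easier to see where the scope ends and the custom name begins.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

/** Registers one custom counter reported as <operator scope>.custom.itemsMapped. */
public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter itemsMapped;

    @Override
    public void open(Configuration config) {
        itemsMapped = getRuntimeContext()
                .getMetricGroup()
                .addGroup("custom")        // one extra user scope component
                .counter("itemsMapped");   // metric name without dots
    }

    @Override
    public String map(String value) {
        itemsMapped.inc();                 // incremented once per processed record
        return value;
    }
}
```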