Re: Modelling time for complex events generated out of simple ones

2020-04-19 Thread Salva Alcántara
In my case, the relationship between input and output events is that output
events are generated by rules applied to the input events. Essentially,
output events correspond to specific patterns / sequences of input events.
You can think of the output events as signalling certain anomalies or abnormal
conditions. So I guess we are more in the second case you mention, where the
Flink TM can be regarded as a generator and hence using the processing time
makes sense.

Indeed, I am using both the processing time and the event-time watermark
value at the moment of generating the output events. I think both convey
useful information. In particular, the processing time looks like the natural
logical timestamp for the output events. However, although that would be the
exception, it might also happen that my Flink app is processing old data at
some point. That is why I am also adding another timestamp with the current
event-time watermark value. This allows the consumer of the output events to
detect whether an output event corresponds to old data or not, by comparing
the processing-time and event-time timestamps, which under normal conditions
should be close to each other, except when processing old data.

In the case of using both, what naming would you use for the two fields?
Something along the lines of event_time and processing_time seems to leak
implementation details of my app to the external services...
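
For concreteness, the output events I have in mind look roughly like this (just a sketch; the two timestamp field names are placeholders, which is exactly the part I am unsure about):

// sketch of an output (complex) event; field names are placeholders
public class AnomalyEvent {
    public String anomalyType;       // which rule/pattern fired
    public long detectionTime;       // processing time when the event was generated
    public long inputWatermarkTime;  // event-time watermark value at that moment

    public AnomalyEvent() {}         // no-arg constructor so Flink treats it as a POJO
}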



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-19 Thread Yang Wang
Hi tison,

I think I get your concerns and points.

Taking both FLINK-13938 [1] and FLINK-14964 [2] into account, I will proceed in the
following steps:
* Enrich "-yt/--yarnship" to support HDFS directories
* Enrich "-yt/--yarnship" to specify the local resource visibility. It is
"APPLICATION" by default. It could also be configured to "PUBLIC",
which means shared by all applications, or "PRIVATE", which means shared by
the same user.
* Add a new config option to control whether to optimize the
submission (default: false). When set to true, the Flink client will
try to filter the jars and files by name and size to avoid unnecessary
uploading.

A very rough submission command could look as follows:
./bin/flink run -m yarn-cluster -d \
  -yt hdfs://myhdfs/flink/release/flink-1.11:PUBLIC,hdfs://myhdfs/user/someone/mylib \
  -yD yarn.submission-optimization.enable=true \
  examples/streaming/WindowJoin.jar

cc @Rong Rong, since you also helped to review the old
PR of FLINK-13938, maybe you could share some thoughts as well.


[1]. https://issues.apache.org/jira/browse/FLINK-13938
[2]. https://issues.apache.org/jira/browse/FLINK-14964


Best,
Yang



tison  于2020年4月18日周六 下午12:12写道:

> Hi Yang,
>
> Name filtering & schema special handling make sense to me. We can enrich
> them later if there is a requirement, without breaking the interface.
>
> For #1, from my perspective your first proposal is
>
>   having an option that specifies a remote flink/lib; we then turn off auto
> uploading of the local flink/lib and register that path as local resources
>
> It seems we would be adding yet another special logic for handling one kind of
> thing... what I propose is that we keep these two steps explicitly separate:
>
> 1. an option that turns off auto uploading of the local flink/lib
> 2. a general option that registers remote files as local resources
>
> The remaining point here is that you propose we handle flink/lib with PUBLIC
> visibility while other files get APPLICATION visibility; either a
> composite configuration or name filtering to specially handle the libs makes
> sense for that, though.
>
> YarnClusterDescriptor already has a lot of special handling logic which
> introduces a number of config options and keys, which should
> have been expressed with a few common options and validated at runtime.
>
> Best,
> tison.
>
>
> Yang Wang  于2020年4月17日周五 下午11:42写道:
>
>> Hi tison,
>>
>> For #3, if you mean registering remote HDFS files as local resources, we
>> should make "-yt/--yarnship"
>> support remote directories. I think that is the right direction.
>>
>> For #1, if users can ship remote directories, then they could also
>> specify something like
>> "-yt hdfs://hdpdev/flink/release/flink-1.x,
>> hdfs://hdpdev/user/someone/mylib". Do you mean we add an
>> option controlling whether to try to avoid unnecessary uploading? Maybe we could
>> filter by names and file sizes.
>> I think this is a good suggestion, and we do not need to introduce a new
>> config option "-ypl".
>>
>> For #2, for flink-dist, #1 could already solve the problem. We do not
>> need to support remote schemes.
>> It would confuse users if we only supported HDFS, not S3, OSS, etc.
>>
>>
>> Best,
>> Yang
>>
>> tison  于2020年4月17日周五 下午8:05写道:
>>
>>> Hi Yang,
>>>
>>> I agree that these two pieces of work would benefit from a single assignee. My
>>> concerns are as below:
>>>
>>> 1. Both shared libs & a remote flink dist/libs are remote ship files. I
>>> don't think we have to implement multiple code paths/configurations.
>>> 2. So, for concept clarification, there are
>>>   (1) an option to disable shipping local libs
>>>   (2) flink-dist supporting multiple schemes, at least "hdfs://" as we said
>>>   (3) an option for registering remote ship files with path & visibility.
>>> I think the new configuration system helps here.
>>>
>>> The reason we have to handle (2) specially instead of including it in
>>> (3) is that when shipping flink-dist to the TM container, we specifically
>>> detect flink-dist. Of course we could merge it into the general ship files and
>>> validate that the ship files finally contain flink-dist, which is an alternative.
>>>
>>> The *most important* gap is between (1) and (3): we don't have an
>>> option for shipping only remote libs. Does this clarification match your proposal?
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Till Rohrmann  于2020年4月17日周五 下午7:49写道:
>>>
 Hi Yang,

 from what I understand it sounds reasonable to me. Could you sync with
 Tison on FLINK-14964 on how to proceed? I'm not super deep into these
 issues, but they seem to be somewhat related and Tison already did some
 implementation work.

 I'd say it would be awesome if we could include this kind of improvement in
 the release.

 Cheers,
 Till

 On Thu, Apr 16, 2020 at 4:43 AM Yang Wang 
 wrote:

> Hi All, thanks a lot for reviving this discussion.
>
> I think we could unify FLINK-13938 and FLINK-14964 since they have
> a similar
> purpose: avoiding unnecessary uploading and downloading of jars in YARN
> deployment.
> The 

Re:Re: multi-sql checkpoint fail

2020-04-19 Thread forideal
Hi Tison, Jark Wu:


   Thanks for your reply!


   > What's the statebackend are you using? Is it Heap statebackend?
   I am using the RocksDB backend with incremental checkpoints.


   > Could you share the stack traces?
   I looked at the flame graph myself and found that it was stuck at the end
of the window, where the calculation takes place.

   I also found that the checkpoints fail because of the watermark. For the same operator,
the difference between the max watermark and the min watermark is 30 minutes (my
checkpoint interval is 10 minutes). This may be caused by the slow computation
of the windows.




Best forideal










At 2020-04-18 21:51:13, "Jark Wu"  wrote:

Hi,


What's the statebackend are you using? Is it Heap statebackend?

Best,
Jark


On Sat, 18 Apr 2020 at 07:06, tison  wrote:

Hi,


Could you share the stack traces?


Best,
tison.




forideal  于2020年4月18日周六 上午12:33写道:


Hello friend

I have two SQL jobs, and checkpointing fails all the time. One task opens a sliding
window of one hour, and another task consumes the output data of the
previous task. There is no problem when the two tasks are submitted separately.
-- first: the calculation
-- second: write the calculation result to redis

-- first
insert into dw_access_log
select time_key, query_nor, query_nor_counter, '1' as group_key
from (
    select
        HOP_START(event_time_fake, interval '1' MINUTE, interval '60' MINUTE) as time_key,
        query_nor,
        count(1) as query_nor_counter
    from (
        select
            RED_JSON_VALUE(request, '$.query_nor') as query_nor,
            RED_JSON_VALUE(request, '$.target') as target,
            event_time_fake
        from (
            select red_pb_parser(body, 'request') as request, event_time_fake
            from access_log_source
        )
    )
    group by
        query_nor,
        HOP( -- sliding window: size one hour, slide one minute
            event_time_fake, interval '1' MINUTE, interval '60' MINUTE)
)
where query_nor_counter > 100;

-- second
insert into dw_sink_access_log
select
    'fix_key' as `key`,
    get_json_value(query_nor, query_nor_counter) as `value` -- agg_func
from dw_access_log
group by tumble(time_key_fake, interval '1' MINUTE), group_key
Article Link:https://zhuanlan.zhihu.com/p/132764573
Picture Link:
https://pic4.zhimg.com/80/v2-d3b1105b1419fef3ea6b9176085a5597_1440w.jpg 
https://pic3.zhimg.com/80/v2-b6ea7b4a8368c4bae03afb94c723bcca_1440w.jpg


Best, forideal








 

Re: Question about the Flink SQL Kafka source

2020-04-19 Thread Benchao Li
It should not. Source subtasks that are not assigned any partition will be marked as idle.

Sun.Zhu <17626017...@163.com> 于2020年4月20日周一 上午10:28写道:

> Hi Benchao, if the source parallelism is greater than the number of partitions, won't that cause the problem that checkpoints cannot be completed?
>
>
>
>
> Sun.Zhu
> Email: 17626017...@163.com
>
> Signature is customized by Netease Mail Master
>
> On 2020-04-19 22:43, 人生若只如初见 wrote:
> OK, thanks a lot.
>
>
>
>
> -- Original message --
> From: "Benchao Li"  Sent: Sunday, April 19, 2020, 9:25 PM
> To: "user-zh"
> Subject: Re: Question about the Flink SQL Kafka source
>
>
>
> If that is the case, you can set your source parallelism to be greater than or equal to the number of Kafka partitions to work around it.
>
> Jark Wu 
>  Hi,
> 
>  Based on the behavior you describe and the code you provided, I think the cause is out-of-order data.
>  According to your Java code, the event time of the data
> is not monotonically increasing, so there is a certain degree of disorder. This has little impact while the job runs normally (the watermark
>  tolerates 5s of disorder).
>  But when catching up on data, since Flink does not yet align event time across partitions, some partitions
> progress
>  much faster than others,
>  which widens the disorder (e.g. data that used to be at most 4s late may now be 10s late), so more data gets dropped, which is why the aggregated values are lower during catch-up.
> 
>  A complete solution has to wait for the completion of FLIP-27.
>  For now, you can increase the watermark delay to tolerate more late data.
> 
>  Best,
>  Jark
> 
> 
>  On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  
>   你好
>  
>  
> 
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
>  
>  
>  
>   附:
>   userbehavior建表语句
>   CREATE TABLE user_behavior (
>   nbsp; nbsp; user_id BIGINT,
>   nbsp; nbsp; item_id BIGINT,
>   nbsp; nbsp; category_id BIGINT,
>   nbsp; nbsp; behavior STRING,
>   nbsp; nbsp; ts TIMESTAMP(3),
>   nbsp; nbsp; proctime as PROCTIME(), nbsp; --
> 通过计算列产生一个处理时间列
>   nbsp; nbsp; WATERMARK FOR ts as ts - INTERVAL '5'
> SECOND nbsp;--
>   在ts上定义watermark,ts成为事件时间列
>   ) WITH (
>   nbsp; nbsp; 'connector.type' = 'kafka', nbsp;--
> 使用 kafka connector
>   nbsp; nbsp; 'connector.version' = 'universal',
> nbsp;-- kafka
>   版本,universal 支持 0.11 以上的版本
>   nbsp; nbsp; 'connector.topic' = 'user_behavior',
> nbsp;-- kafka topic
>   nbsp; nbsp; 'connector.startup-mode' =
> 'earliest-offset', nbsp;-- 从起始
>   offset 开始读取
>   nbsp; nbsp; 'connector.properties.zookeeper.connect' =
> '
>   192.168.0.150:2181', nbsp;-- zookeeper 地址
>   nbsp; nbsp; 'connector.properties.bootstrap.servers' =
> '
>   192.168.0.150:9092', nbsp;-- kafka broker 地址
>   nbsp; nbsp; 'format.type' = 'json' nbsp;-- 数据源格式为
> json
>   )
>  
>   每小时购买数建表语句
>   CREATE TABLE buy_cnt_per_hour (nbsp;
>   nbsp; nbsp; hour_of_day BIGINT,
>   nbsp; nbsp; buy_cnt BIGINT
>   ) WITH (
>   nbsp; nbsp; 'connector.type' = 'elasticsearch', -- 使用
> elasticsearch
>   connector
>   nbsp; nbsp; 'connector.version' = '6', nbsp;--
> elasticsearch 版本,6 能支持
>   es 6+ 以及 7+ 的版本
>   nbsp; nbsp; 'connector.hosts' = '
> http://192.168.0.150:9200', nbsp;--
>   elasticsearch 地址
>   nbsp; nbsp; 'connector.index' = 'buy_cnt_per_hour',
> nbsp;--
>   elasticsearch 索引名,相当于数据库的表名
>   nbsp; nbsp; 'connector.document-type' =
> 'user_behavior', --
>   elasticsearch 的 type,相当于数据库的库名
>   nbsp; nbsp; 'connector.bulk-flush.max-actions' = '1',
> nbsp;-- 每条数据都刷新
>   nbsp; nbsp; 'format.type' = 'json', nbsp;--
> 输出数据格式 json
>   nbsp; nbsp; 'update-mode' = 'append'
>   )
>  
>   插入语句
>   INSERT INTO buy_cnt_per_hournbsp;
>   SELECT HOUR(TUMBLE_START(ts,INTERVAL '1'
> HOUR)),COUNT(*)nbsp;
>   FROM user_behavior
>   WHERE behavior = 'buy'
>   GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
>  
>   kafka数据发送代码
>  
>   import com.alibaba.fastjson.JSONObject;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>  
>   import java.text.SimpleDateFormat;
>   import java.util.*;
>  
>  
>   public class UserBehaviorProducer {
>   public static final String brokerList = "
> 192.168.0.150:9092";
>  
>   // public static final
> String topic="user_behavior";
>   public static final String topic =
> "user_behavior";
>  
>   public static void main(String args[]) {
>  
>   //配置生产者客户端参数
>   //将配置序列化
>   Properties
> properties = new Properties();
>  
> properties.put("key.serializer",
>   "org.apache.kafka.common.serialization.StringSerializer");
>  
> properties.put("value.serializer",
>   "org.apache.kafka.common.serialization.StringSerializer");
>  
> properties.put("bootstrap.servers", brokerList);
>  
> //创建KafkaProducer 实例
>  
> KafkaProducer   KafkaProducer   //构建待发送的消息
>   //{"user_id":
> "952483", "item_id":"310884", "category_id":
>   "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
>   //{"user_id":
> "794777", "item_id":"5119439", "category_id":
>   "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
>   String[]
> behaviors = {"pv", "buy", "coll",
>  "cart"};//浏览,购买,收藏,加入购物车
>   JSONObject
> jsonObject = new JSONObject();
>   HashMap Stringgt; info = new HashMap   Random random =
> new Random();
>   SimpleDateFormat
> format = new
>   SimpleDateFormat("-MM-dd'T'HH:mm:ss'Z'");
>   long
> date_long=getDate();
>   while (true) {
> 
> 
> jsonObject.put("user_id", random.nextInt(90) + 10 +
>   "");
> 
> 
> jsonObject.put("item_id", random.nextInt(90) + 10 +
>   "");
> 
> 
> jsonObject.put("category_id", random.nextInt(1000) + "");
> 
> 
> jsonObject.put("behavior", 

Re: how to send back result via job manager to client

2020-04-19 Thread Eleanore Jin
Hi Kurt,

Thanks, I will look into it and come back if I have more questions.

Best
Eleanore

On Sun, Apr 19, 2020 at 7:18 PM Kurt Young  wrote:

> You can take a look at this JIRA: https://issues.apache.org/jira/browse/FLINK-14807
>
> Best,
> Kurt
>
>
> On Mon, Apr 20, 2020 at 7:07 AM Eleanore Jin 
> wrote:
>
> > Hi,
> > I just read an article about a use case of Flink for OLAP (
> >
> > https://ververica.cn/developers/olap-engine-performance-optimization-and-application-cases/
> ),
> > and one point it mentions:
> > [image: image.png]
> > This optimization comes from a characteristic of OLAP: the final computation result is sent to the client, forwarded to the Client via the JobManager.
> >
> > I would like to ask how this is implemented: forwarding the result to the Client through the JobManager.
> >
> > 谢谢!
> > Eleanore
> >
>


Re: Question about the Flink SQL Kafka source

2020-04-19 Thread Sun.Zhu
Hi Benchao, if the source parallelism is greater than the number of partitions, won't that cause the problem that checkpoints cannot be completed?




Sun.Zhu
Email: 17626017...@163.com

Signature is customized by Netease Mail Master

On 2020-04-19 22:43, 人生若只如初见 wrote:
OK, thanks a lot.



---- Original message ----
From: "Benchao Li"

Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Glad to hear that.

Som Lima  于2020年4月20日周一 上午8:08写道:

> I will thanks.  Once I had it set up and working.
> I switched  my computers around from client to server to server to client.
> With your excellent instructions I was able to do it in 5 .minutes
>
> On Mon, 20 Apr 2020, 00:05 Jeff Zhang,  wrote:
>
>> Som, Let us know when you have any problems
>>
>> Som Lima  于2020年4月20日周一 上午2:31写道:
>>
>>> Thanks for the info and links.
>>>
>>> I had a lot of problems I am not sure what I was doing wrong.
>>>
>>> May be conflicts with setup from apache spark.  I think I may need to
>>> setup users for each development.
>>>
>>>
>>> Anyway I kept doing fresh installs about four altogether I think.
>>>
>>> Everything works fine now
>>> Including remote access  of zeppelin on machines across the local area
>>> network.
>>>
>>> Next step  setup remote clusters
>>>  Wish me luck !
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:
>>>
 Hi Som,

 You can take a look at flink on zeppelin, in zeppelin you can connect
 to a remote flink cluster via a few configuration, and you don't need to
 worry about the jars. Flink interpreter will ship necessary jars for you.
 Here's a list of tutorials.

 1) Get started https://link.medium.com/oppqD6dIg5
  2) Batch https://
 link.medium.com/3qumbwRIg5  3)
 Streaming https://link.medium.com/RBHa2lTIg5
  4) Advanced usage https://
 link.medium.com/CAekyoXIg5 


 Zahid Rahman  于2020年4月19日周日 下午7:27写道:

> Hi Tison,
>
> I think I may have found what I want in example 22.
>
> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>
> I need to create Configuration object first as shown .
>
> Also I think  flink-conf.yaml file may contain configuration for
> client rather than  server. So before starting is irrelevant.
> I am going to play around and see but if the Configuration class
> allows me to set configuration programmatically and overrides the yaml 
> file
> then that would be great.
>
>
>
> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>
>> Thanks.
>> flink-conf.yaml does allow me to do what I need to do without making
>> any changes to client source code.
>>
>> But
>> RemoteStreamEnvironment constructor  expects a jar file as the third
>> parameter also.
>>
>> RemoteStreamEnvironment
>> 
>> (String
>> 
>>  host,
>> int port, String
>> 
>> ... jarFiles)
>> Creates a new RemoteStreamEnvironment that points to the master
>> (JobManager) described by the given host name and port.
>>
>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>
>>> You can change flink-conf.yaml "jobmanager.address" or
>>> "jobmanager.port" options before run the program or take a look at
>>> RemoteStreamEnvironment which enables configuring host and port.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Som Lima  于2020年4月19日周日 下午5:58写道:
>>>
 Hi,

 After running

 $ ./bin/start-cluster.sh

 The following line of code defaults jobmanager  to localhost:6123

 final  ExecutionEnvironment env =
 Environment.getExecutionEnvironment();

 which is same on spark.

 val spark =
 SparkSession.builder.master(local[*]).appname("anapp").getOrCreate

 However if I wish to run the servers on a different physical
 computer.
 Then in Spark I can do it this way using the spark URI in my IDE.

 Conf =
 SparkConf().setMaster("spark://:").setAppName("anapp")

 Can you please tell me the equivalent change to make so I can run
 my servers and my IDE from different physical computers.














 --
 Best Regards

 Jeff Zhang

>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re: how to send back result via job manager to client

2020-04-19 Thread Kurt Young
You can take a look at this JIRA: https://issues.apache.org/jira/browse/FLINK-14807

Best,
Kurt


On Mon, Apr 20, 2020 at 7:07 AM Eleanore Jin  wrote:

> Hi,
> I just read an article about a use case of Flink for OLAP (
> https://ververica.cn/developers/olap-engine-performance-optimization-and-application-cases/),
> and one point it mentions:
> [image: image.png]
> This optimization comes from a characteristic of OLAP: the final computation result is sent to the client, forwarded to the Client via the JobManager.
>
> I would like to ask how this is implemented: forwarding the result to the Client through the JobManager.
>
> Thanks!
> Eleanore
>


Re: Job manager URI rpc address:port

2020-04-19 Thread Som Lima
I will, thanks. Once I had it set up and working,
I switched my computers around from client to server and from server to client.
With your excellent instructions I was able to do it in 5 minutes.

On Mon, 20 Apr 2020, 00:05 Jeff Zhang,  wrote:

> Som, Let us know when you have any problems
>
> Som Lima  于2020年4月20日周一 上午2:31写道:
>
>> Thanks for the info and links.
>>
>> I had a lot of problems I am not sure what I was doing wrong.
>>
>> May be conflicts with setup from apache spark.  I think I may need to
>> setup users for each development.
>>
>>
>> Anyway I kept doing fresh installs about four altogether I think.
>>
>> Everything works fine now
>> Including remote access  of zeppelin on machines across the local area
>> network.
>>
>> Next step  setup remote clusters
>>  Wish me luck !
>>
>>
>>
>>
>>
>>
>>
>> On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:
>>
>>> Hi Som,
>>>
>>> You can take a look at flink on zeppelin, in zeppelin you can connect to
>>> a remote flink cluster via a few configuration, and you don't need to worry
>>> about the jars. Flink interpreter will ship necessary jars for you. Here's
>>> a list of tutorials.
>>>
>>> 1) Get started https://link.medium.com/oppqD6dIg5
>>>  2) Batch https://
>>> link.medium.com/3qumbwRIg5  3) Streaming
>>> https://link.medium.com/RBHa2lTIg5  4)
>>> Advanced usage https://link.medium.com/CAekyoXIg5
>>> 
>>>
>>>
>>> Zahid Rahman  于2020年4月19日周日 下午7:27写道:
>>>
 Hi Tison,

 I think I may have found what I want in example 22.

 https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration

 I need to create Configuration object first as shown .

 Also I think  flink-conf.yaml file may contain configuration for client
 rather than  server. So before starting is irrelevant.
 I am going to play around and see but if the Configuration class allows
 me to set configuration programmatically and overrides the yaml file then
 that would be great.



 On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:

> Thanks.
> flink-conf.yaml does allow me to do what I need to do without making
> any changes to client source code.
>
> But
> RemoteStreamEnvironment constructor  expects a jar file as the third
> parameter also.
>
> RemoteStreamEnvironment
> 
> (String
> 
>  host,
> int port, String
> 
> ... jarFiles)
> Creates a new RemoteStreamEnvironment that points to the master
> (JobManager) described by the given host name and port.
>
> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>
>> You can change flink-conf.yaml "jobmanager.address" or
>> "jobmanager.port" options before run the program or take a look at
>> RemoteStreamEnvironment which enables configuring host and port.
>>
>> Best,
>> tison.
>>
>>
>> Som Lima  于2020年4月19日周日 下午5:58写道:
>>
>>> Hi,
>>>
>>> After running
>>>
>>> $ ./bin/start-cluster.sh
>>>
>>> The following line of code defaults jobmanager  to localhost:6123
>>>
>>> final  ExecutionEnvironment env =
>>> Environment.getExecutionEnvironment();
>>>
>>> which is same on spark.
>>>
>>> val spark =
>>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>>
>>> However if I wish to run the servers on a different physical
>>> computer.
>>> Then in Spark I can do it this way using the spark URI in my IDE.
>>>
>>> Conf =
>>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>>
>>> Can you please tell me the equivalent change to make so I can run my
>>> servers and my IDE from different physical computers.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Som, Let us know when you have any problems

Som Lima  于2020年4月20日周一 上午2:31写道:

> Thanks for the info and links.
>
> I had a lot of problems I am not sure what I was doing wrong.
>
> May be conflicts with setup from apache spark.  I think I may need to
> setup users for each development.
>
>
> Anyway I kept doing fresh installs about four altogether I think.
>
> Everything works fine now
> Including remote access  of zeppelin on machines across the local area
> network.
>
> Next step  setup remote clusters
>  Wish me luck !
>
>
>
>
>
>
>
> On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:
>
>> Hi Som,
>>
>> You can take a look at flink on zeppelin, in zeppelin you can connect to
>> a remote flink cluster via a few configuration, and you don't need to worry
>> about the jars. Flink interpreter will ship necessary jars for you. Here's
>> a list of tutorials.
>>
>> 1) Get started https://link.medium.com/oppqD6dIg5
>>  2) Batch https://
>> link.medium.com/3qumbwRIg5  3) Streaming
>> https://link.medium.com/RBHa2lTIg5  4)
>> Advanced usage https://link.medium.com/CAekyoXIg5
>> 
>>
>>
>> Zahid Rahman  于2020年4月19日周日 下午7:27写道:
>>
>>> Hi Tison,
>>>
>>> I think I may have found what I want in example 22.
>>>
>>> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>>>
>>> I need to create Configuration object first as shown .
>>>
>>> Also I think  flink-conf.yaml file may contain configuration for client
>>> rather than  server. So before starting is irrelevant.
>>> I am going to play around and see but if the Configuration class allows
>>> me to set configuration programmatically and overrides the yaml file then
>>> that would be great.
>>>
>>>
>>>
>>> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>>>
 Thanks.
 flink-conf.yaml does allow me to do what I need to do without making
 any changes to client source code.

 But
 RemoteStreamEnvironment constructor  expects a jar file as the third
 parameter also.

 RemoteStreamEnvironment
 
 (String
 
  host,
 int port, String
 
 ... jarFiles)
 Creates a new RemoteStreamEnvironment that points to the master
 (JobManager) described by the given host name and port.

 On Sun, 19 Apr 2020, 11:02 tison,  wrote:

> You can change flink-conf.yaml "jobmanager.address" or
> "jobmanager.port" options before run the program or take a look at
> RemoteStreamEnvironment which enables configuring host and port.
>
> Best,
> tison.
>
>
> Som Lima  于2020年4月19日周日 下午5:58写道:
>
>> Hi,
>>
>> After running
>>
>> $ ./bin/start-cluster.sh
>>
>> The following line of code defaults jobmanager  to localhost:6123
>>
>> final  ExecutionEnvironment env =
>> Environment.getExecutionEnvironment();
>>
>> which is same on spark.
>>
>> val spark =
>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>
>> However if I wish to run the servers on a different physical computer.
>> Then in Spark I can do it this way using the spark URI in my IDE.
>>
>> Conf =
>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>
>> Can you please tell me the equivalent change to make so I can run my
>> servers and my IDE from different physical computers.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


how to send back result via job manager to client

2020-04-19 Thread Eleanore Jin
Hi,
I just read an article about a use case of Flink for OLAP (
https://ververica.cn/developers/olap-engine-performance-optimization-and-application-cases/),
and one point it mentions:
[image: image.png]
This optimization comes from a characteristic of OLAP: the final computation result is sent to the client, forwarded to the Client via the JobManager.

I would like to ask how this is implemented: forwarding the result to the Client through the JobManager.

Thanks!
Eleanore


Problem getting watermark right with event time

2020-04-19 Thread Sudan S
Hi,

I am having a problem getting the watermark right. The setup is:
- I have a Flink job which reads from a Kafka topic, uses Protobuf
deserialization, uses a sliding window of (120 seconds, 30 seconds), sums up
the values and finally returns the result.

The code is pasted below.

The problem is that I'm not able to reach the sink. I am able to reach the
timestamp assigner when an event arrives, but past that, neither the process
function nor the sink function is getting invoked, in spite of pumping
events regularly. I'm not able to figure out how to debug this issue.
Please help.

public class StreamingJob {

    public static void main(String[] args) throws Exception {

        Properties kafkaConsumerProps = new Properties();
        kafkaConsumerProps.setProperty("bootstrap.servers", "{bootstrap_servers}");
        kafkaConsumerProps.setProperty("group.id", "{group_id}");

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(100);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setMaxParallelism(5);
        env.setParallelism(5);

        // Kafka source with Protobuf deserialization
        SingleOutputStreamOperator<Eventi.Event> texStream = env
                .addSource(new FlinkKafkaConsumer011<>("auth", new EventiSchema(), kafkaConsumerProps))
                .setParallelism(5).setMaxParallelism(5);

        SlidingEventTimeWindows window =
                SlidingEventTimeWindows.of(Time.seconds(120), Time.seconds(30));

        texStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Eventi.Event>() {
            @Override
            public long extractAscendingTimestamp(Eventi.Event element) {
                return element.getEventTime().getSeconds() * 1000;
            }
        }).keyBy(Eventi.Event::getEventTime).window(window)
          .process(new ProcessWindowFunction<Eventi.Event, Integer, Timestamp, TimeWindow>() {
            @Override
            public void process(Timestamp timestamp, Context context,
                    Iterable<Eventi.Event> elements, Collector<Integer> out) throws Exception {
                int sum = 0;
                for (Eventi.Event element : elements) {
                    sum++;
                }
                out.collect(sum);
            }
        }).print();

        env.execute();
    }
}



Re: Job manager URI rpc address:port

2020-04-19 Thread Som Lima
Thanks for the info and links.

I had a lot of problems I am not sure what I was doing wrong.

May be conflicts with setup from apache spark.  I think I may need to setup
users for each development.


Anyway I kept doing fresh installs about four altogether I think.

Everything works fine now
Including remote access  of zeppelin on machines across the local area
network.

Next step  setup remote clusters
 Wish me luck !







On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:

> Hi Som,
>
> You can take a look at flink on zeppelin, in zeppelin you can connect to a
> remote flink cluster via a few configuration, and you don't need to worry
> about the jars. Flink interpreter will ship necessary jars for you. Here's
> a list of tutorials.
>
> 1) Get started https://link.medium.com/oppqD6dIg5
>  2) Batch https://
> link.medium.com/3qumbwRIg5  3) Streaming
> https://link.medium.com/RBHa2lTIg5  4)
> Advanced usage https://link.medium.com/CAekyoXIg5
> 
>
>
> Zahid Rahman  于2020年4月19日周日 下午7:27写道:
>
>> Hi Tison,
>>
>> I think I may have found what I want in example 22.
>>
>> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>>
>> I need to create Configuration object first as shown .
>>
>> Also I think  flink-conf.yaml file may contain configuration for client
>> rather than  server. So before starting is irrelevant.
>> I am going to play around and see but if the Configuration class allows
>> me to set configuration programmatically and overrides the yaml file then
>> that would be great.
>>
>>
>>
>> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>>
>>> Thanks.
>>> flink-conf.yaml does allow me to do what I need to do without making any
>>> changes to client source code.
>>>
>>> But
>>> RemoteStreamEnvironment constructor  expects a jar file as the third
>>> parameter also.
>>>
>>> RemoteStreamEnvironment
>>> 
>>> (String
>>> 
>>>  host,
>>> int port, String
>>> 
>>> ... jarFiles)
>>> Creates a new RemoteStreamEnvironment that points to the master
>>> (JobManager) described by the given host name and port.
>>>
>>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>>
 You can change flink-conf.yaml "jobmanager.address" or
 "jobmanager.port" options before run the program or take a look at
 RemoteStreamEnvironment which enables configuring host and port.

 Best,
 tison.


 Som Lima  于2020年4月19日周日 下午5:58写道:

> Hi,
>
> After running
>
> $ ./bin/start-cluster.sh
>
> The following line of code defaults jobmanager  to localhost:6123
>
> final  ExecutionEnvironment env =
> Environment.getExecutionEnvironment();
>
> which is same on spark.
>
> val spark =
> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>
> However if I wish to run the servers on a different physical computer.
> Then in Spark I can do it this way using the spark URI in my IDE.
>
> Conf =
> SparkConf().setMaster("spark://:").setAppName("anapp")
>
> Can you please tell me the equivalent change to make so I can run my
> servers and my IDE from different physical computers.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Modelling time for complex events generated out of simple ones

2020-04-19 Thread Yun Tang
Hi Salva

I think this depends on the relationship between your output and input
events. If the output ones are just simple wrappers of the input ones, e.g. adding
some simple properties or just reading from one place and writing to another,
I think the output events could carry a time inherited from the input ones.
That is to say, event-time semantics might be more appropriate.
On the other hand, if the output events have a more independent relationship with
the input ones, and the tasks in the Flink TM can be treated as the event
generator, I think you can use the processing time at the moment of generating
them.
I think there are no absolute rules; it all depends on your actual scenario.

Best
Yun Tang

From: Salva Alcántara 
Sent: Monday, April 20, 2020 2:03
To: user@flink.apache.org 
Subject: Modelling time for complex events generated out of simple ones

My Flink application generates output (complex) events based on the
processing of (simple) input events. The generated output events are to be
consumed by other external services. My application works using event-time
semantics, so I am a bit in doubt regarding what I should use as the output
events' timestamp.

Should I use:

- the processing time at the moment of generating them?
- the event time (given by the watermark value)?
- both?

For my use case, I am using both for now. But maybe you can come up with
examples/justifications for each of the given options.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Modelling time for complex events generated out of simple ones

2020-04-19 Thread Salva Alcántara
My Flink application generates output (complex) events based on the
processing of (simple) input events. The generated output events are to be
consumed by other external services. My application works using event-time
semantics, so I am a bit in doubt regarding what I should use as the output
events' timestamp.

Should I use:

- the processing time at the moment of generating them?
- the event time (given by the watermark value)?
- both?

For my use case, I am using both for now. But maybe you can come up with
examples/justifications for each of the given options. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: About StreamingFileSink

2020-04-19 Thread Yun Gao
  Hello~ I'd like to confirm the expected behavior: after restarting and re-writing, do you want the newly written part-xx files to overwrite the previously generated ones?


--
From:酷酷的浑蛋 
Send Time:2020 Apr. 18 (Sat.) 20:32
To:user-zh 
Subject: About StreamingFileSink

 
When I use StreamingFileSink
to write data to HDFS, if the job is stopped and then restarted from an earlier checkpoint (not the latest one), the following happens:

The files part-4-9/part-4-13/part-4-14
had already been generated at the time of the latest checkpoint. After the job restarts from an earlier checkpoint, it continues generating part-xx files, but the file number xx does not continue from the latest one. As a result, the newly generated .part-4-13.inprogress-x/.part-4-14.inprogress-x files never reach the finished state, and Hive cannot read files whose names start with '.'. Is there any way to avoid this situation, or do I have to rename the files manually?



Re: 1.10 task execution process -- some questions about the source code

2020-04-19 Thread 祝尚
Hi tison, Jiacheng, thanks for the explanations. Following your hints I went through the code again carefully, and it is indeed the case: when the MailboxProcessor is instantiated, processInput is passed in as a parameter, and MailboxProcessor#runMailboxLoop executes the default action:
while (processMail(localMailbox)) {
   mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is
acquired inside default action as needed
}
Thanks again.
Best,
Sun.Zhu




> On Apr 19, 2020, at 8:26 PM, 蒋佳成(Jiacheng Jiang) <920334...@qq.com> wrote:
> 
> When the MailboxProcessor is constructed, the StreamTask's processInput method is passed to the MailboxProcessor as the MailboxDefaultAction. The InputStatus
>  status = 
> inputProcessor.processInput(); inside it is where the data is processed; for example, when inputProcessor is a StreamOneInputProcessor, it runs InputStatus
>  status = 
> input.emitNext(output); where input is a StreamTaskNetworkInput, which contains the processElement method. StreamTask is an AbstractInvokable, and StreamTask's invoke() method calls runMailboxLoop(), so isn't the data processed inside StreamTask's invoke()?
> 
> 
> 
> -- Original message --
> From: "祝尚"<17626017...@163.com; 
> Sent: Sunday, April 19, 2020, 5:37 PM
> To: "user-zh"  Subject: 1.10 task execution process -- some questions about the source code
> 
> 
> 
> Hi all,
> While reading the 1.10 source code related to job execution, I ran into some questions. I see that the Task#doRun() method calls
> invokable.invoke(); the actual execution should happen inside this method, right?
> I looked further into StreamTask#invoke() -> runMailboxLoop(), but going deeper I still could not find the final entry point that invokes the UDF.
> Question 1: What do the concepts MailboxProcessor, Mailbox, and Mail mean, and what are they for?
> 
> However, in another place where an AbstractInvokable is instantiated, e.g. in the StreamTask constructor, the processInput method is referenced, which looks similar to the pre-1.9 implementation:
> this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, 
> actionExecutor);
> Question 2: Is this where the actual data processing happens? Why isn't the business logic handled inside invokable.invoke() as before 1.9?
> Thanks for your reply!
> 
> 
> 
> Best,
> Sun.Zhu



Re: Question about the Flink SQL Kafka source

2020-04-19 Thread 人生若只如初见
OK, thanks a lot.




---- Original message ----
From: "Benchao Li"

Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Hi Som,

You can take a look at Flink on Zeppelin; in Zeppelin you can connect to a
remote Flink cluster via a few configuration settings, and you don't need to worry
about the jars. The Flink interpreter will ship the necessary jars for you. Here's
a list of tutorials.

1) Get started: https://link.medium.com/oppqD6dIg5
2) Batch: https://link.medium.com/3qumbwRIg5
3) Streaming: https://link.medium.com/RBHa2lTIg5
4) Advanced usage: https://link.medium.com/CAekyoXIg5


Zahid Rahman  于2020年4月19日周日 下午7:27写道:

> Hi Tison,
>
> I think I may have found what I want in example 22.
>
> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>
> I need to create Configuration object first as shown .
>
> Also I think  flink-conf.yaml file may contain configuration for client
> rather than  server. So before starting is irrelevant.
> I am going to play around and see but if the Configuration class allows me
> to set configuration programmatically and overrides the yaml file then that
> would be great.
>
>
>
> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>
>> Thanks.
>> flink-conf.yaml does allow me to do what I need to do without making any
>> changes to client source code.
>>
>> But
>> RemoteStreamEnvironment constructor  expects a jar file as the third
>> parameter also.
>>
>> RemoteStreamEnvironment
>> 
>> (String
>> 
>>  host,
>> int port, String
>> 
>> ... jarFiles)
>> Creates a new RemoteStreamEnvironment that points to the master
>> (JobManager) described by the given host name and port.
>>
>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>
>>> You can change flink-conf.yaml "jobmanager.address" or "jobmanager.port"
>>> options before run the program or take a look at RemoteStreamEnvironment
>>> which enables configuring host and port.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Som Lima  于2020年4月19日周日 下午5:58写道:
>>>
 Hi,

 After running

 $ ./bin/start-cluster.sh

 The following line of code defaults jobmanager  to localhost:6123

 final  ExecutionEnvironment env = Environment.getExecutionEnvironment();

 which is same on spark.

 val spark =
 SparkSession.builder.master(local[*]).appname("anapp").getOrCreate

 However if I wish to run the servers on a different physical computer.
 Then in Spark I can do it this way using the spark URI in my IDE.

 Conf =
 SparkConf().setMaster("spark://:").setAppName("anapp")

 Can you please tell me the equivalent change to make so I can run my
 servers and my IDE from different physical computers.














-- 
Best Regards

Jeff Zhang


Re: Question about the Flink SQL Kafka source

2020-04-19 Thread Benchao Li
If that is the case, you can set your source parallelism to be greater than or equal to the number of Kafka partitions to work around it.
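
For example, a rough DataStream-level sketch of the same idea (with the SQL DDL you would control this through the job's default parallelism instead; the topic, the broker address and the parallelism value of 4 are placeholders):

// assuming `env` is your StreamExecutionEnvironment and the topic has 4 partitions
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.0.150:9092");
props.setProperty("group.id", "demo");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("user_behavior", new SimpleStringSchema(), props);

// One subtask per partition: each subtask's watermark then tracks a single
// partition, instead of being pushed ahead by a faster partition read in the
// same subtask during catch-up.
env.addSource(consumer).setParallelism(4);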

Jark Wu  于2020年4月19日周日 下午8:22写道:

> Hi,
>
> Based on the behavior you describe and the code you provided, I think the cause is out-of-order data.
> According to your Java code, the event time of the data is not monotonically increasing, so there is a certain degree of disorder. This has little impact while the job runs normally (the watermark
> tolerates 5s of disorder).
> But when catching up on data, since Flink does not yet align event time across partitions, some partitions progress much faster than others
> while catching up,
> which widens the disorder (e.g. data that used to be at most 4s late may now be 10s late), so more data gets dropped, which is why the aggregated values are lower during catch-up.
>
> A complete solution has to wait for the completion of FLIP-27.
> For now, you can increase the watermark delay to tolerate more late data.
>
> Best,
> Jark
>
>
> On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  wrote:
>
> > 你好
> >
> >
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
> >
> >
> >
> > 附:
> > userbehavior建表语句
> > CREATE TABLE user_behavior (
> >   user_id BIGINT,
> >   item_id BIGINT,
> >   category_id BIGINT,
> >   behavior STRING,
> >   ts TIMESTAMP(3),
> >   proctime as PROCTIME(),  -- 通过计算列产生一个处理时间列
> >   WATERMARK FOR ts as ts - INTERVAL '5' SECOND --
> > 在ts上定义watermark,ts成为事件时间列
> > ) WITH (
> >   'connector.type' = 'kafka', -- 使用 kafka connector
> >   'connector.version' = 'universal', -- kafka
> > 版本,universal 支持 0.11 以上的版本
> >   'connector.topic' = 'user_behavior', -- kafka topic
> >   'connector.startup-mode' = 'earliest-offset', -- 从起始
> > offset 开始读取
> >   'connector.properties.zookeeper.connect' = '
> > 192.168.0.150:2181', -- zookeeper 地址
> >   'connector.properties.bootstrap.servers' = '
> > 192.168.0.150:9092', -- kafka broker 地址
> >   'format.type' = 'json' -- 数据源格式为 json
> > )
> >
> > 每小时购买数建表语句
> > CREATE TABLE buy_cnt_per_hour (
> >   hour_of_day BIGINT,
> >   buy_cnt BIGINT
> > ) WITH (
> >   'connector.type' = 'elasticsearch', -- 使用 elasticsearch
> > connector
> >   'connector.version' = '6', -- elasticsearch 版本,6 能支持
> > es 6+ 以及 7+ 的版本
> >   'connector.hosts' = 'http://192.168.0.150:9200', --
> > elasticsearch 地址
> >   'connector.index' = 'buy_cnt_per_hour', --
> > elasticsearch 索引名,相当于数据库的表名
> >   'connector.document-type' = 'user_behavior', --
> > elasticsearch 的 type,相当于数据库的库名
> >   'connector.bulk-flush.max-actions' = '1', -- 每条数据都刷新
> >   'format.type' = 'json', -- 输出数据格式 json
> >   'update-mode' = 'append'
> > )
> >
> > 插入语句
> > INSERT INTO buy_cnt_per_hour
> > SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' HOUR)),COUNT(*)
> > FROM user_behavior
> > WHERE behavior = 'buy'
> > GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
> >
> > kafka数据发送代码
> >
> > import com.alibaba.fastjson.JSONObject;
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerRecord;
> >
> > import java.text.SimpleDateFormat;
> > import java.util.*;
> >
> >
> > public class UserBehaviorProducer {
> > public static final String brokerList = "192.168.0.150:9092";
> >
> > //public static final String topic="user_behavior";
> > public static final String topic = "user_behavior";
> >
> > public static void main(String args[]) {
> >
> > //配置生产者客户端参数
> > //将配置序列化
> > Properties properties = new Properties();
> > properties.put("key.serializer",
> > "org.apache.kafka.common.serialization.StringSerializer");
> > properties.put("value.serializer",
> > "org.apache.kafka.common.serialization.StringSerializer");
> > properties.put("bootstrap.servers", brokerList);
> > //创建KafkaProducer 实例
> > KafkaProducer > KafkaProducer<(properties);
> > //构建待发送的消息
> > //{"user_id": "952483", "item_id":"310884", "category_id":
> > "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
> > //{"user_id": "794777", "item_id":"5119439", "category_id":
> > "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
> > String[] behaviors = {"pv", "buy", "coll",
> "cart"};//浏览,购买,收藏,加入购物车
> > JSONObject jsonObject = new JSONObject();
> > HashMap > Random random = new Random();
> > SimpleDateFormat format = new
> > SimpleDateFormat("-MM-dd'T'HH:mm:ss'Z'");
> > long date_long=getDate();
> > while (true) {
> > jsonObject.put("user_id", random.nextInt(90) + 10 +
> > "");
> > jsonObject.put("item_id", random.nextInt(90) + 10 +
> > "");
> > jsonObject.put("category_id", random.nextInt(1000) + "");
> > jsonObject.put("behavior", behaviors[random.nextInt(4)]);
> > jsonObject.put("ts", format.format(new Date(date_long)));
> > String msg = jsonObject.toString();
> > System.out.println(msg);
> > ProducerRecord > ProducerRecord<(topic, msg);
> > producer.send(record);
> > //date_long +=500+random.nextGaussian()*1000;
> > date_long +=800+random.nextGaussian()*1500;
> > try {
> > Thread.sleep(60);
> > } catch (InterruptedException e) {
> > e.printStackTrace();
> > }
> > }
> >
> > }
> >
> > private static long getDate() {
> > Date date = new Date();
> >  

Re: 1.10 task execution process -- some questions about the source code

2020-04-19 Thread 蒋佳成(Jiacheng Jiang)
When the MailboxProcessor is constructed, the StreamTask's processInput method is passed to the MailboxProcessor as the MailboxDefaultAction. The InputStatus
 status = 
inputProcessor.processInput(); inside it is where the data is processed; for example, when inputProcessor is a StreamOneInputProcessor, it runs InputStatus
 status = 
input.emitNext(output); where input is a StreamTaskNetworkInput, which contains the processElement method. StreamTask is an AbstractInvokable, and StreamTask's invoke() method calls runMailboxLoop(), so isn't the data processed inside StreamTask's invoke()?



---- Original message ----
From: "祝尚"<17626017...@163.com; 
Date: Sunday, April 19, 2020, 5:37 PM
To: "user-zh"

Re: Question about the Flink SQL Kafka source

2020-04-19 Thread Jark Wu
Hi,

Based on the behavior you describe and the code you provided, I think the cause is out-of-order data.
According to your Java code, the event time of the data is not monotonically increasing, so there is a certain degree of disorder. This has little impact while the job runs normally (the watermark
tolerates 5s of disorder).
But when catching up on data, since Flink does not yet align event time across partitions, some partitions progress much faster than others
while catching up,
which widens the disorder (e.g. data that used to be at most 4s late may now be 10s late), so more data gets dropped, which is why the aggregated values are lower during catch-up.

A complete solution has to wait for the completion of FLIP-27.
For now, you can increase the watermark delay to tolerate more late data.

Best,
Jark


On Sat, 18 Apr 2020 at 22:53, 人生若只如初见  wrote:

> 你好
>
> 感谢解答,第一个问题就是当我用插入语句时,如果kafka里面已经发了3个多小时的用户数据,他会马上求出前三个小时的购买数,大概每小时只有140个,但统计的之后几个小时的购买数都在1100左右,两者差了好多
>
>
>
> 附:
> userbehavior建表语句
> CREATE TABLE user_behavior (
>   user_id BIGINT,
>   item_id BIGINT,
>   category_id BIGINT,
>   behavior STRING,
>   ts TIMESTAMP(3),
>   proctime as PROCTIME(),  -- 通过计算列产生一个处理时间列
>   WATERMARK FOR ts as ts - INTERVAL '5' SECOND --
> 在ts上定义watermark,ts成为事件时间列
> ) WITH (
>   'connector.type' = 'kafka', -- 使用 kafka connector
>   'connector.version' = 'universal', -- kafka
> 版本,universal 支持 0.11 以上的版本
>   'connector.topic' = 'user_behavior', -- kafka topic
>   'connector.startup-mode' = 'earliest-offset', -- 从起始
> offset 开始读取
>   'connector.properties.zookeeper.connect' = '
> 192.168.0.150:2181', -- zookeeper 地址
>   'connector.properties.bootstrap.servers' = '
> 192.168.0.150:9092', -- kafka broker 地址
>   'format.type' = 'json' -- 数据源格式为 json
> )
>
> 每小时购买数建表语句
> CREATE TABLE buy_cnt_per_hour (
>   hour_of_day BIGINT,
>   buy_cnt BIGINT
> ) WITH (
>   'connector.type' = 'elasticsearch', -- 使用 elasticsearch
> connector
>   'connector.version' = '6', -- elasticsearch 版本,6 能支持
> es 6+ 以及 7+ 的版本
>   'connector.hosts' = 'http://192.168.0.150:9200', --
> elasticsearch 地址
>   'connector.index' = 'buy_cnt_per_hour', --
> elasticsearch 索引名,相当于数据库的表名
>   'connector.document-type' = 'user_behavior', --
> elasticsearch 的 type,相当于数据库的库名
>   'connector.bulk-flush.max-actions' = '1', -- 每条数据都刷新
>   'format.type' = 'json', -- 输出数据格式 json
>   'update-mode' = 'append'
> )
>
> 插入语句
> INSERT INTO buy_cnt_per_hour
> SELECT HOUR(TUMBLE_START(ts,INTERVAL '1' HOUR)),COUNT(*)
> FROM user_behavior
> WHERE behavior = 'buy'
> GROUP BY TUMBLE(ts,INTERVAL '1' HOUR)
>
> kafka数据发送代码
>
> import com.alibaba.fastjson.JSONObject;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import java.text.SimpleDateFormat;
> import java.util.*;
>
>
> public class UserBehaviorProducer {
> public static final String brokerList = "192.168.0.150:9092";
>
> //public static final String topic="user_behavior";
> public static final String topic = "user_behavior";
>
> public static void main(String args[]) {
>
> //配置生产者客户端参数
> //将配置序列化
> Properties properties = new Properties();
> properties.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> properties.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> properties.put("bootstrap.servers", brokerList);
> //创建KafkaProducer 实例
> KafkaProducer KafkaProducer<(properties);
> //构建待发送的消息
> //{"user_id": "952483", "item_id":"310884", "category_id":
> "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
> //{"user_id": "794777", "item_id":"5119439", "category_id":
> "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
> String[] behaviors = {"pv", "buy", "coll", "cart"};//浏览,购买,收藏,加入购物车
> JSONObject jsonObject = new JSONObject();
> HashMap Random random = new Random();
> SimpleDateFormat format = new
> SimpleDateFormat("-MM-dd'T'HH:mm:ss'Z'");
> long date_long=getDate();
> while (true) {
> jsonObject.put("user_id", random.nextInt(90) + 10 +
> "");
> jsonObject.put("item_id", random.nextInt(90) + 10 +
> "");
> jsonObject.put("category_id", random.nextInt(1000) + "");
> jsonObject.put("behavior", behaviors[random.nextInt(4)]);
> jsonObject.put("ts", format.format(new Date(date_long)));
> String msg = jsonObject.toString();
> System.out.println(msg);
> ProducerRecord ProducerRecord<(topic, msg);
> producer.send(record);
> //date_long +=500+random.nextGaussian()*1000;
> date_long +=800+random.nextGaussian()*1500;
> try {
> Thread.sleep(60);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> }
>
> }
>
> private static long getDate() {
> Date date = new Date();
> Calendar c = Calendar.getInstance();
> c.setTime(date);
> //设置为1号,当前日期既为本月第一天
> c.set(Calendar.DAY_OF_MONTH, 1);
> //将小时至0
> c.set(Calendar.HOUR_OF_DAY, 0);
> //将分钟至0
> c.set(Calendar.MINUTE, 0);
> //将秒至0
> c.set(Calendar.SECOND,0);
> //将毫秒至0
> c.set(Calendar.MILLISECOND, 0);

Re: Job manager URI rpc address:port

2020-04-19 Thread Zahid Rahman
Hi Tison,

I think I may have found what I want in example 22:
https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration

I need to create a Configuration object first, as shown there.

Also, I think the flink-conf.yaml file may contain configuration for the client
rather than the server, so editing it before starting the cluster is irrelevant.
I am going to play around and see, but if the Configuration class allows me
to set the configuration programmatically and override the yaml file, then that
would be great.
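
Something along these lines is what I plan to try (untested on my side; the option keys are taken from the Flink configuration docs and the host is a placeholder):

// org.apache.flink.configuration.Configuration
Configuration conf = new Configuration();
conf.setString("jobmanager.rpc.address", "192.168.0.10");  // placeholder remote JobManager host
conf.setInteger("jobmanager.rpc.port", 6123);
conf.setString("rest.address", "192.168.0.10");
conf.setInteger("rest.port", 8081);
// Whether a given environment actually honours these values instead of
// flink-conf.yaml is exactly what I want to verify.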



On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:

> Thanks.
> flink-conf.yaml does allow me to do what I need to do without making any
> changes to client source code.
>
> But
> RemoteStreamEnvironment constructor  expects a jar file as the third
> parameter also.
>
> RemoteStreamEnvironment
> 
> (String
> 
>  host,
> int port, String
> 
> ... jarFiles)
> Creates a new RemoteStreamEnvironment that points to the master
> (JobManager) described by the given host name and port.
>
> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>
>> You can change flink-conf.yaml "jobmanager.address" or "jobmanager.port"
>> options before run the program or take a look at RemoteStreamEnvironment
>> which enables configuring host and port.
>>
>> Best,
>> tison.
>>
>>
>> Som Lima  于2020年4月19日周日 下午5:58写道:
>>
>>> Hi,
>>>
>>> After running
>>>
>>> $ ./bin/start-cluster.sh
>>>
>>> The following line of code defaults jobmanager  to localhost:6123
>>>
>>> final  ExecutionEnvironment env = Environment.getExecutionEnvironment();
>>>
>>> which is same on spark.
>>>
>>> val spark =
>>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>>
>>> However if I wish to run the servers on a different physical computer.
>>> Then in Spark I can do it this way using the spark URI in my IDE.
>>>
>>> Conf =
>>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>>
>>> Can you please tell me the equivalent change to make so I can run my
>>> servers and my IDE from different physical computers.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>


Re: Job manager URI rpc address:port

2020-04-19 Thread Som Lima
Thanks.
flink-conf.yaml does allow me to do what I need to do without making any
changes to the client source code.

But the
RemoteStreamEnvironment constructor expects a jar file as the third
parameter as well:

RemoteStreamEnvironment(String host, int port, String... jarFiles)
Creates a new RemoteStreamEnvironment that points to the master
(JobManager) described by the given host name and port.

On Sun, 19 Apr 2020, 11:02 tison,  wrote:

> You can change flink-conf.yaml "jobmanager.address" or "jobmanager.port"
> options before run the program or take a look at RemoteStreamEnvironment
> which enables configuring host and port.
>
> Best,
> tison.
>
>
> Som Lima  于2020年4月19日周日 下午5:58写道:
>
>> Hi,
>>
>> After running
>>
>> $ ./bin/start-cluster.sh
>>
>> The following line of code defaults jobmanager  to localhost:6123
>>
>> final  ExecutionEnvironment env = Environment.getExecutionEnvironment();
>>
>> which is same on spark.
>>
>> val spark =
>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>
>> However if I wish to run the servers on a different physical computer.
>> Then in Spark I can do it this way using the spark URI in my IDE.
>>
>> Conf =
>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>
>> Can you please tell me the equivalent change to make so I can run my
>> servers and my IDE from different physical computers.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>


Re: Job manager URI rpc address:port

2020-04-19 Thread tison
You can change the flink-conf.yaml "jobmanager.rpc.address" or "jobmanager.rpc.port"
options before running the program, or take a look at RemoteStreamEnvironment,
which enables configuring the host and port.
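
For example, a minimal sketch of the RemoteStreamEnvironment route (host, port and jar path are placeholders for your own setup):

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "192.168.0.10",             // JobManager host (placeholder)
        6123,                       // JobManager port as documented; depending on the
                                    // Flink version the REST port (8081) may be the one to use
        "/path/to/your-job.jar");   // jar(s) with your user code, shipped to the cluster

env.fromElements(1, 2, 3).print();
env.execute("remote-submission-test");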

Best,
tison.


Som Lima  于2020年4月19日周日 下午5:58写道:

> Hi,
>
> After running
>
> $ ./bin/start-cluster.sh
>
> The following line of code defaults jobmanager  to localhost:6123
>
> final  ExecutionEnvironment env = Environment.getExecutionEnvironment();
>
> which is same on spark.
>
> val spark =
> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>
> However if I wish to run the servers on a different physical computer.
> Then in Spark I can do it this way using the spark URI in my IDE.
>
> Conf =
> SparkConf().setMaster("spark://:").setAppName("anapp")
>
> Can you please tell me the equivalent change to make so I can run my
> servers and my IDE from different physical computers.
>
>
>
>
>
>
>
>
>
>
>
>
>


Job manager URI rpc address:port

2020-04-19 Thread Som Lima
Hi,

After running

$ ./bin/start-cluster.sh

the following line of code defaults the JobManager to localhost:6123:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

which is the same as on Spark:

val spark =
SparkSession.builder.master("local[*]").appName("anapp").getOrCreate

However, if I wish to run the servers on a different physical computer,
then in Spark I can do it this way, using the Spark URI in my IDE:

Conf =  SparkConf().setMaster("spark://:").setAppName("anapp")

Can you please tell me the equivalent change to make so I can run my
servers and my IDE on different physical computers.


Re: 1.10 task execution process -- some questions about the source code

2020-04-19 Thread tison
The invokable is generally a StreamTask or one of its subclasses such as SourceStreamTask; the actual UDF lives inside the StreamTask,
behind several layers of wrapping.

The Mailbox classes are essentially a simple event-loop implementation -- or you can think of them as an implementation of the actor model; articles explaining those terms map onto these concepts one to one.
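
If it helps, here is a conceptual sketch of that event-loop idea (plain Java, not Flink's actual classes):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Conceptual only: one thread drains a queue of "mails" (control actions such as
// checkpoint triggers or timers) and, while the queue is empty, keeps running a
// default action -- in Flink that default action is StreamTask#processInput.
public class MailboxLoopSketch {
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    void runLoop(Runnable defaultAction) {
        while (running) {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();          // handle control events first
            }
            defaultAction.run();     // then do the regular work (process input)
        }
    }

    void sendMail(Runnable mail) {
        mailbox.add(mail);
    }

    void stop() {
        running = false;
    }
}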

Best,
tison.


祝尚 <17626017...@163.com> 于2020年4月19日周日 下午5:43写道:

> Hi all,
> While reading the 1.10 source code related to job execution, I ran into some questions. I see that the Task#doRun() method calls
> invokable.invoke(); the actual execution should happen inside this method, right?
> I looked further into StreamTask#invoke() -> runMailboxLoop(), but going deeper I still could not find the final entry point that invokes the UDF.
> Question 1: What do the concepts MailboxProcessor, Mailbox, and Mail mean, and what are they for?
>
>
> However, in another place where an AbstractInvokable is instantiated, e.g. in the StreamTask constructor, the processInput method is referenced, which looks similar to the pre-1.9 implementation:
> this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox,
> actionExecutor);
> Question 2: Is this where the actual data processing happens? Why isn't the business logic handled inside invokable.invoke() as before 1.9?
> Thanks for your reply!
>
>
>
> Best,
> Sun.Zhu
>
>
>
>


1.10 task execution process -- some questions about the source code

2020-04-19 Thread 祝尚
Hi all,
While reading the 1.10 source code related to job execution, I ran into some questions. I see that the Task#doRun() method calls
invokable.invoke(); the actual execution should happen inside this method, right?
I looked further into StreamTask#invoke() -> runMailboxLoop(), but going deeper I still could not find the final entry point that invokes the UDF.
Question 1: What do the concepts MailboxProcessor, Mailbox, and Mail mean, and what are they for?

However, in another place where an AbstractInvokable is instantiated, e.g. in the StreamTask constructor, the processInput method is referenced, which looks similar to the pre-1.9 implementation:
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, 
actionExecutor);
Question 2: Is this where the actual data processing happens? Why isn't the business logic handled inside invokable.invoke() as before 1.9?
Thanks for your reply!





Best,
Sun.Zhu





Re: How to to in Flink to support below HIVE SQL

2020-04-19 Thread Rui Li
Hey Xiaohua & Jark,

I'm sorry for overlooking the email. Adding to Jark's answers:

DISTRIBUTE BY => the functionality and syntax are not supported. We can
consider this as a candidate feature for 1.12.
named_struct => you should be able to call this function with Hive module
LATERAL VIEW => the syntax is not supported. As Jark mentioned, you can
rewrite the SQL to achieve the same functionalities
row format => defining the row format in DDL will be supported in FLIP-123
delimited fields => defining field delimiter in DDL will be supported in
FLIP-123
STR_TO_MAP => you should be able to call this function with Hive module,
but there's a known issue with this function[1]
Array => you should be able to call this function with Hive module

Feel free to raise questions if anything is still unclear or if you hit any
issues with these features.

[1] https://issues.apache.org/jira/browse/FLINK-16732
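
In case it is useful, a rough sketch of loading the Hive module so that the Hive built-in functions (e.g. named_struct, str_to_map) become callable from Flink SQL; the Hive version string below is just an example, match it to your deployment:

// requires flink-connector-hive and the matching Hive dependencies on the classpath
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

// load Hive's built-in functions as a module named "myhive"
tableEnv.loadModule("myhive", new HiveModule("2.3.4"));

// after this, a query such as SELECT str_to_map('a=1,b=2') should resolve to the
// Hive function (keeping in mind the known issue in FLINK-16732 mentioned above)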


On Thu, Apr 9, 2020 at 12:04 PM Jark Wu  wrote:

> Hi Xiaohua,
>
> I'm not very familiar with Hive SQL, I will try to answer some of them:
>
> COALESCE => there is also a COALESCE built-in function in Flink [1]. From
> the documentation, I think they are identical.
> STR_TO_MAP =>  there is also a STR_TO_MAP built-in function in Flink blink
> planner[1]. But the default delimiter is different from Hive's.
> OVERWRITE => Blink planner supports INSERT OVERWRITE [2].
> FULL OUTER JOIN => Blink planner also supports this both streaming mode
> and batch mode.
> Rlike => Blink planner has REGEXP [1] built-in function which I think is
> similar to Hive's Rlike?
> LATERAL VIEW => This is called UDTF in Flink, see how to use UDTF in docs
> [3] "Join with Table Function (UDTF)"
>
> I cc'ed Rui Li who is working on FLIP-123 "DDL and DML compatibility for
> Hive", he may have more insights on this and please correct me if I give a
> wrong answer above.
>
> Best,
> Jark
>
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/functions/systemFunctions.html
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/insert.html#insert-from-select-queries
> [3]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>
> On Thu, 9 Apr 2020 at 11:23, Xiaohua  wrote:
>
>> Hi,
>>
>> We meet some issue when migrate from Hive/Spark to Flink, Could you please
>> help me?
>>
>> Below is HIVE SQL we used:
>>
>> DISTRIBUTE BY
>> named_struct
>> COALECE
>> LATERAL VIEW
>> row format
>> delimited fields
>> STR_TO_MAP
>> OVERWRITE
>> FULL OUTER JOIN
>> Rlike
>> Array
>>
>> How to do use Flink SQL?
>>
>> Thank you~
>>
>> BR
>> Xiaohua
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

-- 
Cheers,
Rui Li


Re: Flink Serialization as stable (kafka) output format?

2020-04-19 Thread Robert Metzger
Hey Theo,

we recently published a blog post that answers your request for a
comparison between Kryo and Avro in Flink:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
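
As a quick illustration, a minimal sketch of the kind of class Flink's POJO serializer can handle (rules in short: public class, public no-arg constructor, and fields that are public or have standard getters/setters):

public class SensorReading {
    public String sensorId;   // public fields are analyzed directly
    public long timestamp;
    public double value;

    public SensorReading() {} // no-arg constructor required for POJO treatment

    public SensorReading(String sensorId, long timestamp, double value) {
        this.sensorId = sensorId;
        this.timestamp = timestamp;
        this.value = value;
    }
}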

On Tue, Mar 10, 2020 at 9:27 AM Arvid Heise  wrote:

> Hi Theo,
>
> I strongly discourage the use of Flink serialization for persistent
> storage of data. It was never intended to work this way, and it does not
> offer Avro's benefits of lazy schema evolution and maturity.
>
> Unless you can explicitly measure that Avro is a bottleneck in your setup,
> stick with it. It's the preferred way to store data in Kafka for a reason.
> It's mature, supports plenty of languages, and the schema evolution feature
> will save you so many headaches in the future.
>
> If it turns out to be a bottleneck, the most logical alternative is
> protobuf. Kryo is even worse than Flink serializer for Kafka. In general,
> realistically speaking, it's so much more cost-effective to just add
> another node to your Flink cluster and use Avro than coming up with any
> clever solution (just assume that you need at least one man month to
> implement and do the math).
>
> And btw, you should always use generated Java/scala classes if possible
> for Avro. It's faster and offers a much nicer development experience.
>
> On Mon, Mar 9, 2020 at 3:57 PM Robert Metzger  wrote:
>
>> Hi Theo,
>>
>> However, in most benchmarks, avro turns out to be rather slow in terms of
>>> CPU cycles ( e.g. [1]  )
>>
>>
>> Avro is slower compared to what?
>> You should not only benchmark the CPU cycles for serializing the data. If
>> you are sending JSON strings across the network, you'll probably have a lot
>> more bytes to send across the network, making everything slower (usually
>> network is slower than CPU)
>>
>> One of the reasons why people use Avro it supports schema evolution.
>>
>> Regarding your questions:
>> 1. For this use case, you can use the Flink data format as an internal
>> message format (between the star architecture jobs)
>> 2. Generally speaking no
>> 3. You will at least have a dependency on flink-core. And this is a
>> somewhat custom setup, so you might be facing breaking API changes.
>> 4. I'm not aware of any benchmarks. The Flink serializers are mostly for
>> internal use (between our operators), Kryo is our fallback (to not suffer
>> to much from the not invented here syndrome), while Avro is meant for
>> cross-system serialization.
>>
>> I have the feeling that you can move ahead with using Flink's Pojo
>> serializer everywhere :)
>>
>> Best,
>> Robert
>>
>>
>>
>>
>> On Wed, Mar 4, 2020 at 1:04 PM Theo Diefenthal <
>> theo.diefent...@scoop-software.de> wrote:
>>
>>> Hi,
>>>
>>> Without knowing too much about Flink serialization, I know that Flink
>>> states that it serializes POJO types much faster than even the fast Kryo for
>>> Java. I further know that it supports schema evolution in the same way as
>>> Avro.
>>>
>>> In our project, we have a star architecture, where one flink job
>>> produces results into a kafka topic and where we have multiple downstream
>>> consumers from that kafka topic (Mostly other flink jobs).
>>> For fast development cycles, we currently use JSON as output format for
>>> the kafka topic due to easy debugging capabilities and best migration
>>> possibilities. However, when scaling up, we need to switch to a more
>>> efficient format. Most often, Avro is mentioned in combination with a
>>> schema registry, as it's much more efficient than JSON, where essentially
>>> each message contains the schema as well. However, in most benchmarks, Avro
>>> turns out to be rather slow in terms of CPU cycles ( e.g. [1]
>>>  )
>>>
>>> My question(s) now:
>>> 1. Is it reasonable to use flink serializers as message format in Kafka?
>>> 2. Are there any downsides in using flinks serialization result as
>>> output format to kafka?
>>> 3. Can downstream consumers, written in Java, but not flink components,
>>> also easily deserialize flink serialized POJOs? Or do they have a
>>> dependency to at least full flink-core?
>>> 4. Do you have benchmarks comparing flink (de-)serialization performance
>>> to e.g. kryo and avro?
>>>
>>> The only thing I come up with why I wouldn't use flink serialization is
>>> that we wouldn't have a schema registry, but in our case, we share all our
>>> POJOs in a jar which is used by all components, so that is kind of a schema
>>> registry already and if we only make avro compatible changes, which are
>>> also well treated by flink, that shouldn't be any limitation compared to
>>> like avro+registry?
>>>
>>> Best regards
>>> Theo
>>>
>>> [1] https://github.com/eishay/jvm-serializers/wiki
>>>
>>