Re: Duplicate tasks for the same query

2019-12-31 Thread RKandoji
Thanks Jingsong and Kurt for more details.

Yes, I'm planning to try out deduplication once I'm done upgrading to
version 1.9. Hopefully the deduplication is then done by only one task and
reused everywhere else.

One more follow-up question: I see the warning "For production use cases, we
recommend the old planner that was present before Flink 1.9 for now." here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
This is actually the reason why I started with version 1.8. Could you please
let me know your opinion about this? Do you think there is any production
code running on version 1.9?

Thanks,
Reva




On Mon, Dec 30, 2019 at 9:02 PM Kurt Young  wrote:

> BTW, you could also get a more efficient version of deduplicating the
> user table by using the top-n feature [1].
>
> Best,
> Kurt
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n
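>
> For illustration, a deduplication written with that top-n pattern could look
> roughly like the sketch below (just a sketch, assuming the 1.9 blink planner
> and the Users / userId / userTimestamp names from your query; add whatever
> other columns you need to the outer SELECT):
>
> Table uniqueUsers = tEnv.sqlQuery(
>     "SELECT userId, userTimestamp FROM (" +
>     "  SELECT *, ROW_NUMBER() OVER (" +
>     "    PARTITION BY userId ORDER BY userTimestamp DESC) AS rownum" +
>     "  FROM Users" +
>     ") WHERE rownum = 1");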
>
>
> On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li 
> wrote:
>
>> Hi RKandoji,
>>
>> In theory, you don't need to do anything.
>> First, the optimizer will deduplicate nodes during optimization.
>> Second, after SQL optimization, if the optimized plan still contains
>> duplicate nodes, the planner will automatically reuse them.
>> There are config options that control whether the plan is reused; their
>> default value is true, so you don't need to modify them:
>> - table.optimizer.reuse-sub-plan-enabled
>> - table.optimizer.reuse-source-enabled
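>>
>> If you ever want to toggle them explicitly, something like this should
>> work (a sketch only, assuming the 1.9 blink planner's TableConfig):
>>
>> tEnv.getConfig().getConfiguration()
>>     .setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
>> tEnv.getConfig().getConfiguration()
>>     .setBoolean("table.optimizer.reuse-source-enabled", true);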
>>
>> Best,
>> Jingsong Lee
>>
>> On Tue, Dec 31, 2019 at 6:29 AM RKandoji  wrote:
>>
>>> Thanks Terry and Jingsong,
>>>
>>> Currently I'm on version 1.8 using the Flink planner for stream processing.
>>> I'll switch to version 1.9 to try out the blink planner.
>>>
>>> Could you please point me to any examples (Java preferred) using
>>> SubplanReuser?
>>>
>>> Thanks,
>>> RK
>>>
>>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li 
>>> wrote:
>>>
 Hi RKandoji,

 FYI: blink-planner subplan reuse [1] is available in 1.9.

        Join                        Join
       /    \                      /    \
  Filter1  Filter2    =>      Filter1  Filter2
     |        |                   \      /
  Project1 Project2               Project1
     |        |                      |
   Scan1    Scan2                  Scan1


 [1]
 https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala

 Best,
 Jingsong Lee

 On Mon, Dec 30, 2019 at 12:28 PM Terry Wang  wrote:

> Hi RKandoji~
>
> Could you provide more info about your poc environment?
> Stream or batch? Flink planner or blink planner?
> AFAIK, the blink planner has some optimizations to deal with such duplicate
> tasks for the same query. You can give the blink planner a try:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
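>
> Creating a blink-planner TableEnvironment looks roughly like this (a sketch
> for the 1.9 streaming case; env is your StreamExecutionEnvironment):
>
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>     .useBlinkPlanner()
>     .inStreamingMode()
>     .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);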
>
> Best,
> Terry Wang
>
>
>
> On 2019-12-30 03:07, RKandoji wrote:
>
> Hi Team,
>
> I'm doing a POC with Flink to understand whether it's a good fit for my use
> case.
>
> As part of the process, I need to filter out duplicate items, and I created
> the query below to keep only the latest records based on timestamp. For
> instance, I have a "Users" table which may contain multiple messages for the
> same "userId", so I wrote the following query to get only the latest message
> for a given "userId":
>
> Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE (userId,
> userTimestamp) IN (SELECT userId, MAX(userTimestamp) FROM Users GROUP BY
> userId)");
>
> The above query works as expected and returns only the latest users
> based on timestamp.
>
> The issue is that when I use the "uniqueUsers" table multiple times in a JOIN
> operation, I see multiple tasks in the Flink dashboard for the same query
> that creates the "uniqueUsers" table. It simply creates as many tasks as the
> number of times I use the table.
>
> Below is the JOIN query.
> tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
> Table joinOut = tEnv.sqlQuery(
>     "SELECT * FROM Car c" +
>     " LEFT JOIN uniqueUsersTbl aa ON c.userId = aa.userId" +
>     " LEFT JOIN uniqueUsersTbl ab ON c.ownerId = ab.userId" +
>     " LEFT JOIN uniqueUsersTbl ac ON c.sellerId = ac.userId" +
>     " LEFT JOIN uniqueUsersTbl ad ON c.buyerId = ad.userId");
>
> Could someone please help me understand how I can avoid these
> duplicate tasks?
>
>
> Thanks,
> R Kandoji
>
>
>

 --
 Best, Jingsong Lee

>>>
>>
>> --
>> Best, Jingsong Lee
>>
>


Best way to set max heap size via env variables or program arguments?

2019-12-31 Thread Li Peng
Hey folks, we've been running a k8s Flink application, using the
taskmanager.sh script and passing -Djobmanager.heap.size=9000m and
-Dtaskmanager.heap.size=7000m as options to the script. I noticed from the
logs that the maximum heap size logged completely ignores these arguments
and just sets the heap to the default of 922M.

I tested setting the taskmanager.heap.size and jobmanager.heap.size
manually in flink-conf.yaml, and it does work as expected (minus the heap
being set a little lower than configured). But since we want the
application to pick up different memory settings based on the environment
(local/staging/prod/etc), setting it in flink-conf isn't ideal.
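
For reference, the flink-conf.yaml entries that did work in my test looked
like this (same values as the -D arguments above):

    jobmanager.heap.size: 9000m
    taskmanager.heap.size: 7000m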

So my questions are:

1. Is there actually a way to pass in the heap size via arguments to
taskmanager.sh? Is passing -Dtaskmanager.heap.size supposed to work?
2. If not, is there a recommended way to set the heap size per environment,
for example via environment variables?
3. Also, the maximum heap size logged (and the -Xms and -Xmx values) is always
a little smaller than the configured size (e.g. configuring 3000m results in
2700m in the JVM arguments, and 1024m results in 922m). Why is that?

Thanks, and happy new year!
Li


Re: How to get kafka record's timestamp in job

2019-12-31 Thread David Anderson
> In kafka010, ConsumerRecord has a field named timestamp. It is
encapsulated in Kafka010Fetcher.
> How can I get the timestamp when I write a flink job?

Kafka010Fetcher puts the timestamps into the StreamRecords that wrap your
events. If you want to access these timestamps, you can use a
ProcessFunction. The processElement method of a ProcessFunction is passed a
Context object whose timestamp() method returns the timestamp of the
element currently being processed.
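
For example, a minimal sketch (assuming a DataStream<String> coming from the
Kafka 0.10 consumer, with event time enabled so record timestamps are
attached):

stream.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // timestamp of the current element; may be null if none was attached
        Long kafkaTimestamp = ctx.timestamp();
        out.collect(kafkaTimestamp + " -> " + value);
    }
});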

David

On Tue, Dec 31, 2019 at 3:37 PM 刘建刚  wrote:

>   In kafka010, ConsumerRecord has a field named timestamp. It is 
> encapsulated
> in Kafka010Fetcher. How can I get the timestamp when I write a flink job?
> Thank you very much.
>


How to get kafka record's timestamp in job

2019-12-31 Thread 刘建刚
  In kafka010, ConsumerRecord has a field named timestamp. It is encapsulated
in Kafka010Fetcher. How can I get the timestamp when I write a flink job?
Thank you very much.


Re: Flink memory allocation question

2019-12-31 Thread cs
Thanks. One more question about the TM's -XX:NewSize: with the TM memory set
to 15G, -XX:NewSize=2442764288 is used, and with the TM memory set to 20G it
is also -XX:NewSize=2442764288. Why does the new generation size stay the
same?




------------------ Original message ------------------
From: "Xintong Song"

Re: Flink memory allocation question

2019-12-31 Thread Xintong Song
The Flink TM makes heavy use of off-heap memory. Besides the usual JVM
off-heap overhead (thread stacks, method area, etc.), this also includes
network buffers, batch caches, RocksDB, and so on.
The default configuration is deliberately conservative so that enough off-heap
memory is reserved in most cases. Whether it is set too large depends on the
specific job you are running. You can try tuning it via the
'containerized.heap-cutoff-ratio' option.

Also, the upcoming Flink 1.10 release reworks the TM memory accounting: the
cutoff is dropped in favor of more specific config options per usage. You are
welcome to try it out.
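
For example, to reserve a smaller cutoff you could set something like the
following in flink-conf.yaml (0.15 is just an illustrative value; the ratio
used in your calculation below is the default 0.25):

    containerized.heap-cutoff-ratio: 0.15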

Thank you~

Xintong Song



On Tue, Dec 31, 2019 at 5:53 PM cs <58683...@qq.com> wrote:

> The taskmanager memory is set to 15G but the actual heap is only 10G.
> Looking at the TM memory allocation source code: 1. compute the cutoff
> (15GB * 0.25); 2. compute the heap size (the input to the heap calculation
> is 15GB - cutoff); 3. compute the off-heap size (off-heap = 15GB - heap).
> The off-heap size ends up as the final -XX:MaxDirectMemorySize value.
> Is it really necessary to set MaxDirectMemorySize that large?


Does Flink SQL support stop with savepoint draining?

2019-12-31 Thread Yuval Itzchakov
Hi,

I tried running a stop with savepoint on my Flink job, which includes
numerous Flink SQL streams. While running the command I used `-d` to drain
all streams with MAX_WATERMARK.

Looking at the Flink UI, all sources finished successfully, yet all Flink
SQL streams stayed in a RUNNING state and none of them transitioned to
FINISHED, even though none of them were under backpressure at all. I waited
around 10 minutes, yet nothing seemed to change.

My question is, do Flink SQL streams support draining the streams before
stopping?

-- 
Best Regards,
Yuval Itzchakov.


Flink memory allocation question

2019-12-31 Thread cs
The taskmanager memory is set to 15G but the actual heap is only 10G.
Looking at the TM memory allocation source code: 1. compute the cutoff
(15GB * 0.25); 2. compute the heap size (the input to the heap calculation is
15GB - cutoff); 3. compute the off-heap size (off-heap = 15GB - heap).
The off-heap size ends up as the final -XX:MaxDirectMemorySize value.
Is it really necessary to set MaxDirectMemorySize that large?

How to measure the time an operator takes to process one record

2019-12-31 Thread 张江
Hi,


I'm a newcomer who has just started learning Flink. For a given operator, I'd like to measure how long it takes to process a single data record. How should I go about this?


Thanks


张江
Email: zjkingdom2...@163.com

Signature customized by NetEase Mail Master

Re: StreamTableEnvironment.registerDatastream(): exposing a user-defined schema description and DeserializationSchema

2019-12-31 Thread JingsongLee
Hi aven,

This is a reasonable request.
The current situation is:
- Flink Table only supports Row, POJO, Tuple, and CaseClass as structured
data types.
- Your type is JSONObject, which is really also a structured data type, but
Flink does not support it at the moment, so one could consider adding such a
DeserializationSchema mechanism to support it.

However, I don't think it makes much difference in practice. For example, the
RowDeserializationSchema you would provide is essentially a JSONObject-to-Row
conversion, so you can just as well apply that conversion in a DataStream.map
and turn the stream into a structured type that the Flink Table API already
supports.
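
For example, a minimal sketch of the DataStream.map approach (the field names,
types, and fastjson-style JSONObject accessors here are only assumptions for
illustration):

DataStream<Row> rows = jsonStream
    .map(json -> Row.of(json.getString("name"), json.getIntValue("age")))
    .returns(new RowTypeInfo(
        new TypeInformation<?>[]{Types.STRING, Types.INT},
        new String[]{"name", "age"}));
tEnv.registerDataStream("JsonTable", rows);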

Best,
Jingsong Lee


--
From: aven.wu
Send Time: 2019-12-31 (Tue) 14:09
To: user-zh@flink.apache.org
Subject: Re: StreamTableEnvironment.registerDatastream(): exposing a
user-defined schema description and DeserializationSchema

Hello!
"Defining the JSONObject type as an object type" works when the fields and
types are known in advance and can be hard-coded into the program.
If this capability were opened up, a DataStream could be registered as a
streaming table without having to code a new POJO for it.

Best wishes
Sent from Mail for Windows 10

From: Terry Wang
Sent: 2019-12-30 12:37
To: user-zh@flink.apache.org
Subject: Re: StreamTableEnvironment.registerDatastream(): exposing a
user-defined schema description and DeserializationSchema

One way to approach this requirement: define the JSONObject type as a generic
object type, register the stream as a table, and then use a UDTF to convert
the JSONObject into a specific schema.
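
A rough sketch of that UDTF approach could look like this (field names and the
fastjson-style accessors are assumptions for illustration; 1.9 Table API):

public class ExpandJson extends TableFunction<Row> {
    public void eval(JSONObject json) {
        // emit one typed row per incoming JSON object
        collect(Row.of(json.getString("name"), json.getIntValue("age")));
    }
    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.INT);
    }
}

// tEnv.registerFunction("expandJson", new ExpandJson());
// SELECT t.* FROM Raw, LATERAL TABLE(expandJson(payload)) AS t(name, age)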

Best,
Terry Wang



> On 2019-12-27 19:56, aven.wu wrote:
> 
> StreamTableEnvironment.registerDatastream() currently only supports POJOs or
> objects with public fields, registering the table based on the default or
> user-specified fields. In some scenarios there is no fixed format, though;
> for example, a DataStream of JSONObject cannot be registered as a table this
> way. Could a lower-level API be provided to make table registration more
> flexible, where the user passes in a schema description and a custom
> DeserializationSchema?
> 
> 
> Sent from Mail for Windows 10
> 




Re: -yD Kerberos authentication issue

2019-12-31 Thread sllence
OK, thanks a lot. We are indeed still on 1.8; I had only been watching the JIRA status and hadn't looked at the related source changes.

-----Original Message-----
From: Terry Wang
Sent: 2019-12-31 14:42
To: user-zh@flink.apache.org
Subject: Re: -yD Kerberos authentication issue

Hi ~
This has been fixed in the latest code; the problem should not exist on Flink 1.9 either, so you could give that a try ~
Best,
Terry Wang



> On 2019-12-31 14:18,  wrote:
> 
> Hi all,
> 
> We need to pass Kerberos authentication parameters dynamically via -yD.
> 
> I'd like to ask why this JIRA was marked as won't fix. Thanks.
> 
> https://issues.apache.org/jira/browse/FLINK-12130
>