Re: In Flink SQL, results are always false when the value is null

2020-06-05 Thread whirly
OK, thank you very much.

Best.
Whirly.




Re: Re: In Flink SQL, results are always false when the value is null

2020-06-05 Thread Benchao Li
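Wow, very nice!
I looked into it as well. In the SQL standard, a boolean expression can take three values [1]: true, false, and unknown.
Also, null normally does not compare equal to any value, including another null [2].

So if you execute `SELECT null <> null`, the result should be unknown; in Flink this should be null, rather than true or false.
When this happens in a WHERE condition, the result of the comparison should also be unknown [3], but by default it is treated as false.

`IS [NOT] DISTINCT FROM` exists precisely for comparisons that involve null values. Because it can handle null, it only ever
returns true or false, never unknown. For your scenario it should be the best fit.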
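
To make the difference concrete, here is a minimal sketch in plain Java (not Flink code; the class and method names are made up for illustration). It models the comparison result with a nullable `Boolean`, where a Java `null` stands in for SQL NULL and for UNKNOWN:

```java
import java.util.Objects;

// Illustrative model of SQL three-valued logic; not part of Flink.
// A Java null stands in for SQL NULL (operands) and UNKNOWN (results).
class ThreeValuedLogicSketch {

    // Models `a <> b`: UNKNOWN (null) as soon as either operand is NULL.
    static Boolean notEquals(String a, String b) {
        if (a == null || b == null) {
            return null;
        }
        return !a.equals(b);
    }

    // Models `a IS DISTINCT FROM b`: always TRUE or FALSE, never UNKNOWN.
    static boolean isDistinctFrom(String a, String b) {
        return !Objects.equals(a, b);
    }

    // Models how a WHERE clause treats its predicate: only TRUE keeps the row.
    static boolean whereKeepsRow(Boolean predicate) {
        return Boolean.TRUE.equals(predicate);
    }

    public static void main(String[] args) {
        System.out.println(notEquals(null, "rubber"));                // null, i.e. UNKNOWN
        System.out.println(whereKeepsRow(notEquals(null, "rubber"))); // false
        System.out.println(isDistinctFrom(null, "rubber"));           // true
        System.out.println(isDistinctFrom(null, null));               // false
    }
}
```

In the query from this thread, that corresponds to writing `WHERE product IS DISTINCT FROM 'rubber'` instead of `WHERE product <> 'rubber'`.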

PS: When replying, remember to use "Reply all", so that everyone in the community can see our discussion and benefit from it.

[1] https://modern-sql.com/concept/three-valued-logic
[2] https://modern-sql.com/feature/is-distinct-from
[3]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html#comparison-functions

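whirly wrote on Sat, Jun 6, 2020 at 10:42 AM:

> Hi.
> I just found a solution: the Flink SQL built-in functions actually provide another operator, IS DISTINCT FROM,
> which solves this problem.
> IS DISTINCT FROM also means "not equal"; unlike <>, Null IS DISTINCT FROM someValue evaluates to True.
>
> best,
> whirly
>
> At 2020-06-06 00:59:12, "Benchao Li" wrote:
>
> Hi,
>
> I thought about this problem again, and I agree this behavior is indeed somewhat unreasonable. I created an issue [1] to track it.
>
> [1] https://issues.apache.org/jira/browse/FLINK-18164
>
> whirly wrote on Fri, Jun 5, 2020 at 11:20 PM:
>
>> OK, it was probably just a display problem in my mail client. Thanks for the reply.
>>
>> As for adding an extra IS NOT NULL check: the PO and I both feel it is somewhat redundant. Also, conditions sometimes involve many fields, and having to add an
>> IS NOT NULL check in front of every field is difficult and error-prone.
>>
>> It would be great if there were a configuration option that makes null <> 'someValue' evaluate to true.
>>
>> whirly
>>
>> On Jun 5, 2020 at 23:08, Benchao Li wrote:
>> Hi,
>>
>> I only received the one email you sent the first time, so it looks like there is no problem. (Perhaps it is a local display issue in your mail client.)
>> As for your question, this is indeed how it is handled currently. May I ask what the problem is with adding an extra IS NOT NULL?
>>
>> whirly wrote on Fri, Jun 5, 2020 at 9:54 PM:
>>
>> > Sorry, I just noticed that the 163 mailbox automatically resent this question several times. I don't know why; maybe it is a mailbox bug? Sorry for the disturbance; I also don't know whether it will keep resending.
>> >
>> > At 2020-06-05 14:25:10, "whirly" wrote:
>> > >Hi all:
>> > >In Flink SQL, for a query such as SELECT * from order where  product <> 'rubber', if the product field in the data is null, the row cannot match the condition product <> 'rubber', even though null is indeed not equal to 'rubber'.
>> > >The row only matches if the condition is changed to where product is Null or product <> 'rubber'.
>> > >But I want null <> 'rubber' to evaluate to True as well, without adding a product is Null check in front of the condition. What can I do?
>> > >
>> > >
>> > >Thanks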
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>>
>
> --
>
> Best,
> Benchao Li
>
>

-- 

Best,
Benchao Li


Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

2020-06-05 Thread Congxian Qiu
Hi Chris

From the given exception, it seems something is wrong with the FileSystem;
one possible reason is the one Arvid gave (incremental checkpoints may generate too many
small files).
You can turn off incremental checkpoints or try increasing the config
`state.backend.fs.memory-threshold` to see if things get better.

Best,
Congxian


Arvid Heise wrote on Sat, Jun 6, 2020 at 2:09 AM:

> Hi Chris,
>
> could you also try what happens when you turn incremental checkpoints off?
>
> Incremental checkpoints may create many small files which are a bad fit
> for HDFS. You could also evaluate other storage options (net drive, S3) if
> you find incremental checkpoints to be better.
>
> On Tue, Jun 2, 2020 at 2:36 AM Slotterback, Chris <
> chris_slotterb...@comcast.com> wrote:
>
>> Congxian,
>>
>>
>>
>> 1. The checkpoints were failing with this exception scattered through the 
>> logs:
>> 2020-06-01 21:04:37,930 WARN  org.apache.hadoop.hdfs.DataStreamer - 
>> DataStreamer Exception
>> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
>> /flink/flink-checkpoints/ade55daec06ee72aaf7ceade86c6e7a9/chk-1/2093792d-7ebb-4008-8e20-4daf1849c2d4
>>  could only be replicated to 0 nodes instead of minReplication (=1).
>>
>>
>> 2. Yes, we are using incremental checkpointing
>>
>> 3. Currently our windows are configured to use the process function (we
>> were doing aggregates before), which is my understanding that should make
>> our state update/insert ratio lower, as we are building the liststates of
>> each window over time and processing them on trigger.
>>
>> 4. We set the max concurrent checkpoints back to 1, it was originally
>> configured to that and the checkpoints were taking too long to complete
>> before the next checkpoint interval began.
>>
>>
>>
>> Our tm’s are normally 3 slots (3_slots.png), we wanted to try running
>> with 1 slot (1_slot.png) and noticed the checkpoint times fell drastically,
>> but with 1 slot per tm our parallelism had to be dropped and our consumer
>> lag was growing.
>>
>>
>>
>>
>>
>>
>>
>> *From: *Congxian Qiu 
>> *Date: *Friday, May 29, 2020 at 10:59 PM
>> *To: *"Slotterback, Chris" 
>> *Cc: *"user@flink.apache.org" 
>> *Subject: *[EXTERNAL] Re: Inconsistent checkpoint durations vs state size
>>
>>
>>
>> Hi
>>
>> From the given picture,
>>
>> 1. there were some checkpoint failed(but not because of timeout), could
>> you please check why these checkpoint would fail?
>>
>> 2. The checkpoint data size is the delta size for current checkpoint[1],
>> assume you using incremental checkpoint
>>
>> 3. In fig1 the checkpoint size is ~3G, but in fig 2 the delta size can
>> grow to ~ 15G, my gut feeling is that the state update/insert ratio for
>> your program is very high? so that in one checkpoint you'll generate too
>> much sst files
>>
>> 4. from fig 2 seems you configurate
>> execution-checkpointing-max-concurrent-checkpoints[2] bigger than 1, could
>> you please try to set it to 1 and have a try?
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/checkpoint_monitoring.html#history-tab
>> 
>>
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#execution-checkpointing-max-concurrent-checkpoints
>> 
>>
>> Best,
>>
>> Congxian
>>
>>
>>
>>
>>
>> Slotterback, Chris wrote on Sat, May 30, 2020 at 7:43 AM:
>>
>> Hi there,
>>
>>
>>
>> We are trying to upgrade a flink app from using FsStateBackend to
>> RocksDBStateBackend to reduce overhead memory requirements. When enabling
>> rocks, we are seeing a drop in used heap memory as it increments to disk,
>> but checkpoint durations have become inconsistent. Our data source has a
>> stable rate of reports coming in parallelly across partitions. The state
>> size doesn’t seem to correlate with the checkpoint duration from what I can
>> see in metrics. we have tried tmpfs and swap on SSDs with high iops, but
>> can’t get a good handle on what’s causing smaller state to take longer to
>> checkpoint. Our checkpoint location is hdfs, and works well in our
>> non-rocks cluster.
>>
>>
>>
>> Is ~100x checkpoint duration expected when going from fs to rocks state
>> backend, and is checkpoint duration supposed to vary this much with a
>> consistent data source normally?
>>
>>
>>
>> Chris
>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH 

Re: Question about joining Flink SQL with a MySQL dimension table

2020-06-05 Thread Px New
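Hi, I have a question about a related operation.
Question: what if the rules I load in open() may change and need to be refreshed?

Michael Ran wrote on Thu, Jun 4, 2020 at 5:22 PM:

> Could you put it in the open() method?
> At 2020-06-04 14:15:05, "小屁孩" <932460...@qq.com> wrote:
> >dear: I have a question about joining Flink SQL with a MySQL dimension table, specifically about updates on the MySQL side.
> Is there a good approach? I am currently broadcasting the MySQL data, but there is a problem: when broadcasting, my stream data has already arrived while the MySQL data has not, so I lose part of the data at startup.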
>


Re: Flink s3 streaming performance

2020-06-05 Thread venkata sateesh` kolluru
Hi Kostas and Arvid,

Thanks for your suggestions.

The small files were already created and I am trying to roll a few into a big
file while sinking. But due to the custom bucket assigner, it is hard
to get more files within the same prefix in the specified checkpointing
interval.

For example:
/prefix1/prefix2/YY/MM/DD/HH  is our structure in s3.
The checkpointing interval is 5 minutes. prefix1 has 40 different values and
prefix2 has 1+ values.
Within the 5 minute interval, we are able to get no more than 5-10 part files
in these prefixes.

Regarding printstream, will figure out how to use SimpleStringEncoder on a
Tuple as I only need to write tuple.f2 element in the file. If you can
guide me on how to do it, I would appreciate it.
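
One possible shape for this is a small encoder that writes only the `f2` field. The sketch below is plain Java with stand-in types so that it compiles on its own: the local `Encoder` interface mirrors the `encode(element, stream)` shape of Flink's `org.apache.flink.api.common.serialization.Encoder`, and `Record3` is a hypothetical placeholder for the `Tuple3` record (its String field types are an assumption). In a real job one would implement Flink's interface against `Tuple3` instead.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Stand-in for Flink's Encoder interface so the sketch is self-contained (assumption).
interface Encoder<IN> {
    void encode(IN element, OutputStream stream) throws IOException;
}

// Hypothetical stand-in for the Tuple3 record: f0/f1 select the bucket, f2 is the payload.
class Record3 {
    final String f0;
    final String f1;
    final String f2;

    Record3(String f0, String f1, String f2) {
        this.f0 = f0;
        this.f1 = f1;
        this.f2 = f2;
    }
}

// Writes only f2 followed by a newline, as raw bytes, without a PrintStream wrapper.
class PayloadOnlyEncoder implements Encoder<Record3> {
    @Override
    public void encode(Record3 element, OutputStream stream) throws IOException {
        stream.write(element.f2.getBytes(StandardCharsets.UTF_8));
        stream.write('\n');
    }
}

class PayloadOnlyEncoderDemo {
    // Encodes one record into memory so the output can be inspected.
    static String encodeToString(Record3 record) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            new PayloadOnlyEncoder().encode(record, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.print(encodeToString(new Record3("prefix1", "prefix2", "payload")));
    }
}
```

The bucket fields `f0` and `f1` are deliberately ignored here, since in the setup described above they are consumed by the bucket assigner rather than written to the file.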

I will try Arvid's suggestion of increasing fs.s3a.connection.maximum. I was
trying to find out about these parameters and could not find them anywhere. Is there a
place where I could look at the list of these config params?

Also, I am using s3:// as the prefix; would fs.s3a.connection.maximum affect
that too, or is there a separate param like fs.s3.connection.maximum?

On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas  wrote:

> Hi all,
>
> @Venkata, Do you have many small files being created as Arvid suggested?
> If yes, then I tend to agree that S3 is probably not the best sink.
> Although I did not get that from your description.
> In addition, instead of PrintStream you can have a look at the code of the
> SimpleStringEncoder in Flink [1] for a bit more efficient implementation.
>
> Cheers,
> Kostas
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
>
>
> On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise  wrote:
>
>> Hi Venkata,
>>
>> are the many small files intended or is it rather an issue of our commit
>> on checkpointing? If so then FLINK-11499 [1] should help you. Design is
>> close to done, unfortunately implementation will not make it into 1.11.
>>
>> In any case, I'd look at the parameter fs.s3a.connection.maximum, as you
>> store both state and data on S3. I'd probably go with slot*3 or even higher.
>>
>> Lastly, the way you output elements looks also a bit suspicious.
>> PrintStream is not known for great performance. I'm also surprised that it
>> works without manual flushing.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11499
>>
>> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke  wrote:
>>
>>> I think S3 is a wrong storage backend for this volumes of small
>>> messages.
>>> Try to use a NoSQL database or write multiple messages into one file in
>>> S3 (1 or 10)
>>>
>>> If you still want to go with your scenario then try a network optimized
>>> instance and use s3a in Flink and configure s3 entropy.
>>>
>>> Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru <
>>> vkollur...@gmail.com>:
>>>
>>> 
>>> Hi David,
>>>
>>> The avg size of each file is around 30KB and I have checkpoint interval
>>> of 5 minutes. Some files are even 1 kb, because of checkpoint some files
>>> are merged into 1 big file around 300MB.
>>>
>>> With 120 million files and 4Tb, if the rate of transfer is 300 per
>>> minute, it is taking weeks to write to s3.
>>>
>>> I have tried to increase parallelism of sink but I dont see any
>>> improvement.
>>>
>>> The sink record is Tuple3, the actual content of
>>> file is f2. This is content is written to /f0/f1/part*-*
>>>
>>> I guess the prefix determination in custombucketassigner wont be causing
>>> this delay?
>>>
>>> Could you please shed some light on writing custom s3 sink ?
>>>
>>> Thanks
>>>
>>>
>>> On Sun, May 31, 2020, 6:34 AM David Magalhães 
>>> wrote:
>>>
>>>> Hi Venkata.
>>>>
>>>> 300 requests per minute works out to about 200 ms per request, which should be
>>>> a normal response time to send a file if there isn't any speed limitation
>>>> (how big are the files?).
>>>>
>>>> Have you changed the parallelism to be higher than 1? I also
>>>> recommend limiting the source parallelism, because it can consume
>>>> pretty fast from Kafka and create some kind of backpressure.
>>>>
>>>> I don't have much experience with StreamingFileSink, because I ended
>>>> up using a custom S3Sink, but I did have some issues writing to S3 because
>>>> the request wasn't parallelised. Check this thread:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>>>
>>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
>>>> vkollur...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have posted the same on Stack Overflow but didn't get any response, so
>>>>> I am posting it here for help.
>>>>>
>>>>>
>>>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>>>
>>>>> Details:
>>>>>
>>>>> I am working on a Flink application on Kubernetes (EKS) which consumes
>>>>> data from Kafka and writes it to S3.
>>>>>
>>>>> We have around 120 million 

Re: Tumbling windows - increasing checkpoint size over time

2020-06-05 Thread Wissman, Matt
Guowei,

I had a different Flink app that was using 10 or 15s intervals – it had a 
similar behavior but not nearly as bad as the 2s interval pipeline. Both have 
much longer checkpoint intervals now.

Here is the state config:

state.backend: rocksdb
state.checkpoints.dir: {{ .Values.flink.checkpointUrl }}/checkpoints
state.savepoints.dir: {{ .Values.flink.checkpointUrl }}/savepoints
state.backend.incremental: true
state.backend.rocksdb.localdir: /tmp/taskmanager

Thanks!

-Matt

From: Guowei Ma
Date: Monday, June 1, 2020 at 1:01 AM
To: "Wissman, Matt"
Cc: Till Rohrmann, "user@flink.apache.org"
Subject: Re: Tumbling windows - increasing checkpoint size over time

Hi,
1. I am not an expert on RocksDB. However, I think the state garbage 
collection depends on the RocksDB compaction, especially if the checkpoint 
interval is 2s. This is because the window elements are still in the sst file 
even if the window has been triggered.
2. Have you tried a checkpoint interval of 15s? I guess it might reduce the state 
size.
3. Would you like to share your RocksDB configuration? I think this could help 
other state folks to know whether it is related to RocksDB or not.
Best,
Guowei


Wissman, Matt <matt.wiss...@here.com> wrote on Fri, May 29, 2020 at 10:30 PM:
Till,

I’ll have to calculate the theoretical upper bound for our window state. Our 
data distribution and rate has a predictable pattern but the data rate pattern 
didn’t match the checkpoint size growth.

[cid:image001.png@01D63B59.521B7E70]


Here is a screenshot of the checkpoint size for the pipeline. The yellow 
section is when we had the checkpoint interval at 2 secs – the size seems to 
grow linearly and indefinitely. The blue, red and orange lines are in line with 
what I’d expect in terms of checkpoint size (100KB-2 MB).

The incoming stream data for the whole time period is consistent (follows the 
same pattern).

Changing the checkpoint interval seemed to fix the problem of the large and 
growing checkpoint size but I’m not sure why.

Thanks!

-Matt

From: Till Rohrmann <trohrm...@apache.org>
Date: Thursday, May 28, 2020 at 10:48 AM
To: "Wissman, Matt" <matt.wiss...@here.com>
Cc: Guowei Ma <guowei@gmail.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Tumbling windows - increasing checkpoint size over time

Hi Matt,

when using tumbling windows, then the checkpoint size is not only dependent on 
the number of keys (which is equivalent to the number of open windows) but also 
on how many events arrive for each open window because the windows store every 
window event in its state. Hence, it can be the case that you see different 
checkpoint sizes depending on the actual data distribution which can change 
over time. Have you checked whether the data distribution and rate is constant 
over time?

What is the expected number of keys, size of events and number of events per 
key per second? Based on this information one could try to estimate an upper 
state size bound.
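
As a rough illustration of such an estimate: if every open window buffers all of its events, a back-of-the-envelope bound is keys × events per key per second × window length × event size. The sketch below is plain Java; the class name and all input numbers are invented placeholders, not measurements from this pipeline (only the 15 s window length matches the snippet in this thread).

```java
// Back-of-the-envelope upper bound for buffered window state; all inputs are hypothetical.
class WindowStateBoundSketch {

    // keys * eventsPerKeyPerSecond * windowSeconds * bytesPerEvent
    static long upperBoundBytes(long keys, long eventsPerKeyPerSecond,
                                long windowSeconds, long bytesPerEvent) {
        return keys * eventsPerKeyPerSecond * windowSeconds * bytesPerEvent;
    }

    public static void main(String[] args) {
        // Assumed: 1,000 keys, 2 events per key per second, 15 s window, 100-byte events.
        long bound = upperBoundBytes(1_000, 2, 15, 100);
        System.out.println(bound + " bytes"); // 3000000 bytes
    }
}
```

This ignores serialization and RocksDB overhead, so it is only an order-of-magnitude check against the observed checkpoint sizes.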

Cheers,
Till

On Wed, May 27, 2020 at 8:19 PM Wissman, Matt <matt.wiss...@here.com> wrote:

Hello Till & Guowei,



Thanks for the replies! Here is a snippet of the window function:



  SingleOutputStreamOperator aggregatedStream = dataStream
      .keyBy(idKeySelector())
      .window(TumblingProcessingTimeWindows.of(seconds(15)))
      .apply(new Aggregator())
      .name("Aggregator")
      .setParallelism(3);



Checkpoint interval: 2 secs when the checkpoint size grew from 100KB to 100MB 
(we’ve since changed it to 5 minutes, which has slowed the checkpoint size growth)

Lateness allowed: 0

Watermarks: nothing is set in terms of watermarks – do they apply to processing 
time?

The set of keys processed in the stream is stable over time

The checkpoint size actually looks pretty stable now that the interval was 
increased. Is it possible that the short checkpoint interval prevented 
compaction?

Thanks!

-Matt


From: Till Rohrmann <trohrm...@apache.org>
Date: Wednesday, May 27, 2020 at 9:00 AM
To: Guowei Ma <guowei@gmail.com>
Cc: "Wissman, Matt" <matt.wiss...@here.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Tumbling windows - increasing checkpoint size over time


Hi Matt,

could you give us a bit more information about the windows you are using? They 
are tumbling windows. What's the size of the windows? Do you allow lateness of 
events? What's your checkpoint interval?

Are you using event time? If yes, how is the watermark generated?

You said that the number of events per window is more or less constant. Does 
this also apply to the size of the individual events?


Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Arvid Heise
Instead of changing the query, I used to embed the query in a larger
context for similar work.

So if you get an arbitrary query X which produces exactly one result (e.g.
X = select sum(revenue) from lineorder group by 1) then you can craft a
query where you add a dummy pk to the result.

Table original = env.sqlQuery(X);
Table withDummy = original.select("'dummy' as pk, *");
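
If the incoming SQL is only available as a string, the same wrapping could also be done at the SQL level. This is just a sketch in plain Java: `withDummyKey` is a hypothetical helper, and it assumes X is a single SELECT statement without a trailing semicolon. Whether the planner actually derives a key from the constant column is something to verify against your Flink version.

```java
// Hypothetical helper: wraps an arbitrary single-row query in an outer query with a constant key.
class DummyKeyWrapperSketch {

    // The inner query is left untouched; only an outer projection is added around it.
    static String withDummyKey(String query) {
        return "SELECT 'dummy' AS pk, t.* FROM (" + query + ") AS t";
    }

    public static void main(String[] args) {
        System.out.println(withDummyKey("select sum(revenue) from lineorder"));
    }
}
```

The point of wrapping rather than rewriting is that the grouping nodes inside X never have to be touched.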

On Fri, Jun 5, 2020 at 9:59 PM Satyam Shekhar 
wrote:

> Hey Arvid,
>
> Thanks for the reply.
>
> As you suggested, rewriting the query to add a dummy output and group by
> the clause - "select 1, sum(revenue) from lineorder group by 1" does add
> a unique key column to the output, and the pipeline succeeds.
>
> However, the application may get arbitrary SQL from the upstream server.
> This makes the solution tricky - I'd have to change the query to add dummy
> grouping key for all grouping nodes in the query and projection node to
> drop the dummy key. I can try to account for this upstream (in query
> generation layer) but I would prefer to have it solved within the
> execution engine itself.
>
> Regards,
> Satyam
>
> On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise  wrote:
>
>> Hi Satyam,
>>
>> you are right, there seems to be a disconnect between javadoc and
>> implementation. Jark probably knows more.
>>
>> In your case, couldn't you just add a dummy column containing a constant
>> key?
>>
>> select 'revenue' AS name, sum(revenue) from lineorder
>>
>> and then set the dummy field as PK?
>>
>> On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar 
>> wrote:
>>
>>> Hello,
>>>
>>> I am using Flink as the query engine to build an alerting/monitoring
>>> application. One of the use cases in our product requires continuously
>>> tracking and charting the output of an aggregate only SQL query,
>>> for example, select sum(revenue) from lineorder. A desirable property
>>> from the output of Flink job for such a query is that there is always
>>> exactly 1 row in the result set (or that the number of rows does not fall
>>> to 0 due to retractions for previous output).  In other words, I need
>>> upsert "like" semantics for the output of the query.
>>>
>>> I was hopeful after reading comments in UpsertStreamTableSink.java that
>>> this condition is accounted for in the implementation, however, a pipeline
>>> with above query writing to a concrete UpsertStreamTableSink fails with the
>>> following error  - "UpsertStreamTableSink requires that Table has" + "
>>> a full primary keys if it is updated." Here are the relevant comments
>>> from UpsertStreamTableSink.java for reference -
>>>
>>> ```
>>> Configures the unique key fields of the {@link Table} to write. The
>>> method is called after {@link TableSink#configure(String[],
>>> TypeInformation[])}.
>>>
>>> The keys array might be empty, if the table consists of a single
>>> (updated) record. If the table does not have a key and is append-only, the
>>> keys attribute is null.
>>>
>>> @param keys the field names of the table's keys, an empty array if the
>>> table has a single row, and null if the table is append-only and has no key.
>>> void setKeyFields(String[] keys);
>>> ```
>>>
>>> The code in StreamExec(Legacy)Sink.scala appears to conform to observed
>>> failure and does not match the comment about "empty key array if the table
>>> consists of a single record".
>>>
>>>  With that context, I have the following questions -
>>>
>>> 1. Is the UpsertStreamTableSink expected to consume the output of such
>>> aggregate only queries? Or is my interpretation of the code and comment
>>> wrong and I have misconfigured UpsertStreamTableSink?
>>> 2. If the answer to (1) is no, are there any recommended patterns for
>>> solving this use-case such that the client never observes an empty result
>>> set for the output of this query?
>>>
>>> Regards,
>>> Satyam
>>>
>>
>>
>>
>



Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Satyam Shekhar
Hey Arvid,

Thanks for the reply.

As you suggested, rewriting the query to add a dummy output and group by
the clause - "select 1, sum(revenue) from lineorder group by 1" does add a
unique key column to the output, and the pipeline succeeds.

However, the application may get arbitrary SQL from the upstream server.
This makes the solution tricky - I'd have to change the query to add dummy
grouping key for all grouping nodes in the query and projection node to
drop the dummy key. I can try to account for this upstream (in query
generation layer) but I would prefer to have it solved within the
execution engine itself.

Regards,
Satyam

On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise  wrote:

> Hi Satyam,
>
> you are right, there seems to be a disconnect between javadoc and
> implementation. Jark probably knows more.
>
> In your case, couldn't you just add a dummy column containing a constant
> key?
>
> select 'revenue' AS name, sum(revenue) from lineorder
>
> and then set the dummy field as PK?
>
> On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar 
> wrote:
>
>> Hello,
>>
>> I am using Flink as the query engine to build an alerting/monitoring
>> application. One of the use cases in our product requires continuously
>> tracking and charting the output of an aggregate only SQL query,
>> for example, select sum(revenue) from lineorder. A desirable property
>> from the output of Flink job for such a query is that there is always
>> exactly 1 row in the result set (or that the number of rows does not fall
>> to 0 due to retractions for previous output).  In other words, I need
>> upsert "like" semantics for the output of the query.
>>
>> I was hopeful after reading comments in UpsertStreamTableSink.java that
>> this condition is accounted for in the implementation, however, a pipeline
>> with above query writing to a concrete UpsertStreamTableSink fails with the
>> following error  - "UpsertStreamTableSink requires that Table has" + " a
>> full primary keys if it is updated." Here are the relevant comments from
>> UpsertStreamTableSink.java for reference -
>>
>> ```
>> Configures the unique key fields of the {@link Table} to write. The
>> method is called after {@link TableSink#configure(String[],
>> TypeInformation[])}.
>>
>> The keys array might be empty, if the table consists of a single
>> (updated) record. If the table does not have a key and is append-only, the
>> keys attribute is null.
>>
>> @param keys the field names of the table's keys, an empty array if the
>> table has a single row, and null if the table is append-only and has no key.
>> void setKeyFields(String[] keys);
>> ```
>>
>> The code in StreamExec(Legacy)Sink.scala appears to conform to observed
>> failure and does not match the comment about "empty key array if the table
>> consists of a single record".
>>
>>  With that context, I have the following questions -
>>
>> 1. Is the UpsertStreamTableSink expected to consume the output of such
>> aggregate only queries? Or is my interpretation of the code and comment
>> wrong and I have misconfigured UpsertStreamTableSink?
>> 2. If the answer to (1) is no, are there any recommended patterns for
>> solving this use-case such that the client never observes an empty result
>> set for the output of this query?
>>
>> Regards,
>> Satyam
>>
>
>
>


Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Arvid Heise
Hi Satyam,

you are right, there seems to be a disconnect between javadoc and
implementation. Jark probably knows more.

In your case, couldn't you just add a dummy column containing a constant
key?

select 'revenue' AS name, sum(revenue) from lineorder

and then set the dummy field as PK?

On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar 
wrote:

> Hello,
>
> I am using Flink as the query engine to build an alerting/monitoring
> application. One of the use cases in our product requires continuously
> tracking and charting the output of an aggregate only SQL query,
> for example, select sum(revenue) from lineorder. A desirable property
> from the output of Flink job for such a query is that there is always
> exactly 1 row in the result set (or that the number of rows does not fall
> to 0 due to retractions for previous output).  In other words, I need
> upsert "like" semantics for the output of the query.
>
> I was hopeful after reading comments in UpsertStreamTableSink.java that
> this condition is accounted for in the implementation, however, a pipeline
> with above query writing to a concrete UpsertStreamTableSink fails with the
> following error  - "UpsertStreamTableSink requires that Table has" + " a
> full primary keys if it is updated." Here are the relevant comments from
> UpsertStreamTableSink.java for reference -
>
> ```
> Configures the unique key fields of the {@link Table} to write. The method
> is called after {@link TableSink#configure(String[], TypeInformation[])}.
>
> The keys array might be empty, if the table consists of a single
> (updated) record. If the table does not have a key and is append-only, the
> keys attribute is null.
>
> @param keys the field names of the table's keys, an empty array if the
> table has a single row, and null if the table is append-only and has no key.
> void setKeyFields(String[] keys);
> ```
>
> The code in StreamExec(Legacy)Sink.scala appears to conform to observed
> failure and does not match the comment about "empty key array if the table
> consists of a single record".
>
>  With that context, I have the following questions -
>
> 1. Is the UpsertStreamTableSink expected to consume the output of such
> aggregate only queries? Or is my interpretation of the code and comment
> wrong and I have misconfigured UpsertStreamTableSink?
> 2. If the answer to (1) is no, are there any recommended patterns for
> solving this use-case such that the client never observes an empty result
> set for the output of this query?
>
> Regards,
> Satyam
>




Re: Stopping a job

2020-06-05 Thread Arvid Heise
Hi,

could you check if this SO thread [1] helps you already?

[1]
https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable

On Thu, Jun 4, 2020 at 7:43 PM M Singh  wrote:

> Hi:
>
> I am running a job which consumes data from Kinesis and send data to
> another Kinesis queue.  I am using an older version of Flink (1.6), and
> when I try to stop the job I get an exception
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Job termination
> (STOP) failed: This job is not stoppable.]
>
>
> I wanted to find out what a stoppable job is, and whether it is possible to make a job
> stoppable if it is reading/writing to Kinesis?
>
> Thanks
>
>
>



Re: Getting Window information from coGroup function

2020-06-05 Thread Arvid Heise
Hi Sudan,

it seems to be unsupported directly.

As a hacky workaround, you can replicate apply [1] in your own code and
adjust the last line to call your CoGroupWindowFunction.

[1]
https://github.com/apache/flink/blob/aedb4068408cfcad6f258526b00fcbff7f40fb82/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L325-L359

On Thu, Jun 4, 2020 at 10:09 AM Dawid Wysakowicz 
wrote:

> I am afraid there is no way to do that. At least I could not think of a
> way to do it.
>
> Maybe @aljoscha cc'ed could help here.
> On 29/05/2020 13:25, Sudan S wrote:
>
> Hi,
>
> I have a usecase where i want to join two streams. I am using coGroup for
> this
>
> KeyBuilder leftKey = new 
> KeyBuilder(jobConfiguration.getConnectStream().getLeftKey());
> KeyBuilder rightKey = new 
> KeyBuilder(jobConfiguration.getConnectStream().getRightKey());
> leftSource.coGroup(rightSource).where(leftKey).equalTo(rightKey)
>   .window(...)
>   .apply()
>   .addSink(*...*);
>
> For the apply method I'm using RichCoGroupFunction. I am not able to find access
> to the Window object similar to ProcessWindowFunction. I would be interested
> in extracting the start time, end time and key of the window.
>
> Please suggest if there are any alternatives.
>
>
>
>



Re: User / Job Manager (permissions) for Flink

2020-06-05 Thread Arvid Heise
If you are running in K8s, you could also directly use its ingress layer.
That's especially convenient if you have managed to connect it to your
company's SSO.

On Tue, Jun 2, 2020 at 9:38 PM Robert Metzger  wrote:

> Hi David,
>
> I guess you could also "just" put a nginx in front of Flink's REST API
> that controls access.
>
> On Tue, Jun 2, 2020 at 7:05 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi David,
>>
>> One option is Ververica Platform which has a notion of Namespaces:
>> https://docs.ververica.com/administration/namespaces.html
>>
>> I guess Konstantin can tell you more about it.
>>
>> Disclaimer: I work for a company that develops this product.
>>
>> Regards,
>> Roman
>>
>>
>> On Tue, Jun 2, 2020 at 5:37 PM David Magalhães 
>> wrote:
>>
>>> Hi, not sure if this was discussed (in a brief search I couldn't find
>>> anything), but I would like to know if there is an application that uses
>>> the Flink REST API to provide some kind of user management, such as allowing a
>>> certain user to log in and manage some jobs running in Flink, limiting the
>>> parallelism for each job, etc.
>>>
>>> The idea is that certain users can only access their own jobs and can
>>> stop/start them, but can't interfere with other jobs that run on the
>>> same cluster.
>>>
>>> Thanks
>>>
>>



Re: Creating Kafka Topic dynamically in Flink

2020-06-05 Thread Arvid Heise
Hi Prasanna,

auto.create.topics.enable is only recommended for development clusters and
not for production use cases (as one programming error could potentially
flood the whole broker with a large number of topics). I have experienced
first hand the mess it makes.
I'd suggest finding a supplemental external solution for that. You need to
configure retention policies and ACLs on the topics in all real
environments anyway.

In any case, I'd also discourage splitting data that is in one Kafka topic
at all. I'd rather split it into separate partitions of the same topic and
then only consume the respective partition. But it's usually much
cheaper to just filter irrelevant events on the original topic than, for
example, to later correlate a subset of events across the split topics. Only
in the original topic will you easily have a clear ordering of events
happening to the same entity (key).
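
If the stream does get split into several topics anyway, one way to limit the
damage a programming error can do is to route only to topics that were created
up front and send everything else to a fallback topic. The following is a
minimal plain-Java sketch of that idea, not Flink or Kafka API: the class name
TopicRouter, the "events-" prefix and the topic names are all made up for
illustration. In a Flink job this decision would presumably live in the
serialization schema that picks the target topic per record.

```java
import java.util.Locale;
import java.util.Set;

/** Hypothetical helper: picks a target topic per event type, limited to pre-created topics. */
final class TopicRouter {
    private final Set<String> knownTopics;
    private final String fallbackTopic;

    TopicRouter(Set<String> knownTopics, String fallbackTopic) {
        this.knownTopics = Set.copyOf(knownTopics);
        this.fallbackTopic = fallbackTopic;
    }

    /** Returns "events-<type>" if that topic was pre-created, otherwise the fallback topic. */
    String topicFor(String eventType) {
        if (eventType == null || eventType.isEmpty()) {
            return fallbackTopic;
        }
        String candidate = "events-" + eventType.toLowerCase(Locale.ROOT);
        return knownTopics.contains(candidate) ? candidate : fallbackTopic;
    }

    public static void main(String[] args) {
        TopicRouter router =
            new TopicRouter(Set.of("events-order", "events-payment"), "events-unrouted");
        System.out.println(router.topicFor("Order"));          // a pre-created topic
        System.out.println(router.topicFor("brand-new-type")); // falls back
    }
}
```

Records that end up in the fallback topic then show up as a signal that a new
event type needs a topic with proper retention and ACLs.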

On Tue, Jun 2, 2020 at 10:37 AM Jark Wu  wrote:

> I think "auto.create.topics.enable" is enabled by default [1]?
>
> Best,
> Jark
>
> [1]: https://kafka.apache.org/documentation/#auto.create.topics.enable
>
> On Mon, 1 Jun 2020 at 19:55, Leonard Xu  wrote:
>
>> I think @brat is right; I didn't know the Kafka property
>> 'auto.create.topics.enable'. You can pass the property to the Kafka producer;
>> that should work.
>> Best,
>> Leonard Xu
>>
>> On Jun 1, 2020, at 18:33, satya brat wrote:
>>
>> Prasanna,
>> You might want to check the kafka broker configs where
>> 'auto.create.topics.enable' helps with creating a new topic whenever a new
>> message with non existent topic is published.
>> https://kafka.apache.org/documentation/#brokerconfigs
>>
>> I am not too sure about pitfalls if any.
>>
>> On Mon, Jun 1, 2020 at 3:20 PM Leonard Xu  wrote:
>>
>>> Hi, kumar
>>>
>>> Sorry for missing the original question. I think we cannot create topics
>>> dynamically at the moment; creating topics should belong to the control flow
>>> rather than the data flow, and users may have custom configurations for the
>>> topic, from my understanding. Maybe you need to implement the logic to
>>> check/create/manage topics in your custom SinkFunction so that topics can be
>>> created dynamically at runtime.
>>>
>>> Best,
>>> Leonard Xu
>>>
>>> On Jun 1, 2020, at 17:02, Prasanna kumar wrote:
>>>
>>> Leonard,
>>>
>>> Thanks for the reply; I will look into those options.
>>> But as for the original question, could we create a topic dynamically
>>> when required?
>>>
>>> Prasanna.
>>>
>>> On Mon, Jun 1, 2020 at 2:18 PM Leonard Xu  wrote:
>>>
>>>> Hi, kumar
>>>>
>>>> Flink supports consuming from and producing to multiple Kafka topics [1]. In
>>>> your case you can implement KeyedSerializationSchema (legacy interface) or
>>>> KafkaSerializationSchema [2] to make one producer instance send data
>>>> to multiple topics. There is an ITCase you can reference [3].
>>>>
>>>>
>>>> Best,
>>>> Leonard Xu
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-producer
>>>> [2]
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
>>>>
>>>> [3]
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java#L126
>>>>
>>>>
>>>> On Jun 1, 2020, at 15:35, Prasanna kumar wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have a use case where I read events from a single Kafka stream
>>>> consisting of JSON messages.
>>>>
>>>> The requirement is to split the stream into multiple output streams based
>>>> on some criteria, say the type of event, or the type and the customer
>>>> associated with the event.
>>>>
>>>> We could achieve the splitting of the stream using side outputs, as I have
>>>> seen in the documentation.
>>>>
>>>> Our business environment is such that there could be new event types
>>>> flowing in. Would the Flink Kafka producer create the topics dynamically
>>>> based on the inflowing events? I did not see any documentation saying
>>>> that it could.
>>>>
>>>> Or should the topics always be pre-created by running a script separately?
>>>> (Not a good scalable practice in our case.)
>>>>
>>>> Thanks,
>>>> Prasanna.
>>>>
>>>>
>>>>
>>>
>>



Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-06-05 Thread Arvid Heise
Hi Arnaud,

just to add: the overhead of this additional map is negligible if you
enable object reuse [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html

On Tue, Jun 2, 2020 at 10:34 AM Robert Metzger  wrote:

> I'm not 100% sure about this answer, that's why I'm CCing Aljoscha to
> correct me if needed:
>
> Partitioners are not regular operators (like a map or window), thus they
> are not included in the regular Task lifecycle methods (of open() / map()
> etc. / close(), with the proper error handling, task cancellation
> mechanisms etc.). The custom partition function is called somewhere close
> to the network stack.
> It would be quite a lot of effort (and added complexity to the codebase)
> to allow for rich partitioners. Given that custom partitioners are a rarely
> used feature, it would not be justified to spend a lot of time for this
> (there's also a good workaround available)
>
>
> On Fri, May 29, 2020 at 2:46 PM LINZ, Arnaud 
> wrote:
>
>> Hello,
>>
>>
>>
>> Yes, that would definitely do the trick, with an extra mapper after keyBy
>> to remove the tuple so that it stays seamless. It's less hacky than what I
>> was thinking of, thanks!
>>
>> However, is there any plan in a future release to have rich partitioners?
>> That would avoid adding overhead and "intermediate" technical info in
>> the stream payload.
>>
>> Best,
>>
>> Arnaud
>>
>>
>>
>> *From:* Robert Metzger
>> *Sent:* Friday, May 29, 2020 13:10
>> *To:* LINZ, Arnaud
>> *Cc:* user
>> *Subject:* Re: Best way to "emulate" a rich Partitioner with open() and
>> close() methods ?
>>
>>
>>
>> Hi Arnaud,
>>
>>
>>
>> Maybe I don't fully understand the constraints, but what about
>>
>> stream.map(new GetKuduPartitionMapper).keyBy(0).addSink(KuduSink());
>>
>>
>> The map(new GetKuduPartitionMapper) will be a regular RichMapFunction
>> with open() and close() where you can handle the connection with Kudu's
>> partitioning service.
>>
>> The map will output a Tuple2 (or something nicer :) ),
>> then Flink shuffles your data correctly, and the sinks will process the
>> data correctly partitioned.
>>
>>
>>
>> I hope that this is what you were looking for!
>>
>>
>>
>> Best,
>>
>> Robert
>>
>>
>>
>> On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud 
>> wrote:
>>
>> Hello,
>>
>>
>>
>> I would like to upgrade the performance of my Apache Kudu Sink by using
>> the new “KuduPartitioner” of Kudu API to match Flink stream partitions with
>> Kudu partitions to lower the network shuffling.
>>
>> For that, I would like to implement something like
>>
>> stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new
>> KuduSink(…)));
>>
>> with KuduFlinkPartitioner being an implementation of
>> org.apache.flink.api.common.functions.Partitioner
>> that internally makes use of the KuduPartitioner client tool of Kudu's API.
>>
>>
>>
>> However, for that KuduPartitioner to work, it needs to open (and close at
>> the end) a connection to the Kudu table, obviously something that can't
>> be done for each line. But there is no "AbstractRichPartitioner" with
>> open() and close() methods that I can use for that (the way I use it in the
>> sink for instance).
>>
>>
>>
>> What is the best way to implement this ?
>>
>> I thought of ThreadLocals that would be initialized during the first call
>> to *int* partition(K key, *int* numPartitions);  but I won’t be able to
>> close() things nicely as I won’t be notified on job termination.
>>
>>
>>
>> I thought of putting those static ThreadLocals inside a “Identity Mapper”
>> that would be called just prior the partition with something like :
>>
>> stream.map(richIdentiyConnectionManagerMapper).partitionCustom(new
>> KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…)));
>>
>> with kudu connections initialized in the mapper open(), closed in the
>> mapper close(), and used  in the partitioner partition().
>>
>> However, it looks like an ugly hack breaking every coding principle, but
>> as long as the threads are reused between the mapper and the partitioner I
>> think that it should work.
>>
>>
>>
>> Is there a better way to do this ?
>>
>>
>>
>> Best regards,
>>
>> Arnaud
>>
>>
>>
>>
>>
>>
>>
>>


Re: Dynamically merge multiple upstream souces

2020-06-05 Thread Arvid Heise
Hi Yi,

one option is to use Avro, where you define one global Avro schema as the
source of truth. Then you add aliases [1] to this schema for each source
where the fields are named differently. You use the same schema to read the
Avro messages from Kafka and Avro automatically converts the data with its
write schema (stored in some schema registry) to your global schema.

If you pay close attention to the requirements of schema evolution of Avro
[2], you could easily add and remove some fields in the different sources
without changing anything programmatically in your Flink ingestion job.

[1] https://avro.apache.org/docs/1.8.1/spec.html#Aliases
[2] https://avro.apache.org/docs/1.8.1/spec.html#Schema+Resolution
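
The effect of such aliases can be pictured as a rename step from each source's
field names to the names of the global schema. The plain-Java sketch below only
illustrates that idea by hand; it is not Avro's API (with Avro, the library
does this during schema resolution), and the class name AliasRenamer and the
field names are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration: renames source-specific field names to global ones. */
final class AliasRenamer {
    private final Map<String, String> aliasToCanonical;

    AliasRenamer(Map<String, String> aliasToCanonical) {
        this.aliasToCanonical = Map.copyOf(aliasToCanonical);
    }

    /** Returns a copy of the record in which every aliased field carries its global name. */
    Map<String, Object> toGlobal(Map<String, Object> sourceRecord) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : sourceRecord.entrySet()) {
            // Fields without an alias entry keep their name unchanged.
            String name = aliasToCanonical.getOrDefault(field.getKey(), field.getKey());
            result.put(name, field.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        AliasRenamer renamer = new AliasRenamer(Map.of("uid", "user_id", "ts", "event_time"));
        Map<String, Object> fromSourceA = new LinkedHashMap<>();
        fromSourceA.put("uid", 7);
        fromSourceA.put("ts", 1591315200000L);
        fromSourceA.put("amount", 3);
        System.out.println(renamer.toGlobal(fromSourceA));
    }
}
```

Adding a new source with different field names then means adding alias
entries, not changing the ingestion logic.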

On Tue, Jun 2, 2020 at 8:35 AM  wrote:

> Hi all,
>
> I have a use case where I want to merge several upstream data sources
> (Kafka topics). The data are essentially the same,
> but they have different field names.
>
> I guess I can say my problem is not so much about Flink itself. It is
> about how to deserialize data and merge different data effectively with
> Flink.
> I can define different schemas and then deserialize data and merge them
> manually. I wonder if there is any dynamic way to do such a thing; that is,
> I want changing field names to work like changing pandas dataframe column
> names. I see there is already
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-120%3A+Support+conversion+between+PyFlink+Table+and+Pandas+DataFrame
> but resorting to pandas implies I need to work with python, which is
> something I prefer not to do.
>
> What is your practice for dynamically changing sources and merging them?
> I'd love to hear your opinion.
>
> Bests,
> Yi
>




Re: Flink s3 streaming performance

2020-06-05 Thread Kostas Kloudas
Hi all,

@Venkata, Do you have many small files being created as Arvid suggested? If
yes, then I tend to agree that S3 is probably not the best sink. Although I
did not get that from your description.
In addition, instead of PrintStream you can have a look at the code of the
SimpleStringEncoder in Flink [1] for a bit more efficient implementation.

Cheers,
Kostas

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
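
The general idea behind such an encoder is to write the record's bytes straight
to the output stream and append a newline, instead of wrapping the stream in a
new PrintStream for every record. Below is a minimal plain-Java sketch of that
idea under the assumption of UTF-8 text records; LineEncoder is a made-up name
and this is not a copy of the Flink class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: encodes one record per line without a per-record PrintStream. */
final class LineEncoder {
    /** Writes the element as UTF-8 bytes followed by a single newline character. */
    static void encode(String element, OutputStream stream) throws IOException {
        stream.write(element.getBytes(StandardCharsets.UTF_8));
        stream.write('\n');
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        encode("<msg>1</msg>", out);
        encode("<msg>2</msg>", out);
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Because nothing is buffered in an intermediate writer, there is also no
question of whether a flush is needed before the sink rolls the file.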


On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise  wrote:

> Hi Venkata,
>
> are the many small files intended or is it rather an issue of our commit
> on checkpointing? If so then FLINK-11499 [1] should help you. Design is
> close to done, unfortunately implementation will not make it into 1.11.
>
> In any case, I'd look at the parameter fs.s3a.connection.maximum, as you
> store both state and data on S3. I'd probably go with slot*3 or even higher.
>
> Lastly, the way you output elements looks also a bit suspicious.
> PrintStream is not known for great performance. I'm also surprised that it
> works without manual flushing.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11499
>
> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke  wrote:
>
>> I think S3 is the wrong storage backend for this volume of small messages.
>> Try to use a NoSQL database or write multiple messages into one file in
>> S3 (1 or 10)
>>
>> If you still want to go with your scenario then try a network optimized
>> instance and use s3a in Flink and configure s3 entropy.
>>
>> On 31.05.2020 at 15:30, venkata sateesh` kolluru <
>> vkollur...@gmail.com> wrote:
>>
>> 
>> Hi David,
>>
>> The avg size of each file is around 30KB and I have checkpoint interval
>> of 5 minutes. Some files are even 1 kb, because of checkpoint some files
>> are merged into 1 big file around 300MB.
>>
>> With 120 million files and 4Tb, if the rate of transfer is 300 per
>> minute, it is taking weeks to write to s3.
>>
>> I have tried to increase parallelism of sink but I dont see any
>> improvement.
>>
>> The sink record is Tuple3, the actual content of
>> file is f2. This content is written to /f0/f1/part*-*
>>
>> I guess the prefix determination in custombucketassigner wont be causing
>> this delay?
>>
>> Could you please shed some light on writing custom s3 sink ?
>>
>> Thanks
>>
>>
>> On Sun, May 31, 2020, 6:34 AM David Magalhães 
>> wrote:
>>
>>> Hi Venkata.
>>>
>>> 300 requests per minute works out to about 200ms per request, which should
>>> be a normal response time for sending a file if there isn't any speed
>>> limitation (how big are the files?).
>>>
>>> Have you changed the parallelism to be higher than 1? I also
>>> recommend limiting the source parallelism, because it can consume
>>> pretty fast from Kafka and create some kind of backpressure.
>>>
>>> I don't have much experience with StreamingFileSink, because I've ended
>>> up using a custom S3Sink, but I did have some issues writing to S3 because
>>> the request wasn't parallelised. Check this thread,
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>>
>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
>>> vkollur...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have posted the same on Stack Overflow but didn't get any response. So
>>>> posting it here for help.
>>>>
>>>>
>>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>>
>>>> Details:
>>>>
>>>> I am working on a Flink application on Kubernetes (EKS) which consumes
>>>> data from Kafka and writes it to S3.
>>>>
>>>> We have around 120 million XML messages of size 4TB in Kafka. Consuming
>>>> from Kafka is super fast.
>>>>
>>>> These are just string messages from Kafka.
>>>>
>>>> There is high back pressure while writing to S3. We are not even
>>>> hitting the S3 PUT request limit, which is around 3500 requests/sec. I am
>>>> seeing only 300 writes per minute to S3, which is very slow.
>>>>
>>>> I am using StreamingFileSink to write to S3 with the rolling policy set to
>>>> OnCheckpointRollingPolicy.
>>>>
>>>> Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or
>>>> s3p)
>>>>
>>>> Other than this I don't have any config related to S3
>>>>
>>>> StreamingFileSink> sink = 
>>>> StreamingFileSink
>>>> .forRowFormat(new Path(s3://BUCKET),
>>>> (Tuple3 element, OutputStream 
>>>> stream) -> {
>>>> PrintStream out = new PrintStream(stream);
>>>> out.println(element.f2);
>>>> })
>>>> // Determine component type for each record
>>>> .withBucketAssigner(new CustomBucketAssigner())
>>>> .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>> .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
>>>> .build();
>>>>
>>>> Is there 

Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

2020-06-05 Thread Arvid Heise
Hi Chris,

could you also try what happens when you turn incremental checkpoints off?

Incremental checkpoints may create many small files which are a bad fit for
HDFS. You could also evaluate other storage options (net drive, S3) if you
find incremental checkpoints to be better.

On Tue, Jun 2, 2020 at 2:36 AM Slotterback, Chris <
chris_slotterb...@comcast.com> wrote:

> Congxian,
>
>
>
> 1. The checkpoints were failing with this exception scattered through the 
> logs:
> 2020-06-01 21:04:37,930 WARN  org.apache.hadoop.hdfs.DataStreamer - 
> DataStreamer Exception
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
> /flink/flink-checkpoints/ade55daec06ee72aaf7ceade86c6e7a9/chk-1/2093792d-7ebb-4008-8e20-4daf1849c2d4
>  could only be replicated to 0 nodes instead of minReplication (=1).
>
>
> 2. Yes, we are using incremental checkpointing
>
> 3. Currently our windows are configured to use the process function (we
> were doing aggregates before), which is my understanding that should make
> our state update/insert ratio lower, as we are building the liststates of
> each window over time and processing them on trigger.
>
> 4. We set the max concurrent checkpoints back to 1, it was originally
> configured to that and the checkpoints were taking too long to complete
> before the next checkpoint interval began.
>
>
>
> Our tm’s are normally 3 slots (3_slots.png), we wanted to try running with
> 1 slot (1_slot.png) and noticed the checkpoint times fell drastically, but
> with 1 slot per tm our parallelism had to be dropped and our consumer lag
> was growing.
>
>
>
>
>
>
>
> *From: *Congxian Qiu 
> *Date: *Friday, May 29, 2020 at 10:59 PM
> *To: *"Slotterback, Chris" 
> *Cc: *"user@flink.apache.org" 
> *Subject: *[EXTERNAL] Re: Inconsistent checkpoint durations vs state size
>
>
>
> Hi
>
> From the given picture,
>
> 1. Some checkpoints failed (but not because of a timeout). Could
> you please check why these checkpoints failed?
>
> 2. The checkpoint data size is the delta size for the current checkpoint [1],
> assuming you are using incremental checkpoints.
>
> 3. In fig 1 the checkpoint size is ~3G, but in fig 2 the delta size can
> grow to ~15G. My gut feeling is that the state update/insert ratio for
> your program is very high, so that a single checkpoint generates too
> many sst files.
>
> 4. From fig 2 it seems you configured
> execution-checkpointing-max-concurrent-checkpoints [2] to be bigger than 1.
> Could you please set it to 1 and try again?
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/checkpoint_monitoring.html#history-tab
> 
>
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#execution-checkpointing-max-concurrent-checkpoints
> 
>
> Best,
>
> Congxian
>
>
>
>
>
> On Sat, May 30, 2020 at 7:43 AM Slotterback, Chris wrote:
>
> Hi there,
>
>
>
> We are trying to upgrade a flink app from using FsStateBackend to
> RocksDBStateBackend to reduce overhead memory requirements. When enabling
> rocks, we are seeing a drop in used heap memory as it increments to disk,
> but checkpoint durations have become inconsistent. Our data source has a
> stable rate of reports coming in parallelly across partitions. The state
> size doesn’t seem to correlate with the checkpoint duration from what I can
> see in metrics. we have tried tmpfs and swap on SSDs with high iops, but
> can’t get a good handle on what’s causing smaller state to take longer to
> checkpoint. Our checkpoint location is hdfs, and works well in our
> non-rocks cluster.
>
>
>
> Is ~100x checkpoint duration expected when going from fs to rocks state
> backend, and is checkpoint duration supposed to vary this much with a
> consistent data source normally?
>
>
>
> Chris
>
>



Re: Flink s3 streaming performance

2020-06-05 Thread Arvid Heise
Hi Venkata,

are the many small files intended or is it rather an issue of our commit on
checkpointing? If so then FLINK-11499 [1] should help you. Design is close
to done, unfortunately implementation will not make it into 1.11.

In any case, I'd look at the parameter fs.s3a.connection.maximum, as you
store both state and data on S3. I'd probably go with slot*3 or even higher.

Lastly, the way you output elements looks also a bit suspicious.
PrintStream is not known for great performance. I'm also surprised that it
works without manual flushing.

[1] https://issues.apache.org/jira/browse/FLINK-11499
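
For a sense of scale, the figures quoted further down in this thread (120
million files, about 300 writes per minute, and an S3 PUT limit of around 3500
requests per second) can be turned into a rough estimate. The sketch below is
plain Java arithmetic only; it assumes a steady rate, and the class and method
names are made up for illustration.

```java
/** Hypothetical back-of-the-envelope estimate for the write rates quoted in the thread. */
final class S3WriteEstimate {
    /** Average time per write in milliseconds, assuming writes happen one after another. */
    static double millisPerWrite(double writesPerMinute) {
        return 60_000.0 / writesPerMinute;
    }

    /** Days needed to write the given number of files at a steady rate. */
    static double daysToWrite(long files, double writesPerMinute) {
        double minutes = files / writesPerMinute;
        return minutes / 60.0 / 24.0;
    }

    public static void main(String[] args) {
        long files = 120_000_000L;             // file count quoted in the thread
        double observedPerMinute = 300.0;      // observed write rate quoted in the thread
        double putLimitPerMinute = 3500.0 * 60.0; // quoted PUT limit, converted to per minute

        System.out.printf("ms per write: %.0f%n", millisPerWrite(observedPerMinute));
        System.out.printf("days at observed rate: %.1f%n", daysToWrite(files, observedPerMinute));
        System.out.printf("hours at PUT limit: %.1f%n",
            daysToWrite(files, putLimitPerMinute) * 24.0);
    }
}
```

At 300 writes per minute this comes to 200 ms per write and roughly 278 days
in total, while the quoted PUT limit would allow the same volume in about 9.5
hours, which suggests the bottleneck is on the job side rather than in the S3
request limits.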

On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke  wrote:

> I think S3 is the wrong storage backend for this volume of small messages.
> Try to use a NoSQL database or write multiple messages into one file in S3
> (1 or 10)
>
> If you still want to go with your scenario then try a network optimized
> instance and use s3a in Flink and configure s3 entropy.
>
> On 31.05.2020 at 15:30, venkata sateesh` kolluru <
> vkollur...@gmail.com> wrote:
>
> 
> Hi David,
>
> The avg size of each file is around 30KB and I have checkpoint interval of
> 5 minutes. Some files are even 1 kb, because of checkpoint some files are
> merged into 1 big file around 300MB.
>
> With 120 million files and 4Tb, if the rate of transfer is 300 per minute,
> it is taking weeks to write to s3.
>
> I have tried to increase parallelism of sink but I dont see any
> improvement.
>
> The sink record is Tuple3, the actual content of
> file is f2. This content is written to /f0/f1/part*-*
>
> I guess the prefix determination in custombucketassigner wont be causing
> this delay?
>
> Could you please shed some light on writing custom s3 sink ?
>
> Thanks
>
>
> On Sun, May 31, 2020, 6:34 AM David Magalhães 
> wrote:
>
>> Hi Venkata.
>>
>> 300 requests per minute works out to about 200ms per request, which should
>> be a normal response time for sending a file if there isn't any speed
>> limitation (how big are the files?).
>>
>> Have you changed the parallelism to be higher than 1? I also
>> recommend limiting the source parallelism, because it can consume
>> pretty fast from Kafka and create some kind of backpressure.
>>
>> I don't have much experience with StreamingFileSink, because I've ended up
>> using a custom S3Sink, but I did have some issues writing to S3 because the
>> request wasn't parallelised. Check this thread,
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>
>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
>> vkollur...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have posted the same in stackoverflow but didnt get any response. So
>>> posting it here for help.
>>>
>>>
>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>
>>> Details:
>>>
>>> I am working on a flink application on kubernetes(eks) which consumes
>>> data from kafka and write it to s3.
>>>
>>> We have around 120 million xml messages of size 4TB in kafka. Consuming
>>> from kafka is super fast.
>>>
>>> These are just string messages from kafka.
>>>
>>> There is high back pressure while writing to S3. We are not even
>>> hitting the S3 PUT request limit, which is around 3500 requests/sec. I am
>>> seeing only 300 writes per minute to S3, which is very slow.
>>>
>>> I am using StreamingFileSink to write to S3 with the rolling policy set to
>>> OnCheckpointRollingPolicy.
>>>
>>> Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or
>>> s3p)
>>>
>>> Other than this I dont have any config related to s3
>>>
>>> StreamingFileSink> sink = 
>>> StreamingFileSink
>>> .forRowFormat(new Path(s3://BUCKET),
>>> (Tuple3 element, OutputStream 
>>> stream) -> {
>>> PrintStream out = new PrintStream(stream);
>>> out.println(element.f2);
>>> })
>>> // Determine component type for each record
>>> .withBucketAssigner(new CustomBucketAssigner())
>>> .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>> .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
>>> .build();
>>>
>>> Is there anything that we can optimize on s3 from streamfilesink or in
>>> flink-conf.xml ?
>>>
>>> Like using bulkformat or any config params like fs.s3.maxThreads etc.
>>>
>>> For checkpointing too I am using s3:// instead of s3p or s3a
>>>
>>> env.setStateBackend((StateBackend) new 
>>> RocksDBStateBackend(s3://checkpoint_bucket, true));
>>> env.enableCheckpointing(30);
>>>
>>>
>>>


Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-06-05 Thread Arvid Heise
A common approach is to use a dead letter queue, which is an extra output
for bad input.

So the result of the read operation would look like Tuple2<TBase, byte[]>
(or use Either in Scala) and return the parsed TBase on success or else put
in the invalid record byte[].

Then in your DAG, split the handling of the input:

DataStream<Tuple2<TBase, byte[]>> input = ...;
// Flink's Tuple2 exposes its fields as f0 and f1
DataStream<TBase> goodInput = input.filter(recordOrRaw -> recordOrRaw.f0 !=
null).map(recordOrRaw -> recordOrRaw.f0);
// continue normal processing with goodInput
DataStream<byte[]> badInput = input.filter(recordOrRaw -> recordOrRaw.f1 !=
null).map(recordOrRaw -> recordOrRaw.f1);
badInput.write... // for example to Kafka

Then you simply need to monitor the output of badInput. You can also easily
check why the records could not be parsed and try to develop
some recovery logic if possible.
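
The same split can be tried out without a cluster. The following plain-Java
sketch mirrors the idea with lists instead of DataStreams; it is an
illustration under assumptions, not Flink code. Parsing an integer stands in
for deserializing a TBase, and DeadLetterSplit, Result, tryParse, good and bad
are made-up names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a dead letter split: parsed records go one way, raw bytes the other. */
final class DeadLetterSplit {
    /** Either a parsed value or the raw bytes that failed to parse; exactly one is non-null. */
    static final class Result {
        final Integer parsed;
        final byte[] raw;

        Result(Integer parsed, byte[] raw) {
            this.parsed = parsed;
            this.raw = raw;
        }
    }

    /** Stand-in for deserialization: tries to read the bytes as a decimal integer. */
    static Result tryParse(byte[] record) {
        try {
            String text = new String(record, StandardCharsets.UTF_8).trim();
            return new Result(Integer.valueOf(text), null);
        } catch (NumberFormatException e) {
            return new Result(null, record); // keep the raw bytes for the dead letter output
        }
    }

    /** Records that parsed successfully, in input order. */
    static List<Integer> good(List<Result> results) {
        List<Integer> out = new ArrayList<>();
        for (Result r : results) {
            if (r.parsed != null) {
                out.add(r.parsed);
            }
        }
        return out;
    }

    /** Raw bytes of the records that could not be parsed, in input order. */
    static List<byte[]> bad(List<Result> results) {
        List<byte[]> out = new ArrayList<>();
        for (Result r : results) {
            if (r.raw != null) {
                out.add(r.raw);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Result> results = new ArrayList<>();
        for (String s : new String[] {"1", "2", "corrupt", "4"}) {
            results.add(tryParse(s.getBytes(StandardCharsets.UTF_8)));
        }
        System.out.println("good: " + good(results));
        System.out.println("bad count: " + bad(results).size());
    }
}
```

Keeping the raw bytes rather than dropping the record is what makes later
inspection and recovery possible.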

On Mon, Jun 1, 2020 at 9:33 AM Yu Yang  wrote:

> Thanks for the suggestion, Yun!
>
> On Sun, May 31, 2020 at 11:15 PM Yun Gao  wrote:
>
>> Hi Yu,
>>
>> I think when the serializer returns *null*, the following operator
>> should still receive a null record. One possible approach is for the
>> following operator to count the number of null records received and use
>> a metric to publish the value to a monitoring system such as
>> Prometheus, which should then be able to configure alert
>> conditions.
>>
>> If *null* has problems, a special indicating object instance may be
>> created like NULL_TBASE, and the operator should be able to count the
>> number of NULL_TBASE received.
>>
>> Best,
>>  Yun
>>
>>
>> --Original Mail --
>> *Sender:*Yu Yang 
>> *Send Date:*Mon Jun 1 06:37:35 2020
>> *Recipients:*user 
>> *Subject:*best practice for handling corrupted records / exceptions in
>> custom DefaultKryoSerializer?
>>
>>> Hi all,
>>>
>>> To deal with corrupted messages that can leak into the data source once
>>> in a while, we implemented a custom DefaultKryoSerializer class, shown
>>> below, that catches exceptions. The custom serializer returns null from
>>> its read(...) method when it encounters an exception while reading. With
>>> this implementation, the serializer may silently drop records. One concern
>>> is that it may drop too many records before we notice and take action.
>>> What is the best practice to handle this?
>>>
>>> The serializer processes one record at a time. Will reading a corrupted
>>> record make the serializer fail to process the next valid record?
>>>
>>> public class CustomTBaseSerializer extends TBaseSerializer {
>>>     private static final Logger LOG =
>>>         LoggerFactory.getLogger(CustomTBaseSerializer.class);
>>>
>>>     @Override
>>>     public void write(Kryo kryo, Output output, TBase tBase) {
>>>         try {
>>>             super.write(kryo, output, tBase);
>>>         } catch (Throwable t) {
>>>             LOG.error("Failed to write due to unexpected Throwable", t);
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public TBase read(Kryo kryo, Input input, Class tBaseClass) {
>>>         try {
>>>             return super.read(kryo, input, tBaseClass);
>>>         } catch (Throwable t) {
>>>             LOG.error("Failed to read from input due to unexpected Throwable", t);
>>>             return null;
>>>         }
>>>     }
>>> }
>>>
>>> Thank you!
>>>
>>> Regards,
>>> -Yu
>>>
>>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Native K8S not creating TMs

2020-06-05 Thread kb
Thanks, Yang, for the suggestion. I have tried it and I'm still getting the
same exception. Is it possible it's due to the null pod name? Operation:
[create]  for kind: [Pod]  with name: [null]  in namespace: [default] 
failed.

Best,
kevin



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


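Re: In Flink SQL, a comparison with a null value always evaluates to false

2020-06-05 Thread Benchao Li
Hi,

I thought about this question some more, and I think this behavior is indeed somewhat unreasonable. I created an issue [1] to track it.

[1] https://issues.apache.org/jira/browse/FLINK-18164

On Fri, Jun 5, 2020 at 11:20 PM whirly wrote:

> OK, it may just be a display problem in my mail client. Thanks for the reply.
>
> Regarding adding an extra IS NOT NULL check: po and I both feel it is somewhat
> redundant. Sometimes a condition involves many fields, and adding an IS NOT NULL
> in front of every field is difficult and error-prone.
>
> It would be great if there were a configuration option that makes
> null <> 'someValue' evaluate to true.
>
> whirly
> Email: whir...@163.com
>
> On 2020-06-05 23:08, Benchao Li wrote:
> Hi,
>
> I only received the one email you sent the first time, so it looks like there is no problem. (Perhaps it is a local display issue in your mail client.)
> As for your question, this is indeed how it is currently handled. May I ask what the problem is with adding an extra IS NOT NULL?
>
> On Fri, Jun 5, 2020 at 9:54 PM whirly wrote:
>
> > Sorry, I just noticed that my 163 mailbox automatically re-sent this question several times. I don't know what happened; it may be a mailbox bug. Apologies for the disturbance; I also don't know whether it will keep re-sending.
> >
> > At 2020-06-05 14:25:10, "whirly" wrote:
> > >Hi all:
> > >In Flink SQL, for a query such as SELECT * from order where  product <> 'rubber',
> > >a row whose product field is null does not match the condition
> > >product <> 'rubber', even though null is indeed not equal to 'rubber'.
> > >It only matches if the condition is changed to  where product is Null or product <> 'rubber'.
> > >But I want null <> 'rubber' to evaluate to True as well, without adding a
> > >product is Null check in front of the condition. What can I do?
> > >
> > >
> > >Thanks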
> >
>
>
> --
>
> Best,
> Benchao Li
>
>

-- 

Best,
Benchao Li


Re: Auto adjusting watermarks?

2020-06-05 Thread Arvid Heise
Hi Theo,

The general idea is interesting. I'd probably start with some initial
out-of-orderness bound and, after collecting X elements, switch to the
histogram. Snapshotting it sounds very valid. I'd probably use a union state
to also support rescaling in a meaningful way.

However, tbh, for a production use case I'd probably go with a more
deterministic (= simpler) approach and tweak it manually. If
late events are something to worry about, I'd have a metric on them anyway,
with alerts, and tweak the job accordingly.

Another dimension to look at is whether you can recalculate results based
on the late events, so that you get the best of both worlds (low initial
latency, but precise end results). I'd also recommend having a look at the
retract streams of the Table API.
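
To make the "fixed bound first, histogram later" idea concrete, here is a minimal sketch in plain Java. It is not a Flink watermark assigner: the class name, the warm-up count, and the nearest-rank quantile are all assumptions made for illustration, and a real implementation would need a bounded histogram plus checkpointed state as discussed in this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class AdaptiveBoundSketch {
    private final long initialBoundMillis; // used until enough samples exist
    private final int warmupCount;         // the "X elements" from the thread
    private final double quantile;         // e.g. 0.999 for a 99.9% target
    private final List<Long> observedDelays = new ArrayList<>();

    AdaptiveBoundSketch(long initialBoundMillis, int warmupCount, double quantile) {
        this.initialBoundMillis = initialBoundMillis;
        this.warmupCount = warmupCount;
        this.quantile = quantile;
    }

    /** Records how far an event lagged behind the largest timestamp seen so far. */
    void observe(long delayMillis) {
        observedDelays.add(Math.max(0L, delayMillis));
    }

    /** Returns the fixed bound during warm-up, then the nearest-rank quantile of observed delays. */
    long currentBoundMillis() {
        if (observedDelays.size() < warmupCount) {
            return initialBoundMillis;
        }
        List<Long> sorted = new ArrayList<>(observedDelays);
        Collections.sort(sorted);
        int index = (int) Math.ceil(quantile * sorted.size()) - 1;
        return sorted.get(Math.max(0, Math.min(index, sorted.size() - 1)));
    }

    public static void main(String[] args) {
        AdaptiveBoundSketch bound = new AdaptiveBoundSketch(5_000L, 4, 0.5);
        System.out.println(bound.currentBoundMillis()); // warm-up: the initial bound
        for (long delay : new long[] {40L, 10L, 30L, 20L}) {
            bound.observe(delay);
        }
        System.out.println(bound.currentBoundMillis()); // now derived from the samples
    }
}
```

Sorting on every call and keeping every sample is only acceptable in a sketch; it does show why the state matters, since a restart without it falls back to the initial bound.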

On Sat, May 30, 2020 at 10:04 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Congxian,
>
> Thanks for your feedback. You raised a point I had also already thought
> about. As "assignTimestampsAndWatermarks" creates an operator extending the
> standard AbstractUdfStreamOperator, I can also implement a RichFunction
> watermark assigner with full state access. In my case, I was also wondering
> whether it's a good idea to have a stateful watermark assigner or whether
> it's more practical to start with no state and build my histogram over
> time, anew with each job restart... That's also why I asked on the mailing
> list, so I can get feedback from other people customizing watermark assigners.
>
> Best regards
> Theo
>
> --
> *From: *"Congxian Qiu" 
> *To: *"Theo Diefenthal" 
> *CC: *"user" 
> *Sent: *Saturday, May 30, 2020 05:06:12
> *Subject: *Re: Auto adjusting watermarks?
>
> Hi
> Would it work for your case to store histogram data in a custom
> `BoundedOutOfOrdernessTimestampExtractor`
> and adjust the `maxOutOfOrderness` according to that histogram data?
> (Be careful: such histogram data would not be snapshotted when
> checkpointing.)
>
> Best,
> Congxian
>
>
> On Sat, May 30, 2020 at 4:35 AM Theo Diefenthal wrote:
>
>> Hi there,
>>
>> Currently I have a job pipeline reading data from more than 10 different
>> kinds of sources, each with different out-of-orderness characteristics. I am
>> currently working on adjusting the watermarks for each source "properly". I
>> work with BoundedOutOfOrdernessTimestampExtractor and, as usual, I want the
>> maxOutOfOrderness as low as possible while still keeping as many elements
>> as possible in time, as late arrivals trigger rather expensive computations.
>>
>> Now I thought, what I probably want is something like "I want to have
>> about 99.9% of my elements within the allowed lateness". Of course, I don't
>> know the out-of-orderness of future events, but I can predict it from the
>> past, e.g. via a histogram with a 99.9% percentile, and adjust the
>> maxOutOfOrderness dynamically.
>>
>> As Flink provides only rather simple timestamp assigners but allows me
>> to create my own with arbitrary complexity, I was wondering whether
>> any of you has already done something like that, whether that's a viable
>> approach, and whether I'm on a good track here.
>>
>> Best regards
>> Theo
>>
>

-- 

Arvid Heise | Senior Java Developer





Re: Run command after Batch is finished

2020-06-05 Thread Jeff Zhang
You can try JobListener which you can register to ExecutionEnvironment.

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java


On Sat, Jun 6, 2020 at 12:00 AM Mark Davis wrote:

> Hi there,
>
> I am running a Batch job with several outputs.
> Is there a way to run some code(e.g. release a distributed lock) after all
> outputs are finished?
>
> Currently I do this in a try-finally block around
> ExecutionEnvironment.execute() call, but I have to switch to the detached
> execution mode - in this mode the finally block is never run.
>
> Thank you!
>
>   Mark
>


-- 
Best Regards

Jeff Zhang


Run command after Batch is finished

2020-06-05 Thread Mark Davis
Hi there,

I am running a Batch job with several outputs.
Is there a way to run some code(e.g. release a distributed lock) after all 
outputs are finished?

Currently I do this in a try-finally block around 
ExecutionEnvironment.execute() call, but I have to switch to the detached 
execution mode - in this mode the finally block is never run.

Thank you!

  Mark

Re: Re: Using Kafka's built-in timestamp as event time in Flink SQL

2020-06-05 Thread Jark Wu
As far as I know, no connector currently supports `timestampFromSource`...

On Fri, 5 Jun 2020 at 22:29, sunfulin  wrote:

> Thanks for the reply, Jark. It looks like adding 'timestamp.field' = 'timestamp'
> to the properties is what I need. I noticed that timestampFromSource can currently
> be obtained through Java code; can this feature get the source's timestamp? In my
> test the parsed value seemed to be empty. My Kafka version is 0.10.2.
>
> At 2020-06-05 19:31:37, "Jark Wu"  wrote:
> >Accessing all the data on a Kafka message (timestamp, partition, key, and so on)
> >is a very important feature, and the community recognized this early on.
> >There is already a FLIP [1] under discussion; support is expected in 1.12.
> >
> >Best,
> >Jark
> >
> >[1]:
> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> >
> >On Fri, 5 Jun 2020 at 19:19, sunfulin  wrote:
> >
> >> Hi,
> >> When creating a Kafka source with DDL in Flink SQL, is it possible to use
> >> Kafka's built-in timestamp? We have a scenario where we want to use the timestamp
> >> carried by Kafka, because the message stream itself may not contain a time attribute.
> >> If this is supported, could you share the exact syntax? The SQL I tried below fails with an error:
> >>
> >>
> >> CREATE TABLE user_behavior (
> >> test_time TIMESTAMP(3),
> >> user_id STRING ,
> >> item_id STRING ,
> >> category_id STRING ,
> >> behavior STRING,
> >> ts STRING,
> >> proctime as PROCTIME() -- computed column that generates a processing-time attribute
> >> ) WITH (
> >> 'connector.type' = 'kafka', -- use the kafka connector
> >> 'connector.version' = '0.10', -- kafka version; universal supports 0.11 and above
> >> 'connector.topic' = 'test', -- kafka topic
> >> 'connector.startup-mode' = 'latest-offset', -- start reading from the latest offset
> >> --'connector.properties.group.id' = 'mytest',
> >> 'connector.properties.zookeeper.connect' = '168.61.113.170:2181', -- zookeeper address
> >> 'connector.properties.bootstrap.servers' = '168.61.113.170:9092', -- kafka broker address
> >> 'format.type' = 'json' -- the source data format is json
> >> ,'schema.0.rowtime.timestamps.type' = 'from-source',
> >> 'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
> >> 'schema.0.rowtime.watermarks.delay' = '5000'
> >> )
> >>
> >>
> >> The exception is:
> >>
> >>
> >>  java.lang.UnsupportedOperationException: empty.max
> >>  at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
> >>  at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
> >>  at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
> >>  at
> >>
> org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
> >>  at
> >>
> org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
> >>  at scala.Option.map(Option.scala:146)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
> >>  at org.apache.flink.table.planner.plan.no
>


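Re: In Flink SQL, a comparison with a null value always evaluates to false

2020-06-05 Thread Benchao Li
Hi,

I only received the one email you sent the first time, so it looks like there is no problem. (Perhaps it is a local display issue in your mail client.)
As for your question, this is indeed how it is currently handled. May I ask what the problem is with adding an extra IS NOT NULL?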
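
For readers following along, the behavior under discussion comes from SQL's three-valued logic. The sketch below models it in plain Java under stated assumptions: a null Boolean stands in for UNKNOWN, and the method names are invented for illustration (this is not Flink code). It also models the standard null-safe predicate IS DISTINCT FROM, which Flink SQL lists among its comparison functions and which always yields TRUE or FALSE.

```java
import java.util.Objects;

public class ThreeValuedLogicSketch {

    /** Models SQL `a <> b`: the result is UNKNOWN (null here) if either side is null. */
    static Boolean notEqual(String a, String b) {
        if (a == null || b == null) {
            return null;
        }
        return !a.equals(b);
    }

    /** Models a WHERE clause: a row is kept only when the predicate is TRUE. */
    static boolean whereKeeps(Boolean predicate) {
        return Boolean.TRUE.equals(predicate);
    }

    /** Models SQL `a IS DISTINCT FROM b`: null-safe, never UNKNOWN. */
    static boolean isDistinctFrom(String a, String b) {
        return !Objects.equals(a, b);
    }

    public static void main(String[] args) {
        String product = null;
        // product <> 'rubber' is UNKNOWN for a null product, so WHERE drops the row.
        System.out.println(whereKeeps(notEqual(product, "rubber")));
        // product IS DISTINCT FROM 'rubber' is TRUE for a null product, so the row is kept.
        System.out.println(whereKeeps(isDistinctFrom(product, "rubber")));
    }
}
```

In SQL terms, the second case corresponds to writing WHERE product IS DISTINCT FROM 'rubber' instead of combining product IS NULL with product <> 'rubber'.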

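On Fri, Jun 5, 2020 at 9:54 PM whirly wrote:

> Sorry, I just noticed that my 163 mailbox automatically re-sent this question several times. I don't know what happened; it may be a mailbox bug. Apologies for the disturbance; I also don't know whether it will keep re-sending.
>
> At 2020-06-05 14:25:10, "whirly" wrote:
> >Hi all:
> >In Flink SQL, for a query such as SELECT * from order where  product <> 'rubber',
> >a row whose product field is null does not match the condition
> >product <> 'rubber', even though null is indeed not equal to 'rubber'.
> >It only matches if the condition is changed to  where product is Null or product <> 'rubber'.
> >But I want null <> 'rubber' to evaluate to True as well, without adding a
> >product is Null check in front of the condition. What can I do?
> >
> >
> >Thanks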
>


-- 

Best,
Benchao Li


Re:Re: Using Kafka's built-in timestamp as event time in Flink SQL

2020-06-05 Thread sunfulin

Thanks for the reply, Jark. It looks like adding 'timestamp.field' = 'timestamp'
to the properties is what I need. I noticed that timestampFromSource can currently
be obtained through Java code; can this feature get the source's timestamp? In my
test the parsed value seemed to be empty. My Kafka version is 0.10.2.

At 2020-06-05 19:31:37, "Jark Wu"  wrote:
>Accessing all the data on a Kafka message (timestamp, partition, key, and so on)
>is a very important feature, and the community recognized this early on.
>There is already a FLIP [1] under discussion; support is expected in 1.12.
>
>Best,
>Jark
>
>[1]:
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
>
>On Fri, 5 Jun 2020 at 19:19, sunfulin  wrote:
>
>> Hi,
>> When creating a Kafka source with DDL in Flink SQL, is it possible to use
>> Kafka's built-in timestamp? We have a scenario where we want to use the timestamp
>> carried by Kafka, because the message stream itself may not contain a time attribute.
>> If this is supported, could you share the exact syntax? The SQL I tried below fails with an error:
>>
>>
>> CREATE TABLE user_behavior (
>> test_time TIMESTAMP(3),
>> user_id STRING ,
>> item_id STRING ,
>> category_id STRING ,
>> behavior STRING,
>> ts STRING,
>> proctime as PROCTIME() -- computed column that generates a processing-time attribute
>> ) WITH (
>> 'connector.type' = 'kafka', -- use the kafka connector
>> 'connector.version' = '0.10', -- kafka version; universal supports 0.11 and above
>> 'connector.topic' = 'test', -- kafka topic
>> 'connector.startup-mode' = 'latest-offset', -- start reading from the latest offset
>> --'connector.properties.group.id' = 'mytest',
>> 'connector.properties.zookeeper.connect' = '168.61.113.170:2181', -- zookeeper address
>> 'connector.properties.bootstrap.servers' = '168.61.113.170:9092', -- kafka broker address
>> 'format.type' = 'json' -- the source data format is json
>> ,'schema.0.rowtime.timestamps.type' = 'from-source',
>> 'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
>> 'schema.0.rowtime.watermarks.delay' = '5000'
>> )
>>
>>
>> The exception is:
>>
>>
>>  java.lang.UnsupportedOperationException: empty.max
>>  at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
>>  at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
>>  at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
>>  at
>> org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
>>  at
>> org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
>>  at scala.Option.map(Option.scala:146)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>  at org.apache.flink.table.planner.plan.no


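Re:In Flink SQL, a comparison with a null value always evaluates to false

2020-06-05 Thread whirly
Sorry, I just noticed that my 163 mailbox automatically re-sent this question several times. I don't know what happened; it may be a mailbox bug. Apologies for the disturbance; I also don't know whether it will keep re-sending.

At 2020-06-05 14:25:10, "whirly" wrote:
>Hi all:
>In Flink SQL, for a query such as SELECT * from order where  product <> 'rubber',
>a row whose product field is null does not match the condition
>product <> 'rubber', even though null is indeed not equal to 'rubber'.
>It only matches if the condition is changed to  where product is Null or product <> 'rubber'.
>But I want null <> 'rubber' to evaluate to True as well, without adding a
>product is Null check in front of the condition. What can I do?
>
>
>Thanks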


Re: Creating TableSchema from the Avro Schema

2020-06-05 Thread Dawid Wysakowicz
First of all, to give some back story for the deprecation: we do not want to
depend on TypeInformation anymore for the types in the Table API, as it
binds the on-wire representation to the logical types of the SQL
API. The goal is to use DataType exclusively in the Table API
(including for the Schema). Moreover, we aim to make the SQL types
the source of truth. That's why it's usually the TableSchema that's being
translated into the Avro schema rather than the other direction.
Nevertheless, I do see a use for deriving a DDL statement from an Avro
schema. I created a ticket for
it: https://issues.apache.org/jira/browse/FLINK-18158

What you could do now is leverage
org.apache.flink.formats.avro.typeutils.AvroSchemaConverter#convertToSchema(org.apache.flink.table.types.logical.LogicalType)
from master to write the reverse operation, from Schema to DataType.
As an additional comment, the class AvroSchemaConverter is an
internal class without any guarantees for the stability of its methods.

Best

Dawid



On 04/06/2020 14:53, Ramana Uppala wrote:
> Hi,
>
> In Flink 1.9, we have the option to create the TableSchema from TypeInformation.
> We have used the code below.
>
> TypeInformation typeInfo = AvroSchemaConverter.convertToTypeInfo(schema);
>   TableSchema tableSchema = TableSchema.fromTypeInfo(typeInfo);
>
> However, TableSchema's fromTypeInfo method is deprecated in Flink 1.10. Is
> there any other utility to create a TableSchema from an Avro schema in Flink 1.10?





Re: [External Sender] Re: Flink sql nested elements

2020-06-05 Thread Ramana Uppala
Hi Leonard,

We are using Flink 1.10. I cannot share the complete schema, but
it looks like the following in the Hive catalog:

flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647),
`postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>

Based on the stack trace, the sqlUpdate API validates the SQL statement and
throws the above error. Do we need to set any Calcite
configuration to support nested types?

Thanks,
Ramana.

On Fri, Jun 5, 2020 at 12:49 AM Leonard Xu  wrote:

> Hi,Ramana
>
> For nested data types, Flink uses a dot (e.g. a.b.c) to access nested elements.
> Your SQL syntax looks right. Which Flink version are you using? And could
> you post your Avro schema file and DDL?
>
> Best,
> Leonard Xu
>
> > On Jun 5, 2020, at 03:34, Ramana Uppala wrote:
> >
> > We have Avro schema that contains nested structure and when querying
> using Flink SQL, we are getting below error.
> >
> > Exception in thread "main" java.lang.AssertionError
> >   at
> org.apache.calcite.sql.parser.SqlParserPos.sum_(SqlParserPos.java:236)
> >   at
> org.apache.calcite.sql.parser.SqlParserPos.sum(SqlParserPos.java:226)
> >   at
> org.apache.calcite.sql.SqlIdentifier.getComponent(SqlIdentifier.java:232)
> >   at
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:416)
> >   at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5733)
> >   at
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5718)
> >   at
> org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
> >
> > Example Schema:
> > ROW<`col1` VARCHAR(2147483647), `postalAddress` ROW<`addressLine1`
> VARCHAR(2147483647), `addressLine2` VARCHAR(2147483647), `addressLine3`
> VARCHAR(2147483647)>>
> >
> > Example SQL:
> > insert into CSVSink
> > select
> > col1,
> > postalAddress.addressLine1 as address
> > from myStream
> >
> > In Flink SQL, How to select nested elements ?
> >
>
>

__



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.





Re: [External Sender] Re: Avro Array type validation error

2020-06-05 Thread Ramana Uppala
Hi Dawid,

We are using a custom connector that is very similar to the Flink Kafka
connector, and we instantiate the TableSchema using a custom class that maps
Avro types to Flink's DataTypes using TableSchema.Builder.

For the Array type, we have the mapping below:

 case ARRAY:
     return DataTypes.ARRAY(toFlinkType(schema.getElementType()));


We are using the Hive catalog and creating tables using CatalogTableImpl with
a TableSchema.

As you mentioned, if we create the TableSchema with legacy types, our
connector works without any issues. But we want to use the new Flink
DataTypes API, and we are having issues with it.

One more observation: if we use legacy types when creating the TableSource,
the application does not work with the Blink planner. We get the
same error about the physical type not matching.

Looking forward to the 1.11 changes.


On Fri, Jun 5, 2020 at 3:34 AM Dawid Wysakowicz 
wrote:

> Hi Ramana,
>
> What connector do you use or how do you instantiate the TableSource?
> Also which catalog do you use and how do you register your table in that
> catalog?
>
> The problem is that conversion from TypeInformation to DataType produces
> legacy types (because they cannot be mapped exactly 1-1 to the new types).
>
> If you can change the code of the TableSource you can return in the
> TableSource#getProducedType the tableSchema.toRowDataType, where the
> tableSchema is the schema coming from catalog. Or you can make sure that
> the catalog table produces the legacy type:
>
> TableSchema.field("field", Types.OBJECT_ARRAY(Types.STRING));
>
> In 1.11 we will introduce new sources and formats already working
> entirely with the new type system (AvroRowDataDeserializationSchema and
> KafkaDynamicTable).
>
> Hope this helps a bit.
>
> Best,
>
> Dawid
>
> On 04/06/2020 13:43, Ramana Uppala wrote:
> > Hi,
> > The Avro schema contains an Array type; we created a TableSchema out
> of the Avro schema and created a table in the catalog. In the catalog, this
> specific field type is shown as ARRAY. We are using
> AvroRowDeserializationSchema with the connector, and the returnType of
> the TableSource shows Array mapped to LEGACY('ARRAY',
> 'ANY<[Ljava.lang.String;, by AvroSchemaConverter
> >
> > When we run the application, the planner validates physical types
> against logical types and we get the error below.
> >
> > of table field 'XYZ' does not match with the physical type ROW<
> >
> > Any suggestions on how to resolve this ? is this a bug ?
>
>






Re: Using Kafka's built-in timestamp as event time in Flink SQL

2020-06-05 Thread Jark Wu
Accessing all the data on a Kafka message (timestamp, partition, key, and so on)
is a very important feature, and the community recognized this early on.
There is already a FLIP [1] under discussion; support is expected in 1.12.

Best,
Jark

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records

On Fri, 5 Jun 2020 at 19:19, sunfulin  wrote:

> Hi,
> When creating a Kafka source with DDL in Flink SQL, is it possible to use
> Kafka's built-in timestamp? We have a scenario where we want to use the timestamp
> carried by Kafka, because the message stream itself may not contain a time attribute.
> If this is supported, could you share the exact syntax? The SQL I tried below fails with an error:
>
>
> CREATE TABLE user_behavior (
> test_time TIMESTAMP(3),
> user_id STRING ,
> item_id STRING ,
> category_id STRING ,
> behavior STRING,
> ts STRING,
> proctime as PROCTIME() -- computed column that generates a processing-time attribute
> ) WITH (
> 'connector.type' = 'kafka', -- use the kafka connector
> 'connector.version' = '0.10', -- kafka version; universal supports 0.11 and above
> 'connector.topic' = 'test', -- kafka topic
> 'connector.startup-mode' = 'latest-offset', -- start reading from the latest offset
> --'connector.properties.group.id' = 'mytest',
> 'connector.properties.zookeeper.connect' = '168.61.113.170:2181', -- zookeeper address
> 'connector.properties.bootstrap.servers' = '168.61.113.170:9092', -- kafka broker address
> 'format.type' = 'json' -- the source data format is json
> ,'schema.0.rowtime.timestamps.type' = 'from-source',
> 'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
> 'schema.0.rowtime.watermarks.delay' = '5000'
> )
>
>
> The exception is:
>
>
>  java.lang.UnsupportedOperationException: empty.max
>  at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
>  at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
>  at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
>  at scala.Option.map(Option.scala:146)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>  at org.apache.flink.table.planner.plan.no


Using Kafka's built-in timestamp as event time in Flink SQL

2020-06-05 Thread sunfulin
Hi,
When creating a Kafka source with DDL in Flink SQL, is it possible to use
Kafka's built-in timestamp? We have a scenario where we want to use the timestamp
carried by Kafka, because the message stream itself may not contain a time attribute.
If this is supported, could you share the exact syntax? The SQL I tried below fails with an error:


CREATE TABLE user_behavior (
test_time TIMESTAMP(3),
user_id STRING ,
item_id STRING ,
category_id STRING ,
behavior STRING,
ts STRING,
proctime as PROCTIME() -- computed column that generates a processing-time attribute
) WITH (
'connector.type' = 'kafka', -- use the kafka connector
'connector.version' = '0.10', -- kafka version; universal supports 0.11 and above
'connector.topic' = 'test', -- kafka topic
'connector.startup-mode' = 'latest-offset', -- start reading from the latest offset
--'connector.properties.group.id' = 'mytest',
'connector.properties.zookeeper.connect' = '168.61.113.170:2181', -- zookeeper address
'connector.properties.bootstrap.servers' = '168.61.113.170:9092', -- kafka broker address
'format.type' = 'json' -- the source data format is json
,'schema.0.rowtime.timestamps.type' = 'from-source',
'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
'schema.0.rowtime.watermarks.delay' = '5000'
)


The exception is:


 java.lang.UnsupportedOperationException: empty.max
 at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
 at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
 at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
 at 
org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
 at 
org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
 at scala.Option.map(Option.scala:146)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
 at org.apache.flink.table.planner.plan.no

UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Satyam Shekhar
Hello,

I am using Flink as the query engine to build an alerting/monitoring
application. One of the use cases in our product requires continuously
tracking and charting the output of an aggregate only SQL query,
for example, select sum(revenue) from lineorder. A desirable property from
the output of Flink job for such a query is that there is always exactly 1
row in the result set (or that the number of rows does not fall to 0 due to
retractions for previous output).  In other words, I need upsert "like"
semantics for the output of the query.

I was hopeful after reading the comments in UpsertStreamTableSink.java that
this condition is accounted for in the implementation. However, a pipeline
with the above query writing to a concrete UpsertStreamTableSink fails with the
following error: "UpsertStreamTableSink requires that Table has a
full primary keys if it is updated." Here are the relevant comments from
UpsertStreamTableSink.java for reference:

```
Configures the unique key fields of the {@link Table} to write. The method
is called after {@link TableSink#configure(String[], TypeInformation[])}.

The keys array might be empty, if the table consists of a single
(updated) record. If the table does not have a key and is append-only, the
keys attribute is null.

@param keys the field names of the table's keys, an empty array if the
table has a single row, and null if the table is append-only and has no key.
void setKeyFields(String[] keys);
```

The code in StreamExec(Legacy)Sink.scala appears consistent with the observed
failure and does not match the comment about "empty key array if the table
consists of a single record".

 With that context, I have the following questions -

1. Is the UpsertStreamTableSink expected to consume the output of such
aggregate only queries? Or is my interpretation of the code and comment
wrong and I have misconfigured UpsertStreamTableSink?
2. If the answer to (1) is no, are there any recommended patterns for
solving this use-case such that the client never observes an empty result
set for the output of this query?

Regards,
Satyam


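Re: Re: Integrating Flink with HBase

2020-06-05 Thread xueaohui_...@163.com

There is a native one; someone else has already answered that as well.
Does connecting directly from Java also fail?



xueaohui_...@163.com

From: liunaihua521
Sent: 2020-06-05 09:31
To: user-zh@flink.apache.org
Cc: user-zh@flink.apache.org
Subject: Re: Re: Integrating Flink with HBase
hi,
The sink is my own. I have not found a native HBase sink in Flink yet; I am still looking into it.


On June 5, 2020 at 09:06, xueaohui_...@163.com wrote:
Is the sink your own or the official one?



xueaohui_...@163.com

From: liunaihua521
Sent: 2020-06-05 00:27
To: user-zh@flink.apache.org
Subject: Integrating Flink with HBase
hi
I would like to integrate Flink 1.10 with HBase. Which versions of Hadoop and HBase are required? Has anyone got this working, and could you share your configuration files? I am testing locally: all clusters start normally, but every job I submit (Flink reading from and writing to HBase) fails because certain classes cannot be found. Could this be a version problem?
Thanks in advance!

liunaihua521
Email: liunaihua...@163.com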


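Re: Flink 1.9 SQL: is state not stored automatically for a registered intermediate temporary table?

2020-06-05 Thread Benchao Li
Hi,

From your description, we may have run into a similar problem.

In our case, after restoring from a checkpoint, some keys no longer matched the keys in the previous state, so it looked as if part of the state had been lost.

However, we never found the root cause. On the one hand, the problem is hard to reproduce: with production data only some records were affected,
and we could not see any pattern in the affected records. On the other hand, the Blink planner uses binary data structures under the hood, which makes
debugging harder.

If you can provide a reasonably stable dataset and test procedure that reproduces it, I think we can push this further toward a fix.

star <3149768...@qq.com> wrote on Fri, Jun 5, 2020 at 4:02 PM: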

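> Has anyone run into a similar problem?
>
> -- Original message --
> From: "star" <3149768...@qq.com>
> Sent: Friday, June 5, 2020, 10:40 AM
> To: "user-zh@flink.apache.org"
> Subject: Re: Flink 1.9 SQL: is state not stored automatically for a registered intermediate temporary table?
>
> No window is used, so the state should not be cleaned up, yet the results suggest that it was.
>
> -- Original message --
> From: "zhiyezou" <1530130...@qq.com>
> Sent: Friday, June 5, 2020, 10:31 AM
> To: "user-zh"
> Subject: Re: Flink 1.9 SQL: is state not stored automatically for a registered intermediate temporary table?
>
> Hi
>
> If no window is used, state is not cleaned up automatically; you need to configure a state TTL yourself.
>
> -- Original message --
> From: "star" <3149768...@qq.com>
> Sent: Friday, June 5, 2020, 10:22 AM
> To: "user-zh@flink.apache.org"
> Subject: Flink 1.9 SQL: is state not stored automatically for a registered intermediate temporary table?
>
> Hello everyone, I am using the Blink planner in Flink 1.9.
>
> First I deduplicate id by month and city, register the result as a temporary table (the monthly table), and write the result in real time to the HBase table monthtable; the rowkey is month+city.
>
> Then, based on that table, I aggregate by city and write to the HBase table totalTable; the rowkey is city.
>
> The job ran for nearly 18 hours, with restores and checkpoint failures along the way. I then compared
> the summed cnt of the two tables and found that they differ: totalTable is smaller than monthtable, and by a considerable margin. What could be the cause?
>
> Pseudocode follows: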
>
>
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> val bsSettings =
>   EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val tableEnv = StreamTableEnvironment.create(env, bsSettings)
>
> val myDataStream = ...  // source definition elided in the original
>
> tableEnv.registerDataStream("monthtable", myDataStream, 'month, 'city, 'id)
>
> // Count distinct ids by month and city (ids are tied to a city; ids do not repeat within a city)
> val monthCount = tableEnv.sqlQuery(
>   s"""
>   select month,city,count(distinct id) as cnt from monthtable group by
>   month,city
>   """.stripMargin)
>
> // Write the monthly counts to HBase; the rowkey is month+city
> monthCount.toRetractStream[Row].filter(_._1).map(line => {
>   val row = line._2
>   val month = row.getField(0).toString
>   val city = row.getField(1).toString
>   val cnt = row.getField(2).toString
>   val map = new util.HashMap[String, String]()
>   map.put("cnt", cnt)
>   (month + city, map)  // month+city is the rowkey, cnt is a column
> }).addSink(new MyHbaseSink("monthHbaseTable"))
>
> // Register the monthly result above as a new table, monthStat
> tableEnv.registerTable("monthStat", monthCount)
>
> // Sum the id counts by city
> val totalCount = tableEnv.sqlQuery(
>   s"""
>   select city,sum(cnt) as cityCnt from monthStat group by city
>   """.stripMargin)
>
> // Write the per-city totals to HBase; the rowkey is city
> totalCount.toRetractStream[Row].filter(_._1).map(line => {
>   val row = line._2
>   val city = row.getField(0).toString
>   val totalCnt = row.getField(1).toString
>   val map = new util.HashMap[String, String]()
>   map.put("totalCnt", totalCnt)
>   (city, map)
> }).addSink(new MyHbaseSink("totalHbaseTable"))



-- 

Best,
Benchao Li


Re: FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false ?

2020-06-05 Thread Vijay Balakrishnan
Hi,
Resolved the issue by using a Custom Partitioner and setting RequestTimeout
properties.

kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner());

// The type parameters were lost in the archive; Map<String, Object> is assumed here.
private static final class SerializableCustomPartitioner
        extends KinesisPartitioner<Map<String, Object>> {

    private static final long serialVersionUID = -5196071893997035695L;

    // Return a fresh random partition key for every record.
    @Override
    public String getPartitionId(Map<String, Object> map) {
        return UUID.randomUUID().toString();
    }
}
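
Why a random partition key helps can be sketched in plain Java. Kinesis Data Streams maps each record to a shard by taking the MD5 hash of its partition key as a 128-bit integer and picking the shard whose hash key range contains that value, so a constant key always lands on the same shard. The class and method names below are illustrative only, and the even split of the hash space across shards is an assumption (real streams can have uneven ranges after resharding).

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class ShardRoutingSketch {

    // Size of the 128-bit hash key space.
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    /** Maps a partition key to a shard index, assuming shardCount shards that split the hash space evenly. */
    static int shardFor(String partitionKey, int shardCount) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            BigInteger hashKey = new BigInteger(1, digest); // unsigned 128-bit value
            return hashKey.multiply(BigInteger.valueOf(shardCount)).divide(HASH_SPACE).intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should be available on every Java platform", e);
        }
    }

    /** Counts how many different shards receive records, with either random keys or the constant key "0". */
    static int distinctShards(boolean randomKeys, int records, int shardCount) {
        Set<Integer> shards = new HashSet<>();
        for (int i = 0; i < records; i++) {
            String key = randomKeys ? UUID.randomUUID().toString() : "0";
            shards.add(shardFor(key, shardCount));
        }
        return shards.size();
    }

    public static void main(String[] args) {
        System.out.println("constant key: " + distinctShards(false, 1000, 4) + " shard(s) used");
        System.out.println("random keys:  " + distinctShards(true, 1000, 4) + " shard(s) used");
    }
}
```

The trade-off of a random key is that records for the same logical entity no longer go to the same shard, so per-key ordering is lost; a key derived from a field of the record would keep related records together while still spreading load.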


On Thu, Jun 4, 2020 at 6:43 PM Vijay Balakrishnan 
wrote:

> Hi,
> Looks like I am sending a Map to Kinesis and it is being
> sent to 1 partition only. *How can I make this distribute across multiple
> partitions/shards on the Kinesis Data stream with this Map*
> data ?
>
> *Sending to Kinesis*:
> DataStream> influxToMapKinesisStream =
> enrichedMGStream.map(influxDBPoint -> {
> return new
> MonitoringGroupingToInfluxDBPoint(agg,
> groupBySetArr).fromInfluxDBPoint(influxDBPoint);
> }).returns(new TypeHint>()
> {
> }).setParallelism(dfltParallelism);
>
> FlinkKinesisProducer>
> kinesisProducer = getMonitoringFlinkKinesisProducer(kinesisTopicWrite,
> region, local, localKinesis);
>
> influxToMapKinesisStream.addSink(kinesisProducer).setParallelism(dfltParallelism);
>
> *Map used to send to Kinesis:*
>
> Map mapObj = new HashMap<>();
> mapObj.put(Utils.EVENT_TIMESTAMP, influxDBPoint.getTimestamp());
> mapObj.put(Utils.MEASUREMENT, influxDBPoint.getMeasurement());
> mapObj.put(Utils.TAGS, influxDBPoint.getTags());
> mapObj.put(Utils.FIELDS, influxDBPoint.getFields());
>
> TIA,
>
> On Thu, Jun 4, 2020 at 5:35 PM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> My FlinkKinesisProducer sends data to only 1 shard. Is it because I don't
>> have "AggregationEnabled" set to false ?
>>
>> flink_connector_kinesis_2.11 : flink version 1.9.1
>>
>> //Setup Kinesis Producer
>> Properties kinesisProducerConfig = new Properties();
>> kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION,
>> region);
>>
>> kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
>> "AUTO");
>> //kinesisProducerConfig.setProperty("AggregationEnabled",
>> "false");
>>
>> FlinkKinesisProducer> kinesisProducer = new
>> FlinkKinesisProducer<>(
>> new MonitoringMapKinesisSchema(localKinesis),
>> kinesisProducerConfig);
>>
>> //TODO: kinesisProducer.setFailOnError(true);
>> kinesisProducer.setDefaultStream(kinesisTopicWrite);
>> kinesisProducer.setDefaultPartition("0");//TODO: why from start ?
>> return kinesisProducer;
>>
>> TIA,
>>
>


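Re: Some questions raised by the checkpoint directory path

2020-06-05 Thread Px New
Thanks for the reply. I now understand the details of state restore, as well as how the other files are produced and what they are for.

Weihua Hu wrote on Fri, Jun 5, 2020 at 1:48 PM:

> Hi, Px New
>
> 1. The number of retained checkpoints is controlled by the parameter state.checkpoints.num-retained; the default is 1.
> 2. _metadata contains only metadata: it stores the handles to the state. The other files are the state data, uploaded by each Task
> when a checkpoint is triggered. Conversely, when restoring from a checkpoint, the JM reads _metadata and sends the corresponding handles to the Tasks, and each Task pulls its
> state from the remote HDFS.
>
>
> Best
> Weihua Hu
>
> > On June 5, 2020 at 13:36, Px New <15701181132mr@gmail.com> wrote:
> >
> > Hi everyone, I have a question about checkpoints:
> > 1. The state backend used in my project is FsStateBackend.
> > 2. After finding the job ID in the JobManager log output, I went to HDFS and found the corresponding chk directory.
> > 3. I have two questions:
> > 3.1 If the number of retained chk is not set, how many are kept by default? (I see close to 20 chk retained here.)
> > 3.2 When I open a specific chk-id, I see many files [see figure 2]. I understand that after a failure the task downloads _metadata from HDFS
> > to restore the job, but how are the other files produced, and what are they for?
> > Looking forward to your reply.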


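Re: Some questions raised by the checkpoint directory path

2020-06-05 Thread Px New
Oh, my apologies for that:
Figure 1:  https://i.loli.net/2020/06/05/SAfpnkqlOUM9hD3.png
Figure 2:
https://imgkr.cn-bj.ufileos.com/aed4cb64-dd24-4076-ba4c-a0e07bc356bf.png

zhiyezou <1530130...@qq.com> wrote on Fri, Jun 5, 2020 at 1:58 PM:

> Hi
> Please use a third-party image host and send the image links; images pasted directly into the mail do not show up.
>
> -- Original message --
> From: "Weihua Hu"
> Sent: Friday, June 5, 2020, 1:48 PM
> To: "user-zh"
> Subject: Re: Some questions raised by the checkpoint directory path
>
> Hi, Px New
>
> 1. The number of retained checkpoints is controlled by the parameter state.checkpoints.num-retained; the default is 1.
> 2. _metadata contains only metadata: it stores the handles to the state. The other files are the state data, uploaded by each Task
> when a checkpoint is triggered. Conversely, when restoring from a checkpoint, the JM reads _metadata and sends the corresponding handles to the Tasks, and each Task pulls its
> state from the remote HDFS.
>
>
> Best
> Weihua Hu
>
> > On June 5, 2020 at 13:36, Px New <15701181132mr@gmail.com> wrote:
> >
> > Hi everyone, I have a question about checkpoints:
> > 1. The state backend used in my project is FsStateBackend.
> > 2. After finding the job ID in the JobManager log output, I went to HDFS and found the corresponding chk directory.
> > 3. I have two questions:
> > 3.1 If the number of retained chk is not set, how many are kept by default? (I see close to 20 chk retained here.)
> > 3.2 When I open a specific chk-id, I see many files [see figure 2]. I understand that after a failure the task downloads _metadata from HDFS
> > to restore the job, but how are the other files produced, and what are they for?
> > Looking forward to your reply.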


FlinkKafkaProducer transactions: report of a transactionId problem

2020-06-05 Thread 李杰
*1. Scenario:*

  Flink job logic: source (Kafka) -> data processing (word-count logic) -> sink (Kafka)

  1. Job A:
 source_topic: word_count_topic
 sink_topic: result_01
 group_id: test-group01

  2. Job B:
 source_topic: word_count_topic
 sink_topic: result_02
 group_id: test-group02

3. The two jobs use the same jar and the same code; only group.id and sink_topic differ.

4. FlinkKafkaProducer uses EXACTLY_ONCE semantics and writes to the topic using Kafka transactions.

*Symptom:*
  The error logs below show that the transaction ids of the two jobs interfere with each other.

 *JobManager-side error log:*
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Producer attempted an operation with an old epoch.
Either there is a newer producer with the same transactionalId, or the
producer's transaction has been expired by the broker.
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1227)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:741)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:90)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:231)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:64)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer
with the same transactionalId, or the producer's transaction has been
expired by the broker.

*TaskManager-side error log:*
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted
an operation with an old epoch. Either there is a newer producer with the
same transactionalId, or the producer's transaction has been expired by the
broker.

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Producer attempted an operation with an old epoch.
Either there is a newer producer with the same transactionalId, or the
producer's transaction has been expired by the broker.
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1227)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:837)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:605)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:504)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer
with the same transactionalId, or the producer's transaction has been
expired by the broker.
2020-06-04 16:48:36,817 INFO  org.apache.flink.runtime.taskmanager.Task
- pend-name -> (Sink: pnt-name, Sink: sink-name) (1/1)
(db4ee0c44888e866b3d26d39b34a0bd8) switched from RUNNING to FAILED.
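
One way such a clash could arise is sketched below in plain Java. It assumes that transactional ids are derived only from values that are identical in both jobs (task name, operator id, subtask index); the derivation shown is a hypothetical scheme for illustration, not code taken from Flink, and the thread itself does not confirm the exact formula.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TransactionalIdSketch {

    /**
     * Hypothetical id scheme: every id depends only on the task name,
     * the operator id, and the subtask index, never on the job itself.
     */
    static List<String> idsFor(String taskName, String operatorId, int subtaskIndex, int poolSize) {
        List<String> ids = new ArrayList<>();
        String prefix = taskName + "-" + operatorId;
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (subtaskIndex * poolSize + i));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Two jobs built from the same jar have the same task name and operator id.
        List<String> jobA = idsFor("Sink: sink-name", "op-1", 0, 5);
        List<String> jobB = idsFor("Sink: sink-name", "op-1", 0, 5);
        System.out.println("same ids in both jobs: " + jobA.equals(jobB));

        // Giving the sink a job-specific name makes the id sets disjoint.
        List<String> jobBRenamed = idsFor("Sink: sink-name-job-b", "op-1", 0, 5);
        System.out.println("disjoint after renaming: " + Collections.disjoint(jobA, jobBRenamed));
    }
}
```

Under this assumption, whichever producer initializes a given transactional id last fences the other one, which would match the ProducerFencedException in the logs.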



*2. Preliminary analysis*

*1. Description of the symptom*
  

Re: Flink 1.9 SQL: is state not stored automatically for a registered intermediate temporary table?

2020-06-05 Thread star

-- Original message --
From: "star" <3149768...@qq.com>
Sent: Friday, June 5, 2020, 10:40 AM
To: "user-zh@flink.apache.org"

Re: Avro Array type validation error

2020-06-05 Thread Dawid Wysakowicz
Hi Ramana,

What connector do you use or how do you instantiate the TableSource?
Also which catalog do you use and how do you register your table in that
catalog?

The problem is that the conversion from TypeInformation to DataType produces
legacy types (because they cannot be mapped exactly 1-1 to the new types).

If you can change the code of the TableSource you can return in the
TableSource#getProducedType the tableSchema.toRowDataType, where the
tableSchema is the schema coming from catalog. Or you can make sure that
the catalog table produces the legacy type:

TableSchema.field("field", Types.OBJECT_ARRAY(Types.STRING));

In 1.11 we will introduce new sources and formats already working
entirely with the new type system (AvroRowDataDeserializationSchema and
KafkaDynamicTable).

Hope this helps a bit.

Best,

Dawid

On 04/06/2020 13:43, Ramana Uppala wrote:
> Hi,
> The Avro schema contains an Array type. We created a TableSchema from the 
> Avro schema and created a table in the catalog. In the catalog, this specific 
> field type is shown as ARRAY. We are using 
> AvroRowDeserializationSchema with the connector, and the return type of the TableSource 
> shows the array mapped to LEGACY('ARRAY', 'ANY<[Ljava.lang.String;, by 
> AvroSchemaConverter
>
> When we run the application, the planner validates the physical types against the 
> logical types and we get the error below.
>
> of table field 'XYZ' does not match with the physical type ROW<
>
> Any suggestions on how to resolve this ? is this a bug ?





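Flink 1.10 SQL: asking for help with an error

2020-06-05 Thread hb
A Flink SQL job initially ran normally, checkpoints were fine too, and there is no large state (the state is 19.7 KB).
After running for a while it started failing; it then recovers, runs for a few seconds, and immediately hits the same error again, cycling between failure and recovery.
Could someone take a look? Much appreciated.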


2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task - 
Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> 
SourceConversion(table=[default_catalog.default_database.user_visit_trace, 
source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], 
fields=[userId, utp, utrp, extendFields, requestTime]) -> 
Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], 
where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') > 
_UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT 
NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], 
joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id IS 
NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> 
Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) 
FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT 
_UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' 
CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> 
SinkConversionToTuple2 -> Sink: Unnamed (1/8) 
(ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED.
java.lang.Exception: Could not perform checkpoint 401 for operator Source: 
KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> 
SourceConversion(table=[default_catalog.default_database.user_visit_trace, 
source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], 
fields=[userId, utp, utrp, extendFields, requestTime]) -> 
Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], 
where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND 
(((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') > 
_UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT 
NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], 
joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id IS 
NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> 
Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) 
FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT 
_UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' 
CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> 
SinkConversionToTuple2 -> Sink: Unnamed (1/8).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown
 Source)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown
 Source)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
... 12 more
2020-06-05 14:13:19,795 INFO org.apache.flink.runtime.taskmanager.Task - 
Attempting to fail task externally Source: KafkaTableSource(userId, utp, utrp, 
extendFields, requestTime) -> 
SourceConversion(table=[default_catalog.default_database.user_visit_trace, 
source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], 
fields=[userId, utp, utrp, extendFields, requestTime]) -> 

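Re: Can a TTL be set in Flink SQL DDL?

2020-06-05 Thread Leonard Xu
An update on the second question: the Flink sql-client does support configuring the TTL (in ms) via SET:

Flink SQL> set execution.min-idle-state-retention = 2;
[INFO] Session property has been set.

Best regards,
Leonard Xu

> On June 5, 2020 at 13:39, Leonard Xu wrote:
> 
> Hi,
> 
> On the first question: the upcoming 1.11 lets you declare a primary key, so it no longer has to be inferred, 
> and there is no restriction on the non-pk fields. For now you can use functions such as first_value and last_value to select the other, non-group-by fields. 
> The JDBC documentation is being written [1].
> On the second question: setting the TTL via SET in DDL is probably not supported yet.
> 
> Best,
> Leonard Xu
> [1] https://issues.apache.org/jira/browse/FLINK-17829 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html
> 
>> On June 4, 2020 at 09:11, xu yihan <mytar...@126.com> wrote:
>> 
>> Hello everyone,
>> 
>> I need to do a dimension-table join against MySQL data and then upsert the result into MySQL; it is a simple field-enrichment requirement.
>> 
>> I currently have two problems:
>> 1. To get upsert writes, I found that I have to specify the key with GROUP BY, but the SELECT also contains many non-aggregated fields, so I am forced to add all of those non-aggregated fields to the GROUP BY; otherwise the query fails Calcite
>>  validation.
>> 
>> 2. I have now hit a problem where, after running for a while, the job reports full GC; I suspect the state created by GROUP BY is never cleaned up.
>> Can Flink SQL DDL set a TTL through something like a SET statement? Searching the official docs, I only found setIdleStateRetentionTime mentioned for the Table 
>> API.
>> 
>> Thanks for your advice.
>> 
> 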



Flink 1.10.0 standalone mode: Kafka Consumer InstanceAlreadyExistsException

2020-06-05 Thread zhang...@lakala.com
Flink 1.10.0, standalone deployment
Kafka-0.11
JDK8

When I deploy two applications on the same Flink cluster and both are assigned to the same TaskManager, consuming the same topic with different group.id values, the first application starts normally, but the second logs the following warning on startup. What causes this warning? Can it be ignored, and if not, how can it be resolved?

WARN  org.apache.kafka.common.utils.AppInfoParser - Error registering 
AppInfo mbean
javax.management.InstanceAlreadyExistsException: 
kafka.consumer:type=app-info,id=consumer-5
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:757)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:502)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:181)
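
The mechanism behind this warning can be reproduced without Kafka. A JVM-wide MBean server rejects a second registration under an ObjectName that is already taken; in the trace above the clashing name is kafka.consumer:type=app-info,id=consumer-5, which suggests that two consumers in the same TaskManager JVM ended up with the same client id. The sketch below uses the JDK's javax.management.timer.Timer as a stand-in MBean and a made-up demo.consumer domain; it illustrates the mechanism only and is not Kafka's own code.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.timer.Timer;

public class MBeanClashDemo {

    /** Registers a stand-in MBean under the given name; returns false if the name is already taken. */
    static boolean register(MBeanServer server, String name) {
        try {
            server.registerMBean(new Timer(), new ObjectName(name));
            return true;
        } catch (InstanceAlreadyExistsException e) {
            // Same exception type as in the warning above.
            return false;
        } catch (JMException e) {
            throw new IllegalArgumentException("unexpected JMX failure for " + name, e);
        }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        String name = "demo.consumer:type=app-info,id=consumer-5"; // hypothetical name

        System.out.println("first registration succeeded:  " + register(server, name));
        System.out.println("second registration succeeded: " + register(server, name));

        server.unregisterMBean(new ObjectName(name)); // clean up
    }
}
```

If a shared client id is indeed the cause, giving each job its own client.id in the consumer properties would be one thing to try; the failed registration affects only the JMX app-info bean.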




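In Flink SQL, comparisons with a null value always evaluate to false

2020-06-05 Thread whirly
Hello everyone:
In Flink SQL, take a query such as SELECT * from order where  product <> 'rubber'. If the 
product field in a record is null, the record does not match the condition product <> 'rubber', even though null is indeed not equal to 'rubber'.
It matches only if the condition is changed to  where product is Null or product <> 'rubber'.
However, I would like null <> 'rubber' to evaluate to True as well, without having to put a product is Null check in front of the condition. How can I achieve that?


Thanks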
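
The behavior described above follows SQL three-valued logic, which can be sketched in plain Java. Here a null Boolean stands for UNKNOWN; the class and method names are illustrative only. The sketch contrasts <> with the null-safe IS DISTINCT FROM comparison, which the replies in this thread point to as the fitting operator.

```java
import java.util.Objects;

public class NullComparisonSketch {

    /** SQL-style <>: the result is UNKNOWN (null here) when either operand is null. */
    static Boolean notEquals(String a, String b) {
        if (a == null || b == null) {
            return null;
        }
        return !a.equals(b);
    }

    /** A WHERE clause keeps a row only when the predicate is TRUE; UNKNOWN is treated like FALSE. */
    static boolean whereKeeps(Boolean predicate) {
        return Boolean.TRUE.equals(predicate);
    }

    /** IS DISTINCT FROM: null-safe inequality that is always true or false, never UNKNOWN. */
    static boolean isDistinctFrom(String a, String b) {
        return !Objects.equals(a, b);
    }

    public static void main(String[] args) {
        String product = null;
        System.out.println("product <> 'rubber' keeps row: "
                + whereKeeps(notEquals(product, "rubber")));
        System.out.println("product IS DISTINCT FROM 'rubber' keeps row: "
                + isDistinctFrom(product, "rubber"));
    }
}
```

In SQL terms, the second form corresponds to writing where product IS DISTINCT FROM 'rubber', which avoids adding a separate product is Null check for every column.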