Is there a way to convert the Flink Row class to Object?

2020-11-10 Thread ZT.Ren
While doing secondary development on top of Flink, I need to print Flink SQL execution results into a session, and the session's result-printing flow is fixed to printing List-typed data.
Most query engines (Presto, for example) provide a getObject method on ResultSet; how can the same thing be achieved in Flink?
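
For reference, a minimal sketch (assuming the results arrive as org.apache.flink.types.Row, for example from TableResult#collect) of flattening each Row into a List of Objects, much like ResultSet#getObject in other engines; Row#getField(int) already returns Object:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.types.Row;

public final class RowToObjects {

    // Flatten one result Row into a List<Object>, one entry per column.
    public static List<Object> toObjectList(Row row) {
        List<Object> values = new ArrayList<>(row.getArity());
        for (int i = 0; i < row.getArity(); i++) {
            values.add(row.getField(i)); // getField(int) returns Object
        }
        return values;
    }
}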

Re: Checkpoint growth

2020-11-10 Thread Akshay Aggarwal
Hi Rex,

As per my understanding there are multiple levels of compactions (with
RocksDB), and files which are not compacted recently would remain in older
checkpoint directories, and there will be references to those files in the
current checkpoint. There is no clear way of identifying these references
and clearing older checkpoint directories.

What we do instead to avoid ever increasing checkpoint directory size is to
stop the job with a savepoint, clear the checkpoints directory and start
the job from the savepoint periodically.

Thanks,
Akshay Aggarwal

On Tue, Nov 10, 2020 at 10:55 PM Rex Fenley  wrote:

> Hello,
>
> I'm reading the docs/blog on incremental checkpoints and it says:
>
> >You can also no longer delete old checkpoints as newer checkpoints need
> them, and the history of differences between checkpoints can grow
> indefinitely over time. You need to plan for larger distributed storage to
> maintain the checkpoints and the network overhead to read from it.
> source:
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>
> I'm wondering why this would be true though. It says earlier that
> incremental checkpoints compact so why would the history grow indefinitely?
>
> Thanks!
>



Re: Questions on troubleshooting a backpressure problem

2020-11-10 Thread 赵一旦
But a, b and c show backpressure while d, e and f are normal. That means the performance pressure is on d, e, f.

restart  wrote on Wed, Nov 11, 2020 at 2:09 PM:

>
> I have checked d, e and f: their backpressure status is normal and their metrics (outPoolUsage,
> inPoolUsage, floatingBuffersUsage, exclusiveBuffersUsage) are all 0. If it were what you describe,
> c's outPoolUsage should be 1 for things to add up, but what I observe is that c's metrics look
> normal while the backpressure is still there.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Relationship between the number of slots and the parallelism

2020-11-10 Thread hl9...@126.com
Hi all,
I submitted a job from the Flink web UI with a parallelism of 15, while the cluster has 12 slots in total. The tasks stayed in the CREATED state, and after a while the job failed with:
Caused by: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate the required slot within slot request timeout. 
Please make sure that the cluster has enough resources.

Is this because the number of slots must be at least as large as the parallelism? Is there a parameter that lets Flink automatically use the number of available slots as the job's parallelism?



hl9...@126.com
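
For illustration, a minimal sketch (assuming the DataStream API; the -p flag on the CLI and parallelism.default in flink-conf.yaml set the same value) of where the job-wide parallelism comes from; it has to fit into #TaskManagers x taskmanager.numberOfTaskSlots, otherwise slot requests eventually fail with the NoResourceAvailableException shown above:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // With 12 slots in the cluster, the job-wide default parallelism
        // must be at most 12 for all tasks to be scheduled.
        env.setParallelism(12);

        env.fromElements(1, 2, 3).print();

        env.execute("parallelism-demo");
    }
}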


Checkpoint growth

2020-11-10 Thread Rex Fenley
Hello,

I'm reading the docs/blog on incremental checkpoints and it says:

>You can also no longer delete old checkpoints as newer checkpoints need
them, and the history of differences between checkpoints can grow
indefinitely over time. You need to plan for larger distributed storage to
maintain the checkpoints and the network overhead to read from it.
source:
https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html

I'm wondering why this would be true though. It says earlier that
incremental checkpoints compact so why would the history grow indefinitely?

Thanks!




CREATE TABLE LIKE clause from different catalog or database

2020-11-10 Thread Dongwon Kim
Hi,

Is it disallowed to refer to a table from different databases or catalogs
when someone creates a table?

According to [1], there's no way to refer to tables belonging to different
databases or catalogs.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

Best,

Dongwon


Re: What can the Stream API do that the Table API cannot?

2020-11-10 Thread hailongwang
Hi LittleFall,
This is hard to compare in the abstract; it really depends on your requirements.
You can think of the Table API as shipping many standard built-in operators, such as join and unionAll, which saves you the cost of implementing them yourself.
The DataStream API is more flexible, but all of the processing logic has to be written by you.
If your use case needs direct access to state or timers, then you need the DataStream API.
I hope this comparison helps.


Best ,
hailong
On 2020-11-10 13:42:39, "LittleFall" <1578166...@qq.com> wrote:
>As the subject says; any guidance from the experts here would be appreciated.
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Upsert UDFs

2020-11-10 Thread Rex Fenley
Thanks! We did give that a shot and ran into the bug that I reported here
https://issues.apache.org/jira/browse/FLINK-20036 .

I'm also seeing this function

  public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL

and it says it's more performant in some cases vs

  public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL

. I'm having some trouble understanding in which cases it benefits
performance and if it would help our case. Would using
`emitUpdateWithRetract` instead of `emitValue` reduce the number of
retracts we're seeing yet preserve the same end results, where our
Elasticsearch documents stay up to date?
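
For context, a hedged sketch (loosely adapted from the table aggregate function pattern in the Flink docs, not code from this pipeline) of what the two emit methods look like. The intended difference is that emitValue re-emits the full result on every update, so downstream sees a retract of the old row plus an insert of the new one, while emitUpdateWithRetract lets the function retract and re-emit only the records that actually changed:

import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

// Simplified "current maximum" table aggregate, for illustration only.
public class Max1 extends TableAggregateFunction<Long, Max1.Acc> {

    public static class Acc {
        public Long max = null;     // current maximum
        public Long oldMax = null;  // last value emitted downstream
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, Long value) {
        if (acc.max == null || value > acc.max) {
            acc.max = value;
        }
    }

    // Variant 1: re-emit the whole result on every update.
    public void emitValue(Acc acc, Collector<Long> out) {
        if (acc.max != null) {
            out.collect(acc.max);
        }
    }

    // Variant 2: only retract/emit when the result actually changed.
    public void emitUpdateWithRetract(Acc acc, RetractableCollector<Long> out) {
        if (acc.max != null && !acc.max.equals(acc.oldMax)) {
            if (acc.oldMax != null) {
                out.retract(acc.oldMax); // retract only the stale value
            }
            out.collect(acc.max);
            acc.oldMax = acc.max;
        }
    }
}

Whether the planner actually calls emitUpdateWithRetract, and whether that removes the delete + insert pattern on the Elasticsearch side, depends on the planner and connector in use, so treat this purely as an illustration of the contract.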

On Sun, Nov 8, 2020 at 6:43 PM Jark Wu  wrote:

> Hi Rex,
>
> There is a similar question asked recently which I think is the same
> reason [1] called retraction amplification.
> You can try to turn on the mini-batch optimization to reduce the
> retraction amplification.
>
> Best,
> Jark
>
> [1]:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/A-question-about-flink-sql-retreact-stream-td39216.html
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>
> On Fri, 6 Nov 2020 at 03:56, Rex Fenley  wrote:
>
>> Also, just to be clear our ES connector looks like this:
>>
>> CREATE TABLE sink_es_groups (
>> id BIGINT,
>> //.. a bunch of scalar fields
>> array_of_ids ARRAY,
>> PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>> 'connector' = 'elasticsearch-7',
>> 'hosts' = '${env:ELASTICSEARCH_HOSTS}',
>> 'index' = '${env:GROUPS_ES_INDEX}',
>> 'format' = 'json',
>> 'sink.bulk-flush.max-actions' = '512',
>> 'sink.bulk-flush.max-size' = '1mb',
>> 'sink.bulk-flush.interval' = '5000',
>> 'sink.bulk-flush.backoff.delay' = '1000',
>> 'sink.bulk-flush.backoff.max-retries' = '4',
>> 'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
>> )
>>
>>
>> On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> I'm using the Table API to do a bunch of stateful transformations on CDC
>>> Debezium rows and then insert final documents into Elasticsearch via the ES
>>> connector.
>>>
>>> I've noticed that Elasticsearch is constantly deleting and then
>>> inserting documents as they update. Ideally, there would be no delete
>>> operation for a row update, only for a delete. I'm using the Elasticsearch
>>> 7 SQL connector, which I'm assuming uses `Elasticsearch7UpsertTableSink`
>>> under the hood, which implies upserts are actually what it's capable of.
>>>
>>> Therefore, I think it's possibly my table plan that's causing row
>>> upserts to turn into deletes + inserts. My plan is essentially a series of
>>> Joins and GroupBys + UDF Aggregates (aggregating arrays of data). I think,
>>> possibly the UDF Aggs following the Joins + GroupBys are causing the
>>> upserts to split into delete + inserts somehow. If this is correct, is it
>>> possible to make UDFs that preserve Upserts? Or am I totally off-base with
>>> my assumptions?
>>>
>>> Thanks!
>>
>>
>




Flink CDC: MySQL DATETIME value 0000-00-00 00:00:00 gets converted by Flink to 1970-01-01T00:00

2020-11-10 Thread 丁浩浩
When a MySQL column is of type DATETIME and its value is 0000-00-00 00:00:00, it gets converted to 1970-01-01T00:00. What should I do to keep the result consistent with the original value?
Output:
2> (true,1,zhangsan,18,1970-01-01T00:00)
3> (true,2,lisi,20,2020-11-11T14:17:46)
4> (true,3,wangwu,99,1970-01-01T00:00)
1> (true,4,zhaoliu,77,1970-01-01T00:00)
Log messages:
2020-11-11 14:30:37,418 - 19755 WARN  
[debezium-mysqlconnector-mysql_binlog_source-snapshot] 
io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value 
'0000-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' 
converted to empty value
2020-11-11 14:30:37,424 - 19761 WARN  
[debezium-mysqlconnector-mysql_binlog_source-snapshot] 
io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value 
'0000-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' 
converted to empty value
2020-11-11 14:30:37,424 - 19761 WARN  
[debezium-mysqlconnector-mysql_binlog_source-snapshot] 
io.debezium.connector.mysql.MySqlValueConverters:852  - Invalid value 
'0000-00-00 00:00:00' stored in column 'rigist_time' of table 'test.boos' 
converted to empty value




Flink 1.11.1 SQL: no data retrieved when using HBase as a dimension table in a temporal join

2020-11-10 Thread 鱼子酱
Hi everyone,
When I use HBase as a dimension table in a temporal join, no data comes out:
the job runs, but no matching records are ever emitted.
The Flink version is 1.11.1 and the HBase version is 1.2.0-cdh5.15.1.

1. I verified that if I drop the join condition and only read the Kafka source:
insert into testhbase
select cast(applog.terminalNo as varchar)
from applog
then data is produced,

but as soon as the join condition is added, nothing is output.

2. Does the HBase connector currently support scanning or querying via SQL? I wanted to simply scan the HBase data out to inspect it, but with the SQL below the job stops on its own:
  select.sql1: >
insert into testhbase
select rowkey
from  shop
The log output is as follows:
14:25:51,726 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils  - The
configuration option taskmanager.cpu.cores required for local execution is
not set, setting it to the maximal possible value.
14:25:51,727 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils  - The
configuration option taskmanager.memory.task.heap.size required for local
execution is not set, setting it to the maximal possible value.
14:25:51,727 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils  - The
configuration option taskmanager.memory.task.off-heap.size required for
local execution is not set, setting it to the maximal possible value.
14:25:51,728 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils  - The
configuration option taskmanager.memory.network.min required for local
execution is not set, setting it to its default value 64 mb.
14:25:51,728 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils  - The
configuration option taskmanager.memory.network.max required for local
execution is not set, setting it to its default value 64 mb.
14:25:51,728 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils  - The
configuration option taskmanager.memory.managed.size required for local
execution is not set, setting it to its default value 128 mb.
14:25:51,733 INFO  org.apache.flink.runtime.minicluster.MiniCluster 
- Starting Flink Mini Cluster
14:25:51,734 INFO  org.apache.flink.runtime.minicluster.MiniCluster 
- Starting Metrics Registry
14:25:51,764 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl  
- No metrics reporter configured, no metrics will be exposed/reported.
14:25:51,764 INFO  org.apache.flink.runtime.minicluster.MiniCluster 
- Starting RPC Service(s)
14:25:51,772 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
- Trying to start local actor system
14:25:51,998 INFO  akka.event.slf4j.Slf4jLogger 
- Slf4jLogger started
14:25:52,155 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
- Actor system started at akka://flink
14:25:52,163 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
- Trying to start local actor system
14:25:52,174 INFO  akka.event.slf4j.Slf4jLogger 
- Slf4jLogger started
14:25:52,196 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
- Actor system started at akka://flink-metrics
14:25:52,205 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService 
- Starting RPC endpoint for
org.apache.flink.runtime.metrics.dump.MetricQueryService at
akka://flink-metrics/user/rpc/MetricQueryService .
14:25:52,248 INFO  org.apache.flink.runtime.minicluster.MiniCluster 
- Starting high-availability services
14:25:52,339 INFO  org.apache.flink.runtime.blob.BlobServer 
- Created BLOB server storage directory
C:\Users\xufuquan\AppData\Local\Temp\blobStore-15963e40-62fd-41f6-b79e-34478d9076f5
14:25:52,344 INFO  org.apache.flink.runtime.blob.BlobServer 
- Started BLOB server at 0.0.0.0:51813 - max concurrent requests: 50 - max
backlog: 1000
14:25:52,352 INFO  org.apache.flink.runtime.blob.PermanentBlobCache 
- Created BLOB cache storage directory
C:\Users\xufuquan\AppData\Local\Temp\blobStore-b10a2d3a-c85d-4bc6-bb87-8ad33d264303
14:25:52,354 INFO  org.apache.flink.runtime.blob.TransientBlobCache 
- Created BLOB cache storage directory
C:\Users\xufuquan\AppData\Local\Temp\blobStore-61dffed1-c680-4279-8722-4d790dbfee34
14:25:52,354 INFO  org.apache.flink.runtime.minicluster.MiniCluster 
- Starting 1 TaskManger(s)
14:25:52,356 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
- Starting TaskManager with ResourceID: d0b1f67e-e303-4363-ad1c-24bd80702892
14:25:52,369 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices
- Temporary file directory 'C:\Users\xufuquan\AppData\Local\Temp': total 79
GB, usable 6 GB (7.59% usable)
14:25:52,371 INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  
- FileChannelManager uses directory
C:\Users\xufuquan\AppData\Local\Temp\flink-io-b354fbd8-8742-4f42-a615-645acc877dd1
for spill files.
14:25:52,376 INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  
- FileChannelManager uses directory
C:\Users\xufuquan\AppData\Local\Temp\flink-netty-shuffle-9bb16fd0-4b97-43ce-b055-5dac35a91e71
for spill files.
14:25:52,404 INFO 

Re: Re: Flink 1.11: deserialization failure when reading Avro-format data from Kafka

2020-11-10 Thread 奔跑的小飞袁
These are the message lengths I printed:
message length is: 529
message length is: 212
message length is: 391




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Questions on troubleshooting a backpressure problem

2020-11-10 Thread restart
I have checked d, e and f: their backpressure status is normal and their metrics (outPoolUsage,
inPoolUsage, floatingBuffersUsage, exclusiveBuffersUsage) are all 0. If it were what you describe,
c's outPoolUsage should be 1 for things to add up, but what I observe is that c's metrics look
normal while the backpressure is still there.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Join Bottleneck

2020-11-10 Thread Rex Fenley
Thank you for the clarification.

On Sat, Nov 7, 2020 at 7:37 AM Till Rohrmann  wrote:

> Hi Rex,
>
> "HasUniqueKey" means that the left input has a unique key.
> "JoinKeyContainsUniqueKey" means that the join key of the right side
> contains the unique key of this relation. Hence, it looks normal to me.
>
> Cheers,
> Till
>
> On Fri, Nov 6, 2020 at 7:29 PM Rex Fenley  wrote:
>
>> Hello,
>>
>> I have a Job that's a series of Joins, GroupBys, and Aggs and it's
>> bottlenecked in one of the joins. The join's cardinality is ~300 million
>> rows on the left and ~200 million rows on the right all with unique keys.
>> I'm seeing this in the plan for that bottlenecked Join.
>>
>> Join(joinType=[InnerJoin], where=[(user_id = id0)], select=[id, group_id,
>> user_id, uuid, owner, id0, deleted_at], leftInputSpec=[HasUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey])
>>
>> The join condition is basically (left.user_id === right.id). So `id0`
>> must be right.id here.
>>
>> My first question is, what is the difference between
>>
>> leftInputSpec=[HasUniqueKey]
>>
>> and
>>
>> rightInputSpec=[JoinKeyContainsUniqueKey]
>>
>>  ?
>>
>> Is the left side not using the join key for hashing the join but instead
>> using its pk id, which would be underperformant?
>>
>> Is there anything else about this that stands out?
>>
>> Thanks!
>>
>




Re: Rules of Thumb for Setting Parallelism

2020-11-10 Thread Rex Fenley
Awesome, thanks!

On Sat, Nov 7, 2020 at 6:43 AM Till Rohrmann  wrote:

> Hi Rex,
>
> You should configure the number of slots per TaskManager to be the number
> of cores of a machine/node. In total you will then have a cluster with
> #slots = #cores per machine x #machines.
>
> If you have a cluster with 4 nodes and 8 slots each, then you have a total
> of 32 slots. Now if you have a job A which you start with a parallelism of
> 20, then you have 12 slots left. Hence, you could make use of these 12
> slots by starting a job B with a parallelism 12.
>
> Cheers,
> Till
>
> On Fri, Nov 6, 2020 at 7:20 PM Rex Fenley  wrote:
>
>> Great, thanks!
>>
>> So just to confirm, configure # of task slots to # of core nodes x # of
>> vCPUs?
>>
>> I'm not sure what you mean by "distribute them across both jobs (so that
>> the total adds up to 32)". Is it configurable how many task slots a job can
>> receive, so in this case I'd provide ~30/36 * 32 task slots for one job and
>> ~6/36 * 32 for another job, but even them out to sum to 32 task slots?
>>
>> Thanks
>>
>> On Fri, Nov 6, 2020 at 10:01 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Rex,
>>>
>>> as a rule of thumb I recommend configuring your TMs with as many slots
>>> as they have cores. So in your case your cluster would have 32 slots. Then
>>> depending on the workload of your jobs you should distribute them across
>>> both jobs (so that the total adds up to 32). A high number of operators
>>> does not necessarily mean that it needs more slots since operators can
>>> share the same slot. It mostly depends on the workload of your job. If the
>>> job should be too slow, then you would have to increase the cluster
>>> resources.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Nov 6, 2020 at 12:21 AM Rex Fenley  wrote:
>>>
 Hello,

 I'm running a Job on AWS EMR with the TableAPI that does a long series
 of Joins, GroupBys, and Aggregates and I'd like to know how to best tune
 parallelism.

 In my case, I have 8 EMR core nodes setup each with 4vCores and 8Gib of
 memory. There's a job we have to run that has ~30 table operators. Given
 this, how should I calculate what to set the systems parallelism to?

 I also plan on running a second job on the same system, but just with 6
 operators. Will this change the calculation for parallelism at all?

 Thanks!


>>>
>>
>




How can I trigger an alert when no message has been received for a certain period of time?

2020-11-10 Thread Lei Wang
There are many edge robot devices (we call them robots) sending messages to Kafka. If no message has been received for a certain period of time, we consider the robot offline.

For example:
robot1   2020-11-11 12:00:00 msginfo
If no further message from robot1 arrives for the following 20 minutes, how can I make Flink raise an alert at 2020-11-11 12:10:00?

Flink is message-driven: without an incoming message nothing gets triggered. How can an action be triggered precisely when no follow-up message arrives?

I experimented with the example at https://juejin.im/post/6844904193052901384, but it does not fit my scenario.

That example effectively has all orders share a single timer service and iterates over all orders every time it fires.
We have to keyBy robotId.

Any advice would be much appreciated.

Thanks,
Lei Wang
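
A common pattern for this (a minimal, untested sketch rather than a drop-in solution; the input is represented here as a Tuple2 of robotId and payload, which is an assumption about your data type) is to keyBy the robot id and register a per-key timer in a KeyedProcessFunction that is pushed forward on every message and fires only when nothing arrives within the timeout:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Key: robotId, input: (robotId, payload), output: alert message.
public class IdleRobotAlert
        extends KeyedProcessFunction<String, Tuple2<String, String>, String> {

    private static final long TIMEOUT_MS = 20 * 60 * 1000L; // idle timeout
    private transient ValueState<Long> timerState; // pending timer per robot

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("idle-timer", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, String> msg, Context ctx,
                               Collector<String> out) throws Exception {
        // A message arrived: cancel the previously registered timer, if any.
        Long oldTimer = timerState.value();
        if (oldTimer != null) {
            ctx.timerService().deleteProcessingTimeTimer(oldTimer);
        }
        // Register a fresh timer TIMEOUT_MS after this message.
        long newTimer = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
        ctx.timerService().registerProcessingTimeTimer(newTimer);
        timerState.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        // The timer was never cancelled: no message within TIMEOUT_MS.
        out.collect("robot offline: " + ctx.getCurrentKey());
        timerState.clear();
    }
}

It would be wired in roughly as stream.keyBy(t -> t.f0).process(new IdleRobotAlert()); event-time timers plus watermarks can be used instead of processing time if the alerts must follow the timestamps inside the messages.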


Re: debug statefun

2020-11-10 Thread Tzu-Li (Gordon) Tai
On Wed, Nov 11, 2020 at 1:44 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Lian,
>
> Sorry, I didn't realize that the issue you were bumping into was caused by
> the module not being discovered.
> You're right, the harness utility would not help here.
>

Actually, scratch this comment. The Harness utility would indeed help
here with surfacing these module discovery issues / missing META-INF files
in embedded module jars.
When using the Harness, module discovery works exactly the same as normal
application submissions, loaded via the Java SPI.

So, in general, the harness utility can be used to check:

   - Your application logic, messaging between functions, mock ingress
   inputs, etc.
   - Missing constructs in your application modules (e.g. missing ingress /
   egresses, routers)
   - Incorrect module packaging (e.g. missing module.yaml for remote
   modules, or missing META-INF metadata files for embedded modules)

Best,
Gordon

>


Re: debug statefun

2020-11-10 Thread Lian Jiang
Thanks Gordon. After better understanding how autoservice work, I resolved
the issue by adding below into my build.gradle file:

annotationProcessor 'com.google.auto.service:auto-service:1.0-rc6'

Without this, the project can compile but the autoservice class cannot
be generated appropriately.

Sorry, I was not clear earlier that I am using Gradle. Cheers!
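
For anyone hitting the same symptom, a minimal sketch (modeled on the greeter example; the class name is hypothetical) of an embedded module whose META-INF/services entry is produced by AutoService. Without the annotation processor on the compile path (the Maven build plugin, or annotationProcessor in Gradle as above), the @AutoService annotation compiles fine but the service file is never generated, so the module is not discovered at runtime:

import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

// The annotation processor turns this annotation into
// META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.
@AutoService(StatefulFunctionModule.class)
public final class MyModule implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        // Bind ingresses, routers, functions and egresses here, e.g.:
        // binder.bindIngress(...);
        // binder.bindIngressRouter(...);
        // binder.bindFunctionProvider(...);
        // binder.bindEgress(...);
    }
}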


On Tue, Nov 10, 2020 at 9:44 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Lian,
>
> Sorry, I didn't realize that the issue you were bumping into was caused by
> the module not being discovered.
> You're right, the harness utility would not help here.
>
> As for the module discovery problem:
>
>- Have you looked at the contents of your jar, and see that a
>META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
>has indeed been generated by AutoService?
>- Just to rule out the obvious first: besides the
>auto-service-annotations dependency, you also have to add the auto-service
>compiler plugin, as demonstrated here:
>https://github.com/apache/flink-statefun/blob/master/pom.xml#L192
>
> Only after adding the build plugin mentioned above, the META-INF metadata
> will be generated for classes annotated with @AutoService.
>
> Please let us know if this resolves the issue for you.
>
> Cheers,
> Gordon
>
> On Wed, Nov 11, 2020 at 3:15 AM Lian Jiang  wrote:
>
>> Igal,
>>
>> I am using AutoService and I don't need to add auto-service-annotations
>> since it is provided by statefun-flink-core. Otherwise, my project cannot
>> even build. I did exactly the same as
>>
>>
>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-greeter-example/src/main/java/org/apache/flink/statefun/examples/greeter/GreetingModule.java
>>
>> I did below test:
>> In statefun-greeter-example project, replace greeter jar with my jar in
>> Dockerfile, running this project can NOT find my module.
>>
>> In my project, replace my jar with the greeter jar in Dockerfile, running
>> this project can find the greeter module.
>>
>> So I am really puzzled about what is wrong with my jar.
>>
>>
>>
>> Gorden,
>>
>> harness test plumbing of ingress/egress. But it may not help me debug why
>> Flink cannot discover my module. Correct?
>>
>> Thanks guys.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Tue, Nov 10, 2020 at 9:11 AM Igal Shilman  wrote:
>>
>>> Hi Lian,
>>>
>>> If you are using the statefun-sdk directly (an embedded mode) then, most
>>> likely is that you are missing a
>>> META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
>>> file that would point to your modules class. We are using Java SPI [1]
>>> to load all the stateful functions modules at runtime.
>>> Alternatively, you can use the @AutoService annotation [2] (you will
>>> need to add a maven dependency for that [3])
>>>
>>> If you are using the remote functions deployment mode, then please make
>>> sure that your module.yaml file is present in your Dockerfile. (for example
>>> [4])
>>>
>>> Good luck,
>>> Igal.
>>>
>>> [1] https://docs.oracle.com/javase/tutorial/ext/basics/spi.html
>>> [2]
>>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-greeter-example/src/main/java/org/apache/flink/statefun/examples/greeter/GreetingModule.java#L30
>>> [3] https://github.com/apache/flink-statefun/blob/master/pom.xml#L85,L89
>>> [4]
>>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-python-greeter-example/Dockerfile#L20
>>>
>>> On Tue, Nov 10, 2020 at 4:47 PM Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
 Hi,

 StateFun provides a Harness utility exactly for that, allowing you to
 test a StateFun application in the IDE / setting breakpoints etc.
 You can take a look at this example on how to use the harness:
 https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
 .

 Cheers,
 Gordon

 On Tue, Nov 10, 2020 at 5:04 AM Lian Jiang 
 wrote:

>
> Hi,
>
> I created a POC by mimicing statefun-greeter-example. However, it
> failed due to:
>
> Caused by: java.lang.IllegalStateException: There are no ingress
> defined.
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
> ~[statefun-flink-core.jar:2.2.0]
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71)
> ~[statefun-flink-core.jar:2.2.0]
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
> ~[statefun-flink-core.jar:2.2.0]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_265]
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_265]
> at
> 

Re: debug statefun

2020-11-10 Thread Tzu-Li (Gordon) Tai
Hi Lian,

Sorry, I didn't realize that the issue you were bumping into was caused by
the module not being discovered.
You're right, the harness utility would not help here.

As for the module discovery problem:

   - Have you looked at the contents of your jar, and see that a
   META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
   has indeed been generated by AutoService?
   - Just to rule out the obvious first: besides the
   auto-service-annotations dependency, you also have to add the auto-service
   compiler plugin, as demonstrated here:
   https://github.com/apache/flink-statefun/blob/master/pom.xml#L192

Only after adding the build plugin mentioned above, the META-INF metadata
will be generated for classes annotated with @AutoService.

Please let us know if this resolves the issue for you.

Cheers,
Gordon

On Wed, Nov 11, 2020 at 3:15 AM Lian Jiang  wrote:

> Igal,
>
> I am using AutoService and I don't need to add auto-service-annotations
> since it is provided by statefun-flink-core. Otherwise, my project cannot
> even build. I did exactly the same as
>
>
> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-greeter-example/src/main/java/org/apache/flink/statefun/examples/greeter/GreetingModule.java
>
> I did below test:
> In statefun-greeter-example project, replace greeter jar with my jar in
> Dockerfile, running this project can NOT find my module.
>
> In my project, replace my jar with the greeter jar in Dockerfile, running
> this project can find the greeter module.
>
> So I am really puzzled about what is wrong with my jar.
>
>
>
> Gorden,
>
> harness test plumbing of ingress/egress. But it may not help me debug why
> Flink cannot discover my module. Correct?
>
> Thanks guys.
>
>
>
>
>
>
>
>
>
> On Tue, Nov 10, 2020 at 9:11 AM Igal Shilman  wrote:
>
>> Hi Lian,
>>
>> If you are using the statefun-sdk directly (an embedded mode) then, most
>> likely is that you are missing a
>> META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
>> file that would point to your modules class. We are using Java SPI [1] to
>> load all the stateful functions modules at runtime.
>> Alternatively, you can use the @AutoService annotation [2] (you will need
>> to add a maven dependency for that [3])
>>
>> If you are using the remote functions deployment mode, then please make
>> sure that your module.yaml file is present in your Dockerfile. (for example
>> [4])
>>
>> Good luck,
>> Igal.
>>
>> [1] https://docs.oracle.com/javase/tutorial/ext/basics/spi.html
>> [2]
>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-greeter-example/src/main/java/org/apache/flink/statefun/examples/greeter/GreetingModule.java#L30
>> [3] https://github.com/apache/flink-statefun/blob/master/pom.xml#L85,L89
>> [4]
>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-python-greeter-example/Dockerfile#L20
>>
>> On Tue, Nov 10, 2020 at 4:47 PM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi,
>>>
>>> StateFun provides a Harness utility exactly for that, allowing you to
>>> test a StateFun application in the IDE / setting breakpoints etc.
>>> You can take a look at this example on how to use the harness:
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>> .
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On Tue, Nov 10, 2020 at 5:04 AM Lian Jiang 
>>> wrote:
>>>

 Hi,

 I created a POC by mimicing statefun-greeter-example. However, it
 failed due to:

 Caused by: java.lang.IllegalStateException: There are no ingress
 defined.
 at
 org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
 ~[statefun-flink-core.jar:2.2.0]
 at
 org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71)
 ~[statefun-flink-core.jar:2.2.0]
 at
 org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
 ~[statefun-flink-core.jar:2.2.0]
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 ~[?:1.8.0_265]
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 ~[?:1.8.0_265]
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_265]
 at java.lang.reflect.Method.invoke(Method.java:498)
 ~[?:1.8.0_265]
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
 at
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
 at
 

Re: SSL setup for YARN deployment when hostnames are unknown.

2020-11-10 Thread Jiahui Jiang
Ping on this. Is there any way I can run a script or implement some interface that runs before the 
Dispatcher service starts up, to dynamically generate the keystore?

Thank you!

From: Jiahui Jiang 
Sent: Monday, November 9, 2020 3:19 PM
To: user@flink.apache.org 
Subject: SSL setup for YARN deployment when hostnames are unknown.

Hello Flink!

We are working on turning on REST SSL for YARN deployments. We built a generic 
orchestration server that can submit Flink clusters to any YARN clusters given 
the relevant Hadoop configs. But this means we may not know the hostname the 
Job Managers can be deployed onto - not even through wildcard DNS names as recommended in the 
documentation.

I’m wondering is there any factory class that I can implement that can allow me 
to generate a private key and import that to JM’s keystore at runtime?
Or is there any other recommended way to handle the cases where we don’t know 
the potential JM hosts at all?

Thank you!



Re: Questions on troubleshooting a backpressure problem

2020-11-10 Thread 赵一旦
Based on your description, the problem is in one or more of d, e and f. It would not be c.

restart  wrote on Wed, Nov 11, 2020 at 12:26 PM:

> The DAG of the Flink job is shown here:
> <
> http://apache-flink.147419.n8.nabble.com/file/t1014/Dingtalk_2020100815.jpg
> >
> The job hits backpressure every evening during peak hours and checkpoints time out. In the figure,
> a, b and c all show backpressure while d, e and f are normal. Following the backpressure troubleshooting guide
> <
> http://www.whitewood.me/2019/11/03/Flink-%E5%8F%8D%E5%8E%8B%E5%88%86%E6%9E%90%E5%8F%8A%E5%A4%84%E7%90%86/>
>
> c should be the root cause. The strange thing is that c's metrics (outPoolUsage, inPoolUsage,
> floatingBuffersUsage, exclusiveBuffersUsage) all read 0. Can I conclude that c's own processing
> capacity is what causes the backpressure? As for GC, after roughly a day of running the GC count
> is around 1400. The job logic is basically keyBy -> window -> reduce, with a Set storing user ids
> and a Map storing latencies and counts (mainly to compute the 99th/95th percentiles, using the
> latency as the key and the number of records at that latency as the value, to keep the collections
> small). As for data skew, a's parallelism matches the number of Kafka partitions and the subtasks
> of a, b, c, d, e and f are fairly balanced; no exceptions appeared during the whole run. Given
> these symptoms, how can I pinpoint the problem?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Questions on troubleshooting a backpressure problem

2020-11-10 Thread restart
The DAG of the Flink job is shown here:
http://apache-flink.147419.n8.nabble.com/file/t1014/Dingtalk_2020100815.jpg
The job hits backpressure every evening during peak hours and checkpoints time out. In the figure,
a, b and c all show backpressure while d, e and f are normal. Following the backpressure
troubleshooting guide
(http://www.whitewood.me/2019/11/03/Flink-%E5%8F%8D%E5%8E%8B%E5%88%86%E6%9E%90%E5%8F%8A%E5%A4%84%E7%90%86/),
c should be the root cause. The strange thing is that c's metrics (outPoolUsage, inPoolUsage,
floatingBuffersUsage, exclusiveBuffersUsage) all read 0. Can I conclude that c's own processing
capacity is what causes the backpressure? As for GC, after roughly a day of running the GC count
is around 1400. The job logic is basically keyBy -> window -> reduce, with a Set storing user ids
and a Map storing latencies and counts (mainly to compute the 99th/95th percentiles, using the
latency as the key and the number of records at that latency as the value, to keep the collections
small). As for data skew, a's parallelism matches the number of Kafka partitions and the subtasks
of a, b, c, d, e and f are fairly balanced; no exceptions appeared during the whole run. Given
these symptoms, how can I pinpoint the problem?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 1.11: deserialization failure when reading Avro-format data from Kafka

2020-11-10 Thread Shuai Xia
Hi, could you try printing out the length of the message?


--
From: 奔跑的小飞袁 
Sent: Wednesday, November 11, 2020 11:40
To: user-zh 
Subject: Flink 1.11: deserialization failure when reading Avro-format data from Kafka

Hello, I ran into an error when reading Avro-format data from Kafka with Flink 1.11. Because our Avro payload is special, I slightly modified the source code; the changed snippet is below:
@Override
 public T deserialize(byte[] message) throws IOException {
  // read record
  checkAvroInitialized();
  inputStream.setBuffer(message);
  inputStream.skip(5);
  Schema readerSchema = getReaderSchema();
  GenericDatumReader<T> datumReader = getDatumReader();
  datumReader.setSchema(readerSchema);
  return datumReader.read(null, decoder);
 }
The modified class is org.apache.flink.formats.avro.AvroDeserializationSchema.

The same change works fine on 1.9.0; I would like to know whether anything has changed in the community around reading Avro-format data.

The error is as follows:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: org.apache.flink.client.program.ProgramInvocationException:
Job failed (JobID: f4d2bd903a55e2d10d67d69eadba618a)
 at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
 at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
 at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
 at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
 at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: f4d2bd903a55e2d10d67d69eadba618a)
 at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
 at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
 at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
 at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
 at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1681)
 at
com.intsig.flink.streaming.streaming_project.abtest.GenericRecordTest.main(GenericRecordTest.java:54)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 ... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: f4d2bd903a55e2d10d67d69eadba618a)
 at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
 at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
 at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
 at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
 at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
 at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
 at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
 at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at

Re: Re: Re: Re: Flink SQL writing to Kafka with the fixed partitioner: why do all results end up in a single topic partition?

2020-11-10 Thread leiyanrui
OK, thanks!



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.11: deserialization failure when reading Avro-format data from Kafka

2020-11-10 Thread 奔跑的小飞袁
Hello, I ran into an error when reading Avro-format data from Kafka with Flink 1.11. Because our Avro payload is special, I slightly modified the source code; the changed snippet is below:
@Override
public T deserialize(byte[] message) throws IOException {
// read record
checkAvroInitialized();
inputStream.setBuffer(message);
inputStream.skip(5);
Schema readerSchema = getReaderSchema();
GenericDatumReader<T> datumReader = getDatumReader();
datumReader.setSchema(readerSchema);
return datumReader.read(null, decoder);
}
The modified class is org.apache.flink.formats.avro.AvroDeserializationSchema.

The same change works fine on 1.9.0; I would like to know whether anything has changed in the community around reading Avro-format data.

The error is as follows:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: org.apache.flink.client.program.ProgramInvocationException:
Job failed (JobID: f4d2bd903a55e2d10d67d69eadba618a)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: f4d2bd903a55e2d10d67d69eadba618a)
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1681)
at
com.intsig.flink.streaming.streaming_project.abtest.GenericRecordTest.main(GenericRecordTest.java:54)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: f4d2bd903a55e2d10d67d69eadba618a)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at

Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
Thanks Jark, I confused it with the normal sql syntax.
now it works (after changing it to HH:mm:ss.SS...)


Fanbin

On Tue, Nov 10, 2020 at 7:24 PM Jark Wu  wrote:

> Oh, sorry, the example above is wrong. The column name should come first.
> So the full example should be:
>
> create table t (
>   user_id string,
>   action string,
>   ts string,
>   new_ts AS TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
>   watermark for new_ts as new_ts - interval '5' second
> ) with (
>  ...
> )
>
> Docs:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html
>
> Best,
> Jark
>
> On Wed, 11 Nov 2020 at 11:11, Fanbin Bu  wrote:
>
>> Jark,
>>
>> Thanks for the quick response.
>> I tried to_timestamp(ts, ...), but got the following error:
>>
>> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>> SQL parse failed. Encountered "(" at line
>>
>> looks like it complains about the second `(` in
>> create table t (... to_timestamp(...)...)
>>
>>
>>
>> On Tue, Nov 10, 2020 at 6:47 PM Jark Wu  wrote:
>>
>>> Hi Fanbin,
>>>
>>> The example you gave is correct:
>>>
>>> create table t (
>>>   user_id string,
>>>   action string,
>>>   ts string,
>>>   transform_ts_format(ts) as new_ts,
>>>   watermark for new_ts as new_ts - interval '5' second
>>> ) with (
>>>  ...
>>> )
>>>
>>> You can use "TO_TIMESTAMP" built-in function instead of the UDF, e.g.
>>> TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z''') as new_ts
>>>
>>>
>>> Q1: how does watermark know new_ts is a valid timestamp?
>>> > the framework will validate the return type of the computed column
>>> expression.
>>>Currently, it must be a type of TIMESTAMP(3).
>>>
>>> Q2: is it possible to reuse ts without introducing a new column?
>>> > Currently, it is not supported. This requires to support "TIMESTAMP
>>> WITH LOCAL TIME ZONE" as rowtime attribute first.
>>>
>>> Bes,
>>> Jark
>>>
>>> On Wed, 11 Nov 2020 at 10:33, Fanbin Bu  wrote:
>>>
 In the `computed column` section of [1], i saw some related doc:

 ```
 On the other hand, computed column can be used to derive event time
 column because an event time column
 may need to be derived from existing fields, e.g. the original field is
 not TIMESTAMP(3) type or is nested in a JSON string.
 ```
 could you please provide a concrete example for this?
 Thanks
 Fanbin

 [1]
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

 On Tue, Nov 10, 2020 at 6:18 PM Fanbin Bu 
 wrote:

> i also tried:
> ts TIMESTAMP WITH LOCAL TIME ZONE
>
> but it failed with
> Rowtime attribute 'ts' must be of type TIMESTAMP but is of type
> 'TIMESTAMP(6) WITH LOCAL TIME ZONE'.
>
> On Tue, Nov 10, 2020 at 5:42 PM Fanbin Bu 
> wrote:
>
>> Hi,
>>
>> I have source json data like:
>> {"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action":
>> "click"}
>> ...
>>
>> my sql is:
>> create table t (
>> user_id string,
>> action string,
>> ts timestamp,
>> watermark for ts as ts - interval '5' second
>> ) with (
>> 'connector' = 'kafka',
>> 'topic' = 'test',
>> 'json.timestamp-format.standard' = 'ISO-8601'
>> ...
>> )
>>
>> this does not work since ISO-8601 does not expect `Z` at the end of
>> the timestamp.
>> It only works for "2020-11-09T20:26:10.368". However, I'm not able to
>> change the format.
>>
>> I checked a few threads and somebody suggested to use udf for
>> unsupported timestamp format. what would the create table statement look
>> like? I also need watermark working.
>>
>> I'm thinking about this:
>> create table t (
>> user_id string,
>> action string,
>> ts string,
>> transform_ts_format(ts) as new_ts,
>> watermark for new_ts as new_ts - interval '5' second
>> ) with (
>> ...
>>
>> q:
>> 1. how does watermark know new_ts is a valid timestamp?
>> 2. is it possible to reuse ts without introducing a new column?
>>
>> Thanks,
>> Fanbin
>>
>>


Flink SQL CDC lock timeout

2020-11-10 Thread 丁浩浩
When I use Flink CDC to run a join across several tables, one of the tables always hits a lock wait timeout, which prevents the job from starting properly.
How should this situation be handled?
org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded; 
try restarting transaction Error code: 1205; SQLSTATE: 40001.
at 
io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
at 
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock 
wait timeout exceeded; try restarting transaction
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:123)
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at 
com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:782)
at com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:666)
at 
io.debezium.jdbc.JdbcConnection.executeWithoutCommitting(JdbcConnection.java:1201)
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:465)
... 3 more
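
A hedged sketch of one knob to experiment with (assuming the mysql-cdc connector version in use forwards Debezium options via the 'debezium.' prefix; the table name, schema and connection settings below are placeholders, so check the connector documentation for your version): relaxing the snapshot locking mode so the initial snapshot does not block on table locks.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcSnapshotLockingSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
            "CREATE TABLE orders (" +
            "  id BIGINT," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'localhost'," +
            "  'port' = '3306'," +
            "  'username' = 'user'," +
            "  'password' = 'pass'," +
            "  'database-name' = 'test'," +
            "  'table-name' = 'orders'," +
            // Pass-through Debezium snapshot option; assumed, not verified here.
            "  'debezium.snapshot.locking.mode' = 'none'" +
            ")");
    }
}

Whether skipping the snapshot locks is acceptable depends on your consistency requirements, so treat this only as something to test, not as a recommended default.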


Re: timestamp parsing in create table statement

2020-11-10 Thread Jark Wu
Oh, sorry, the example above is wrong. The column name should come first.
So the full example should be:

create table t (
  user_id string,
  action string,
  ts string,
  new_ts AS TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
  watermark for new_ts as new_ts - interval '5' second
) with (
 ...
)

Docs:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html

Best,
Jark

On Wed, 11 Nov 2020 at 11:11, Fanbin Bu  wrote:

> Jark,
>
> Thanks for the quick response.
> I tried to_timestamp(ts, ...), but got the following error:
>
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "(" at line
>
> looks like it complains about the second `(` in
> create table t (... to_timestamp(...)...)
>
>
>
> On Tue, Nov 10, 2020 at 6:47 PM Jark Wu  wrote:
>
>> Hi Fanbin,
>>
>> The example you gave is correct:
>>
>> create table t (
>>   user_id string,
>>   action string,
>>   ts string,
>>   transform_ts_format(ts) as new_ts,
>>   watermark for new_ts as new_ts - interval '5' second
>> ) with (
>>  ...
>> )
>>
>> You can use "TO_TIMESTAMP" built-in function instead of the UDF, e.g.
>> TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z''') as new_ts
>>
>>
>> Q1: how does watermark know new_ts is a valid timestamp?
>> > the framework will validate the return type of the computed column
>> expression.
>>Currently, it must be a type of TIMESTAMP(3).
>>
>> Q2: is it possible to reuse ts without introducing a new column?
>> > Currently, it is not supported. This requires to support "TIMESTAMP
>> WITH LOCAL TIME ZONE" as rowtime attribute first.
>>
>> Bes,
>> Jark
>>
>> On Wed, 11 Nov 2020 at 10:33, Fanbin Bu  wrote:
>>
>>> In the `computed column` section of [1], i saw some related doc:
>>>
>>> ```
>>> On the other hand, computed column can be used to derive event time
>>> column because an event time column
>>> may need to be derived from existing fields, e.g. the original field is
>>> not TIMESTAMP(3) type or is nested in a JSON string.
>>> ```
>>> could you please provide a concrete example for this?
>>> Thanks
>>> Fanbin
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>>>
>>> On Tue, Nov 10, 2020 at 6:18 PM Fanbin Bu 
>>> wrote:
>>>
 i also tried:
 ts TIMESTAMP WITH LOCAL TIME ZONE

 but it failed with
 Rowtime attribute 'ts' must be of type TIMESTAMP but is of type
 'TIMESTAMP(6) WITH LOCAL TIME ZONE'.

 On Tue, Nov 10, 2020 at 5:42 PM Fanbin Bu 
 wrote:

> Hi,
>
> I have source json data like:
> {"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action":
> "click"}
> ...
>
> my sql is:
> create table t (
> user_id string,
> action string,
> ts timestamp,
> watermark for ts as ts - interval '5' second
> ) with (
> 'connector' = 'kafka',
> 'topic' = 'test',
> 'json.timestamp-format.standard' = 'ISO-8601'
> ...
> )
>
> this does not work since ISO-8601 does not expect `Z` at the end of
> the timestamp.
> It only works for "2020-11-09T20:26:10.368". However, I'm not able to
> change the format.
>
> I checked a few threads and somebody suggested to use udf for
> unsupported timestamp format. what would the create table statement look
> like? I also need watermark working.
>
> I'm thinking about this:
> create table t (
> user_id string,
> action string,
> ts string,
> transform_ts_format(ts) as new_ts,
> watermark for new_ts as new_ts - interval '5' second
> ) with (
> ...
>
> q:
> 1. how does watermark know new_ts is a valid timestamp?
> 2. is it possible to reuse ts without introducing a new column?
>
> Thanks,
> Fanbin
>
>


Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
Jark,

Thanks for the quick response.
I tried to_timestamp(ts, ...), but got the following error:

Exception in thread "main" org.apache.flink.table.api.SqlParserException:
SQL parse failed. Encountered "(" at line

looks like it complains about the second `(` in
create table t (... to_timestamp(...)...)



On Tue, Nov 10, 2020 at 6:47 PM Jark Wu  wrote:

> Hi Fanbin,
>
> The example you gave is correct:
>
> create table t (
>   user_id string,
>   action string,
>   ts string,
>   transform_ts_format(ts) as new_ts,
>   watermark for new_ts as new_ts - interval '5' second
> ) with (
>  ...
> )
>
> You can use "TO_TIMESTAMP" built-in function instead of the UDF, e.g.
> TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z''') as new_ts
>
>
> Q1: how does watermark know new_ts is a valid timestamp?
> > the framework will validate the return type of the computed column
> expression.
>Currently, it must be a type of TIMESTAMP(3).
>
> Q2: is it possible to reuse ts without introducing a new column?
> > Currently, it is not supported. This requires to support "TIMESTAMP WITH
> LOCAL TIME ZONE" as rowtime attribute first.
>
> Bes,
> Jark
>
> On Wed, 11 Nov 2020 at 10:33, Fanbin Bu  wrote:
>
>> In the `computed column` section of [1], i saw some related doc:
>>
>> ```
>> On the other hand, computed column can be used to derive event time
>> column because an event time column
>> may need to be derived from existing fields, e.g. the original field is
>> not TIMESTAMP(3) type or is nested in a JSON string.
>> ```
>> could you please provide a concrete example for this?
>> Thanks
>> Fanbin
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>>
>> On Tue, Nov 10, 2020 at 6:18 PM Fanbin Bu  wrote:
>>
>>> i also tried:
>>> ts TIMESTAMP WITH LOCAL TIME ZONE
>>>
>>> but it failed with
>>> Rowtime attribute 'ts' must be of type TIMESTAMP but is of type
>>> 'TIMESTAMP(6) WITH LOCAL TIME ZONE'.
>>>
>>> On Tue, Nov 10, 2020 at 5:42 PM Fanbin Bu 
>>> wrote:
>>>
 Hi,

 I have source json data like:
 {"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action":
 "click"}
 ...

 my sql is:
 create table t (
 user_id string,
 action string,
 ts timestamp,
 watermark for ts as ts - interval '5' second
 ) with (
 'connector' = 'kafka',
 'topic' = 'test',
 'json.timestamp-format.standard' = 'ISO-8601'
 ...
 )

 this does not work since ISO-8601 does not expect `Z` at the end of the
 timestamp.
 It only works for "2020-11-09T20:26:10.368". However, I'm not able to
 change the format.

 I checked a few threads and somebody suggested to use udf for
 unsupported timestamp format. what would the create table statement look
 like? I also need watermark working.

 I'm thinking about this:
 create table t (
 user_id string,
 action string,
 ts string,
 transform_ts_format(ts) as new_ts,
 watermark for new_ts as new_ts - interval '5' second
 ) with (
 ...

 q:
 1. how does watermark know new_ts is a valid timestamp?
 2. is it possible to reuse ts without introducing a new column?

 Thanks,
 Fanbin




Re: timestamp parsing in create table statement

2020-11-10 Thread Jark Wu
Hi Fanbin,

The example you gave is correct:

create table t (
  user_id string,
  action string,
  ts string,
  transform_ts_format(ts) as new_ts,
  watermark for new_ts as new_ts - interval '5' second
) with (
 ...
)

You can use "TO_TIMESTAMP" built-in function instead of the UDF, e.g.
TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z''') as new_ts


Q1: how does watermark know new_ts is a valid timestamp?
> the framework will validate the return type of the computed column
expression.
   Currently, it must be a type of TIMESTAMP(3).

Q2: is it possible to reuse ts without introducing a new column?
> Currently, it is not supported. This requires to support "TIMESTAMP WITH
LOCAL TIME ZONE" as rowtime attribute first.

Bes,
Jark

On Wed, 11 Nov 2020 at 10:33, Fanbin Bu  wrote:

> In the `computed column` section of [1], i saw some related doc:
>
> ```
> On the other hand, computed column can be used to derive event time column
> because an event time column
> may need to be derived from existing fields, e.g. the original field is
> not TIMESTAMP(3) type or is nested in a JSON string.
> ```
> could you please provide a concrete example for this?
> Thanks
> Fanbin
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>
> On Tue, Nov 10, 2020 at 6:18 PM Fanbin Bu  wrote:
>
>> i also tried:
>> ts TIMESTAMP WITH LOCAL TIME ZONE
>>
>> but it failed with
>> Rowtime attribute 'ts' must be of type TIMESTAMP but is of type
>> 'TIMESTAMP(6) WITH LOCAL TIME ZONE'.
>>
>> On Tue, Nov 10, 2020 at 5:42 PM Fanbin Bu  wrote:
>>
>>> Hi,
>>>
>>> I have source json data like:
>>> {"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action":
>>> "click"}
>>> ...
>>>
>>> my sql is:
>>> create table t (
>>> user_id string,
>>> action string,
>>> ts timestamp,
>>> watermark for ts as ts - interval '5' second
>>> ) with (
>>> 'connector' = 'kafka',
>>> 'topic' = 'test',
>>> 'json.timestamp-format.standard' = 'ISO-8601'
>>> ...
>>> )
>>>
>>> this does not work since ISO-8601 does not expect `Z` at the end of the
>>> timestamp.
>>> It only works for "2020-11-09T20:26:10.368". However, I'm not able to
>>> change the format.
>>>
>>> I checked a few threads and somebody suggested to use udf for
>>> unsupported timestamp format. what would the create table statement look
>>> like? I also need watermark working.
>>>
>>> I'm thinking about this:
>>> create table t (
>>> user_id string,
>>> action string,
>>> ts string,
>>> transform_ts_format(ts) as new_ts,
>>> watermark for new_ts as new_ts - interval '5' second
>>> ) with (
>>> ...
>>>
>>> q:
>>> 1. how does watermark know new_ts is a valid timestamp?
>>> 2. is it possible to reuse ts without introducing a new column?
>>>
>>> Thanks,
>>> Fanbin
>>>
>>>


Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
In the `computed column` section of [1], i saw some related doc:

```
On the other hand, computed column can be used to derive event time column
because an event time column
may need to be derived from existing fields, e.g. the original field is not
TIMESTAMP(3) type or is nested in a JSON string.
```
could you please provide a concrete example for this?
Thanks
Fanbin

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

On Tue, Nov 10, 2020 at 6:18 PM Fanbin Bu  wrote:

> i also tried:
> ts TIMESTAMP WITH LOCAL TIME ZONE
>
> but it failed with
> Rowtime attribute 'ts' must be of type TIMESTAMP but is of type
> 'TIMESTAMP(6) WITH LOCAL TIME ZONE'.
>
> On Tue, Nov 10, 2020 at 5:42 PM Fanbin Bu  wrote:
>
>> Hi,
>>
>> I have source json data like:
>> {"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action":
>> "click"}
>> ...
>>
>> my sql is:
>> create table t (
>> user_id string,
>> action string,
>> ts timestamp,
>> watermark for ts as ts - interval '5' second
>> ) with (
>> 'connector' = 'kafka',
>> 'topic' = 'test',
>> 'json.timestamp-format.standard' = 'ISO-8601'
>> ...
>> )
>>
>> this does not work since ISO-8601 does not expect `Z` at the end of the
>> timestamp.
>> It only works for "2020-11-09T20:26:10.368". However, I'm not able to
>> change the format.
>>
>> I checked a few threads and somebody suggested to use udf for unsupported
>> timestamp format. what would the create table statement look like? I also
>> need watermark working.
>>
>> I'm thinking about this:
>> create table t (
>> user_id string,
>> action string,
>> ts string,
>> transform_ts_format(ts) as new_ts,
>> watermark for new_ts as new_ts - interval '5' second
>> ) with (
>> ...
>>
>> q:
>> 1. how does watermark know new_ts is a valid timestamp?
>> 2. is it possible to reuse ts without introducing a new column?
>>
>> Thanks,
>> Fanbin
>>
>>


Re: FlinkSQL kafka->dedup->kafka

2020-11-10 Thread Jark Wu
Hi Laurent,

This is because the deduplicate node generates an updating stream, however
Kafka currently only supports append-only stream.
This can be addressed in release-1.12, because we introduce a new connector
"upsert-kafka" which supports writing updating
 streams into Kafka compacted topics.
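
As an illustration, such a sink could be declared roughly as follows once 1.12 is
available (a sketch only; the key/value formats and the primary key choice are
assumptions, not taken from the original pipeline):

CREATE TABLE sat_customers_address (
  customer_pk VARCHAR(64),
  client_number INT,
  address VARCHAR(100),
  PRIMARY KEY (customer_pk, address) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'csv'
);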

Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
ConsumerRecord#timestamp()?
If yes, this is also supported in release-1.12 via metadata syntax in DDL
[1]:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  timestamp BIGINT METADATA  -- read timestamp
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'avro'
)

Best,
Jark

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors

On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens 
wrote:

> Hello,
>
> I'm getting an error  in Flink SQL when reading from kafka, deduplicating
> records and sending them back to Kafka.
>
> The behavior I want is the following:
>
> *input:*
> | client_number | address |
> | --- | --- |
> | 1  | addr1 |
> | 1  | addr1 |
> | 1  | addr2 |
> | 1  | addr2 |
> | 1  | addr1 |
> | 1  | addr1 |
>
> *output:*
> | client_number | address |
> | --- | --- |
> | 1  | addr1 |
> | 1  | addr2 |
> | 1  | addr1 |
>
> The error seems to say that the type of stream created by the
> deduplication query is of "update & delete" type, while kafka only supports
> append-only:
>
> Unsupported query
> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
> update and delete changes which is produced by node
> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>
>
> --> Is there a way to create an append only query from this kind of
> deduplication query (see my code here below)?
> --> Would that work if I would use, say, a Postgres sink?
>
> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
> (here I generated a processing date to allow ordering during deduplication)
>
> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
> Flink SQL itself.
>
> Thanks in advance for your help.
>
> Best Regards,
>
> Laurent.
>
> ---
> -- Read from customers kafka topic
> ---
> CREATE TEMPORARY TABLE customers (
> `client_number` INT,
> `name` VARCHAR(100),
> `address` VARCHAR(100)
> )
> COMMENT ''
> WITH (
> 'connector' = 'kafka',
> 'format' = 'csv',
> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
> 'properties.group.id' = 'flinkSQL',
> 'topic' = 'customers',
> 'csv.field-delimiter' = ';',
> 'scan.startup.mode' = 'earliest-offset'
> );
>
>
>
> ---
> -- Add metadata
> ---
> CREATE TEMPORARY VIEW metadata AS
> SELECT *
> , sha256(cast(client_number as STRING)) AS customer_pk
> , current_timestamp AS load_date
> , 'Kafka topic: customers' AS record_source
> FROM customers;
>
>
>
> ---
> -- Deduplicate addresses
> ---
> CREATE TEMPORARY VIEW dedup_address as
> SELECT customer_pk
> , client_number
> , load_date
> , address
> FROM (
> SELECT customer_pk
> , client_number
> , load_date
> , record_source
> , address
> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
> ORDER BY load_date ASC) AS rownum
> FROM metadata
> ) where rownum = 1;
>
>
>
>
>
>
> ---
> -- Send to sat_customers_address kafka topic
> ---
> CREATE TEMPORARY TABLE sat_customers_address (
> `customer_pk` VARCHAR(64),
> `client_number` INT,
> `address` VARCHAR(100)
> )
> COMMENT ''
> WITH (
> 'connector' = 'kafka',
> 'format' = 'csv',
> 'properties.bootstrap.servers' =
> 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
> 'properties.group.id' = 'flinkSQL',
> 'topic' = 'sat_customers_address'
> );
>
> INSERT INTO sat_customers_address
> SELECT customer_pk
> , client_number
> , address
> FROM dedup_address;
>
>
>
>
> --
> *Laurent Exsteens*
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> *EURA NOVA*
>
> Rue Emile Francqui, 4
>
> 1435 Mont-Saint-Guibert
>
> (T) +32 10 75 02 00
>
> *euranova.eu *
>
> *research.euranova.eu* 
>
> ♻ Be green, keep it on the screen


Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
i also tried:
ts TIMESTAMP WITH LOCAL TIME ZONE

but it failed with
Rowtime attribute 'ts' must be of type TIMESTAMP but is of type
'TIMESTAMP(6) WITH LOCAL TIME ZONE'.

On Tue, Nov 10, 2020 at 5:42 PM Fanbin Bu  wrote:

> Hi,
>
> I have source json data like:
> {"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action":
> "click"}
> ...
>
> my sql is:
> create table t (
> user_id string,
> action string,
> ts timestamp,
> watermark for ts as ts - interval '5' second
> ) with (
> 'connector' = 'kafka',
> 'topic' = 'test',
> 'json.timestamp-format.standard' = 'ISO-8601'
> ...
> )
>
> this does not work since ISO-8601 does not expect `Z` at the end of the
> timestamp.
> It only works for "2020-11-09T20:26:10.368". However, I'm not able to
> change the format.
>
> I checked a few threads and somebody suggested to use udf for unsupported
> timestamp format. what would the create table statement look like? I also
> need watermark working.
>
> I'm thinking about this:
> create table t (
> user_id string,
> action string,
> ts string,
> transform_ts_format(ts) as new_ts,
> watermark for new_ts as new_ts - interval '5' second
> ) with (
> ...
>
> q:
> 1. how does watermark know new_ts is a valid timestamp?
> 2. is it possible to reuse ts without introducing a new column?
>
> Thanks,
> Fanbin
>
>


Re: Re: flink tm cpu cores settings

2020-11-10 Thread zjfpla...@hotmail.com
It's not missing though...



zjfpla...@hotmail.com
 
From: Yangze Guo
Sent: 2020-11-10 18:34
To: user-zh
Subject: Re: Re: flink tm cpu cores settings
You are missing a "v" here; it should be yarn.containers.vcores
 
Best,
Yangze Guo
 
On Tue, Nov 10, 2020 at 3:43 PM zjfpla...@hotmail.com
 wrote:
>
> The JM logs contain:
> Loading configuration property: yarn.containers.cores,4
>
>
>
> zjfpla...@hotmail.com
>
> From: zjfpla...@hotmail.com
> Sent: 2020-11-10 15:33
> To: user-zh
> Subject: Re: Re: flink tm cpu cores settings
> When it is set to 5, the tm cpu cores shows 4; when it is set to 4, the tm cpu cores is still 4
>
>
>
> zjfpla...@hotmail.com
>
> From: Yangze Guo
> Sent: 2020-11-09 10:19
> To: user-zh
> Subject: Re: Re: flink tm cpu cores settings
> How did you confirm it has no effect? Could you share the JM logs?
> Besides, whether this parameter actually takes effect also depends on whether CPU scheduling is enabled in the YARN scheduler
>
> Best,
> Yangze Guo
>
> On Thu, Nov 5, 2020 at 1:50 PM zjfpla...@hotmail.com
>  wrote:
> >
> > Setting this in flink-conf.yaml had no effect
> >
> >
> >
> > zjfpla...@hotmail.com
> >
> > From: JasonLee
> > Sent: 2020-11-05 13:49
> > To: user-zh
> > Subject: Re: flink tm cpu cores settings
> > hi, setting the yarn.containers.vcores parameter should do it
> >
> >
> >
> > -
> > Best Wishes
> > JasonLee
> > --
> > Sent from: 
> > http://apache-flink.147419.n8.nabble.com/


timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
Hi,

I have source json data like:
{"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action": "click"}
...

my sql is:
create table t (
user_id string,
action string,
ts timestamp,
watermark for ts as ts - interval '5' second
) with (
'connector' = 'kafka',
'topic' = 'test',
'json.timestamp-format.standard' = 'ISO-8601'
...
)

this does not work since ISO-8601 does not expect `Z` at the end of the
timestamp.
It only works for "2020-11-09T20:26:10.368". However, I'm not able to
change the format.

I checked a few threads and somebody suggested to use udf for unsupported
timestamp format. what would the create table statement look like? I also
need watermark working.

I'm thinking about this:
create table t (
user_id string,
action string,
ts string,
transform_ts_format(ts) as new_ts,
watermark for new_ts as new_ts - interval '5' second
) with (
...

q:
1. how does watermark know new_ts is a valid timestamp?
2. is it possible to reuse ts without introducing a new column?

Thanks,
Fanbin


Re: Flink Kafka Table API for python with JAAS

2020-11-10 Thread Xingbo Huang
Hi,
You can use the following API to add all the dependent jar packages you
need:

table_env.get_config().get_configuration().set_string("pipeline.jars",
"file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

For more related content, you can refer to the pyflink doc[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars


Best,
Xingbo

Timo Walther  于2020年11月10日周二 下午10:46写道:

> Hi,
>
> are you using the SQL jars or do you build the dependency jar file
> yourself? It might be the case that the SQL jar for Kafka does not
> include this module as the exception indicates. You might need to build
> a custom Kafka jar with Maven and all dependencies you need. (including
> correct META-INF/services entries).
>
> I hope this helps.
>
> Regards,
> Timo
>
> On 10.11.20 05:11, Sweta Kalakuntla wrote:
> > Hi,
> >
> > I am using Flink 1.11.2 version Python Table API to connect to Kafka
> > Topic using SASL protocol but it fails with the following error. I tried
> > the same properties in Flink java version, and I am able to connect. Has
> > anyone faced this issue and how did you resolve it?
> >
> > Error:
> >
> > |Caused by: javax.security.auth.login.LoginException: unable to find
> > LoginModule class:
> org.apache.kafka.common.security.plain.PlainLoginModule|
> >
> > Kafka connection:
> >
> > |kafka = Kafka()\ .version("universal") \ .topic("test_topic")\
> > .property("group.id ", "consumer_group")\
> > .property("security.protocol", "SASL_PLAINTEXT")\
> > .property("sasl.mechanism", "PLAIN")\ .property("bootstrap.servers",
> > ":9093")\ .property("sasl.jaas.config",
> > "org.apache.kafka.common.security.plain.PlainLoginModule required
> > username=\"user\" " "password=\"abc\";")\ .start_from_latest()|
> >
> >
> > Thank you,
> > SKala
> >
> > --
> >
> > BandwidthBlue.png
> >
> >
> >
> > Sweta Kalakuntla•Software Engineer
> >
> > 900 Main Campus Drive, Raleigh, NC 27606
> >
> >
> > m:216-702-1653
> >
> > e: skalakun...@bandwidth.com 
> >
>
>


Re:Re: Re:flinksql 输出到kafka用的fixed的方式 结果都输出到一个topic分区了,这个是什么原因

2020-11-10 Thread 马阳阳



This is a bug in 1.11.0 and 1.11.1 that has already been fixed in 1.11.2; see issue [1]

[1] 
https://issues.apache.org/jira/browse/FLINK-19285?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22kafka%20partitioner%22














On 2020-11-10 22:26:09, "leiyanrui" <1150693...@qq.com> wrote:
>My topic has 10 partitions and the sink parallelism is 25; going by the modulo calculation, the output should not end up in only one partition either
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Changing the topology while upgrading a job submitted by SQL

2020-11-10 Thread Yi Tang
Hi Timo,

Got it, thanks

Timo Walther  于 2020年11月10日周二 22:39写道:

> Hi,
>
> unfortunately, we currently don't provide any upgrading guarantees for
> SQL. In theory we could add a possibility to add operator uids, however,
> this will not help much because the underlying SQL operators or better
> optimization rules that create a smarter pipeline could change the
> entire topology with every major Flink version upgrade.
>
> Regards,
> Timo
>
> On 10.11.20 10:15, Yi Tang wrote:
> > Hi folks,
> >
> > A question about changing the topology while upgrading a job submitted
> > by SQL.
> > Is it possible for now?
> >
> > Looks like if we want to recover a job from a savepoint, it requires the
> > uid of the operator matches the corresponding one in the state. The
> > automatically generated uid depends largely on the topology, right? So
> > we would better assign the uid manually which can not be achieved using
> SQL?
> >
> > Thanks
>
>


How can a custom MySQLSink guarantee that the write to the database actually happens?

2020-11-10 Thread wangl...@geekplus.com

source.map(..).addSink(new MySQLSink())

MySQLSink simply receives the SQL generated by the upstream operators and executes it.

@Override
public void invoke(JDBCStatement statement, Context context) throws Exception {

log.info(statement.getSql());
log.info(statement.getParasMap().toString());
try {
namedTemplate.update(statement.getSql(), statement.getParasMap());
} catch (Exception e) {
e.printStackTrace();
}
}
 
Can Flink guarantee that namedTemplate.update(statement.getSql(), statement.getParasMap())
always executes successfully?

Thanks,
Wang Lei



wangl...@geekplus.com 
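
A side note on the snippet above: because invoke() catches and swallows the
exception, Flink has no way to retry a failed write. A minimal at-least-once sketch,
assuming the same JDBCStatement/namedTemplate types from the question and that
checkpointing is enabled (exactly-once would additionally require an idempotent or
transactional sink):

@Override
public void invoke(JDBCStatement statement, Context context) throws Exception {
    log.info(statement.getSql());
    // Let the exception bubble up: Flink then fails the task and replays from the
    // last successful checkpoint instead of silently dropping the record.
    namedTemplate.update(statement.getSql(), statement.getParasMap());
}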



BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

2020-11-10 Thread fuyao . li

Hi Experts,

I am trying to implement a KeyedProcessFunction with an onTimer()
callback. I need to use event time and I ran into some problems with making
the watermark available to my operator, along with some strange behaviors.


I have a joined retracted stream without watermark or timestamp 
information and I need to assign timestamps and watermarks to it. The 
timestamp is just a field in the stream. For the watermark generator part.


Problem:

1. I can use the timelag watermark generator and make it work. But for the
BoundedOutOfOrderness generator, the
context.timerService().currentWatermark() in processElement() always
sticks to the initial setup and never updates.


2. I set the autoWatermark interval to 5 seconds for debug purpose, I 
only attach this watermark generator in one place with parallelism 1. 
However, I am getting 8 records at a time. The timelag policy will advance
all 8 records; the outOfOrderness policy will only advance 1 record. Maybe 
the mismatch is causing the processElement() to capture the wrong 
default watermark?


https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator

This is my code for watermark generator:

@Slf4j
public class PeriodicTableOutputWatermarkGenerator implements
        WatermarkGenerator<Tuple2<Boolean, Row>> {

    private final long maxTimeLag = 15000;
    private transient long currentMaxTimestamp = 15000;

    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput output) {

        // the eventTimestamp is obtained through the TimestampAssigner
        // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);

        log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Policy 1: timelag strategy, can work and advance the timestamp
        long watermarkEpochTime = Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);

        output.emitWatermark(new Watermark(watermarkEpochTime));

        // Policy 2: periodic emit based on event
        long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
        // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));

        log.info("Emit Watermark: watermark based on system time: {}, periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
    }
}
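
For reference, a minimal sketch of how such a generator is typically attached to the
retracted table output (env, tableEnv, joinedTable and extractEpochMillis are
placeholders, not taken from the original code):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// the periodic emit frequency comes from the auto watermark interval
env.getConfig().setAutoWatermarkInterval(5000);

DataStream<Tuple2<Boolean, Row>> retractStream =
        tableEnv.toRetractStream(joinedTable, Row.class);

DataStream<Tuple2<Boolean, Row>> withTimestamps = retractStream
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<Boolean, Row>>forGenerator(ctx -> new PeriodicTableOutputWatermarkGenerator())
                        .withTimestampAssigner((tuple, previousTs) -> extractEpochMillis(tuple.f1)))  // hypothetical helper
        .setParallelism(1);  // keep a single generator instance while debugging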


This is my log printed by the slf4j log above. Every time, it will give 
me 8 records, why it is 8 records? I think it should be 1 in theory. I 
am very confused. Also, the policy 1 is advancing all 8 records. Policy 
2 is advancing 1 of the 8 records and not reflected in processElement().


14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266199, 
periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266199, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047266198, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000


14:28:06,200 INFO 
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - 
Emit Watermark: watermark based on system time: 1605047271200, 
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO 

Re: debug statefun

2020-11-10 Thread Puneet Kinra
Hi Gordon
I have tried the Harness utility, but I am getting the error below even
though the @AutoService annotation is there in the function Module.

java.lang.IllegalStateException: There are no routers defined.
at
org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:31)
at
org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71)
at org.apache.flink.statefun.flink.harness.Harness.start(Harness.java:127)
at
org.apache.flink.statefun.examples.harness.RunnerTest.run(RunnerTest.java:23)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)



On Tue, Nov 10, 2020 at 9:17 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> StateFun provide's a Harness utility exactly for that, allowing you to
> test a StateFun application in the IDE / setting breakpoints etc.
> You can take a look at this example on how to use the harness:
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
> .
>
> Cheers,
> Gordon
>
> On Tue, Nov 10, 2020 at 5:04 AM Lian Jiang  wrote:
>
>>
>> Hi,
>>
>> I created a POC by mimicing statefun-greeter-example. However, it failed
>> due to:
>>
>> Caused by: java.lang.IllegalStateException: There are no ingress defined.
>> at
>> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
>> ~[statefun-flink-core.jar:2.2.0]
>> at
>> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71)
>> ~[statefun-flink-core.jar:2.2.0]
>> at
>> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
>> ~[statefun-flink-core.jar:2.2.0]
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> ~[?:1.8.0_265]
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> ~[?:1.8.0_265]
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_265]
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:101)
>> ~[statefun-flink-distribution.jar:2.2.0]
>> at
>> org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:194)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> I have confirmed that something is wrong in my application causing this
>> error. However, it is hard to spot the issue visually and a little tricky
>> to debug in IDE (e.g. intellij). For example, if I can create an
>> application in Intellij and step through statefun library code and my code,
>> it will be easier to find the root cause. Any guidance on how to set this
>> up? Appreciate any hint. Thanks!
>>
>

-- 
*Cheers *

*Puneet Kinra*

Re: Stateful Functions: java.lang.IllegalStateException: There are no routers defined

2020-11-10 Thread Puneet Kinra
Hi All

I am facing a problem while running the Harness RunnerTest. I have
defined the @AutoService annotation in the function Module.


java.lang.IllegalStateException: There are no routers defined.
at
org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:31)
at
org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71)
at org.apache.flink.statefun.flink.harness.Harness.start(Harness.java:127)
at
org.apache.flink.statefun.examples.harness.RunnerTest.run(RunnerTest.java:23)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)

On Thu, Apr 23, 2020 at 6:10 PM Annemarie Burger <
annemarie.bur...@campus.tu-berlin.de> wrote:

> Hi,
> I'm getting to know Stateful Functions and was trying to run the Harness
> RunnerTest example. If I clone the  repository and open and execute the
> project from there it works fine, but when I copy the code into my own
> project, it keeps giving a "java.lang.IllegalStateException: There are no
> routers defined. " I changed nothing about the code, so the Module and the
> Router are both defined, and the dependencies are the same. Am I
> overlooking
> something?
> This is the full error:
>
> Exception in thread "main" java.lang.IllegalStateException: There are no
> routers defined.
> at
>
> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:31)
> at
>
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:66)
> at
> org.apache.flink.statefun.flink.harness.Harness.start(Harness.java:128)
> at gellyStreaming.gradoop.StatefulFunctions.Test.main(Test.java:17)
>
> Process finished with exit code 1
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: debug statefun

2020-11-10 Thread Lian Jiang
Igal,

I am using AutoService and I don't need to add auto-service-annotations
since it is provided by statefun-flink-core. Otherwise, my project cannot
even build. I did exactly the same as

https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-greeter-example/src/main/java/org/apache/flink/statefun/examples/greeter/GreetingModule.java

I did below test:
In statefun-greeter-example project, replace greeter jar with my jar in
Dockerfile, running this project can NOT find my module.

In my project, replace my jar with the greeter jar in Dockerfile, running
this project can find the greeter module.

So I am really puzzled about what is wrong with my jar.



Gordon,

The harness tests the plumbing of ingress/egress, but it may not help me debug why
Flink cannot discover my module. Correct?

Thanks guys.









On Tue, Nov 10, 2020 at 9:11 AM Igal Shilman  wrote:

> Hi Lian,
>
> If you are using the statefun-sdk directly (an embedded mode) then, most
> likely is that you are missing a
> META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
> file that would point to your modules class. We are using Java SPI [1] to
> load all the stateful functions modules at runtime.
> Alternatively, you can use the @AutoService annotation [2] (you will need
> to add a maven dependency for that [3])
>
> If you are using the remote functions deployment mode, then please make
> sure that your module.yaml file is present in your Dockerfile. (for example
> [4])
>
> Good luck,
> Igal.
>
> [1] https://docs.oracle.com/javase/tutorial/ext/basics/spi.html
> [2]
> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-greeter-example/src/main/java/org/apache/flink/statefun/examples/greeter/GreetingModule.java#L30
> [3] https://github.com/apache/flink-statefun/blob/master/pom.xml#L85,L89
> [4]
> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-python-greeter-example/Dockerfile#L20
>
> On Tue, Nov 10, 2020 at 4:47 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> StateFun provide's a Harness utility exactly for that, allowing you to
>> test a StateFun application in the IDE / setting breakpoints etc.
>> You can take a look at this example on how to use the harness:
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>> .
>>
>> Cheers,
>> Gordon
>>
>> On Tue, Nov 10, 2020 at 5:04 AM Lian Jiang  wrote:
>>
>>>
>>> Hi,
>>>
>>> I created a POC by mimicing statefun-greeter-example. However, it failed
>>> due to:
>>>
>>> Caused by: java.lang.IllegalStateException: There are no ingress defined.
>>> at
>>> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
>>> ~[statefun-flink-core.jar:2.2.0]
>>> at
>>> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71)
>>> ~[statefun-flink-core.jar:2.2.0]
>>> at
>>> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
>>> ~[statefun-flink-core.jar:2.2.0]
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> ~[?:1.8.0_265]
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> ~[?:1.8.0_265]
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> ~[?:1.8.0_265]
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> ~[?:1.8.0_265]
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:101)
>>> ~[statefun-flink-distribution.jar:2.2.0]
>>> at
>>> org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> at
>>> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:194)
>>> 

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-10 Thread Felipe Gutierrez
I see, thanks Timo

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Nov 10, 2020 at 3:22 PM Timo Walther  wrote:
>
> Hi Felipe,
>
> with non-deterministic Jark meant that you never know if the mini batch
> timer (every 3 s) or the mini batch threshold (e.g. 3 rows) fires the
> execution. This depends how fast records arrive at the operator.
>
> In general, processing time can be considered non-deterministic, because
> 100ms must not be 100ms. This depends on the CPU load and other tasks
> such garbage collection etc. Only event-time (and thus event time
> windows) that work on the timestamp in the data instead of machine time
> is determistic,
>
> Regards,
> Timo
>
>
> On 10.11.20 12:02, Felipe Gutierrez wrote:
> > Hi Jark,
> >
> > thanks for your reply. Indeed, I forgot to write DISTINCT on the query
> > and now the query plan is splitting into two hash partition phases.
> >
> > what do you mean by deterministic time? Why only the window aggregate
> > is deterministic? If I implement the ProcessingTimeCallback [1]
> > interface is it deterministic?
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
> > Thanks
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Tue, Nov 10, 2020 at 7:55 AM Jark Wu  wrote:
> >>
> >> Hi Felipe,
> >>
> >> The "Split Distinct Aggregation", i.e. the 
> >> "table.optimizer.distinct-agg.split.enabled" option,
> >>   only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).
> >>
> >> However, the query in your example is using COUNT(driverId).
> >> You can update it to COUNT(DISTINCT driverId), and it should show two hash 
> >> phases.
> >>
> >> Regarding "MiniBatch Aggregation", it is not the same as a processing-time 
> >> window aggregation.
> >>
> >> 1) MiniBatch is just an optimization on unbounded aggregation, it buffers 
> >> some input records in memory
> >>   and processes them together to reduce the state accessing. But 
> >> processing-time window is still a per-record
> >>   state accessing style. Besides, the local aggregation also applies 
> >> mini-batch, it only sends the accumulator
> >>   of current this mini-batch to the downstream global aggregation, and 
> >> this improves performance a lot.
> >> 2) The size of MiniBach is not deterministic. It may be triggered by the 
> >> number of records or a timeout.
> >>But a window aggregate is triggered by a deterministic time.
> >>
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez 
> >>  wrote:
> >>>
> >>> I realized that I forgot the image. Now it is attached.
> >>> --
> >>> -- Felipe Gutierrez
> >>> -- skype: felipe.o.gutierrez
> >>> -- https://felipeogutierrez.blogspot.com
> >>>
> >>> On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez
> >>>  wrote:
> 
>  Hi community,
> 
>  I am testing the "Split Distinct Aggregation" [1] consuming the taxi
>  ride data set. My sql query from the table environment is the one
>  below:
> 
>  Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate,
>  COUNT(driverId) FROM TaxiRide GROUP BY startDate");
> 
>  and I am enableing:
>  configuration.setString("table.exec.mini-batch.enabled", "true");
>  configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
>  configuration.setString("table.exec.mini-batch.size", "5000");
>  configuration.setString("table.optimizer.agg-phase-strategy", 
>  "TWO_PHASE");
>  and finally
>  configuration.setString("table.optimizer.distinct-agg.split.enabled", 
>  "true");
> 
>  I was expecting that the query plan at the WEB UI show to me two hash
>  phases as it is present here on the image [1]. Instead, it is showing
>  to me the same plan with one hash phase as I was deploying only one
>  Local aggregate and one Global aggregate (of course, taking the
>  parallel instances into consideration). Please see the query execution
>  plan image attached.
> 
>  Is there something that I am missing when I config the Table API?
>  By the way, I am a bit confused with the "MiniBatch Aggregation" [2].
>  Is the "MiniBatch Aggregation" aggregating as a processing time window
>  on the operator after the hash phase? If it is, isn't it the same as a
>  window aggregation instead of an unbounded window as the example
>  presents?
> 
>  Thanks!
>  Felipe
> 
>  [1] 
>  https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
>  [2] 
>  https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>  --
>  -- Felipe Gutierrez
>  -- skype: felipe.o.gutierrez
> 

Re: debug statefun

2020-11-10 Thread Igal Shilman
Hi Lian,

If you are using the statefun-sdk directly (an embedded mode) then, most
likely is that you are missing a
META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
file that would point to your modules class. We are using Java SPI [1] to
load all the stateful functions modules at runtime.
Alternatively, you can use the @AutoService annotation [2] (you will need
to add a maven dependency for that [3])
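
Concretely, that service file is a one-line text file on the classpath; a sketch
(the class name is a placeholder for your own module):

# src/main/resources/META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
com.example.MyStatefulFunctionModule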

If you are using the remote functions deployment mode, then please make
sure that your module.yaml file is present in your Dockerfile. (for example
[4])

Good luck,
Igal.

[1] https://docs.oracle.com/javase/tutorial/ext/basics/spi.html
[2]
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-greeter-example/src/main/java/org/apache/flink/statefun/examples/greeter/GreetingModule.java#L30
[3] https://github.com/apache/flink-statefun/blob/master/pom.xml#L85,L89
[4]
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-python-greeter-example/Dockerfile#L20

On Tue, Nov 10, 2020 at 4:47 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> StateFun provide's a Harness utility exactly for that, allowing you to
> test a StateFun application in the IDE / setting breakpoints etc.
> You can take a look at this example on how to use the harness:
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
> .
>
> Cheers,
> Gordon
>
> On Tue, Nov 10, 2020 at 5:04 AM Lian Jiang  wrote:
>
>>
>> Hi,
>>
>> I created a POC by mimicing statefun-greeter-example. However, it failed
>> due to:
>>
>> Caused by: java.lang.IllegalStateException: There are no ingress defined.
>> at
>> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
>> ~[statefun-flink-core.jar:2.2.0]
>> at
>> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71)
>> ~[statefun-flink-core.jar:2.2.0]
>> at
>> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
>> ~[statefun-flink-core.jar:2.2.0]
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> ~[?:1.8.0_265]
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> ~[?:1.8.0_265]
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_265]
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:101)
>> ~[statefun-flink-distribution.jar:2.2.0]
>> at
>> org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:194)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> I have confirmed that something is wrong in my application causing this
>> error. However, it is hard to spot the issue visually and a little tricky
>> to debug in IDE (e.g. intellij). For example, if I can create an
>> application in Intellij and step through statefun library code and my code,
>> it will be easier to find the root cause. Any guidance on how to set this
>> up? Appreciate any hint. Thanks!

error in using package SubnetUtils

2020-11-10 Thread Diwakar Jha
Hello,

I'm migrating from Flink 1.8 to Flink 1.11 on an EMR cluster and I get
this error message when using the SubnetUtils package. It's working fine with
Flink 1.8.

 [javac] import
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hadoop.$internal.org.apache.commons.net.util.SubnetUtils;
[javac]
   ^
[javac] /workplace/.../utility/IPAddressHelper.java:31: error: package
SubnetUtils does not exist
[javac] public static final HashMap>IPS_MATCH = new HashMap>() {{
[javac] ^
[javac] 2 errors

Anyone know about this error. Any pointers?

Thanks.
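
One way around errors like this (assuming the cause is that application code imports
a class relocated inside the flink-s3-fs-presto jar, whose shading layout can change
between Flink releases) is to depend on Apache Commons Net directly
(commons-net:commons-net) and use the unshaded class, e.g.:

import org.apache.commons.net.util.SubnetUtils;

// same utility, but from the regular Commons Net artifact instead of a shaded copy
SubnetUtils.SubnetInfo info = new SubnetUtils("10.0.0.0/24").getInfo();
boolean inRange = info.isInRange("10.0.0.42");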


Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-10 Thread Tim Josefsson
Hey Aljoscha,

I'm setting the transaction.timeout.ms when I create the FlinkKafkaProducer:

I create a Properties object and then set the property and finally add
those properties when creating the producer.

Properties producerProps = new Properties();
producerProps.setProperty("transaction.timeout.ms", "90");
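
For comparison, a minimal sketch of how such properties are usually handed to the
exactly-once producer (the topic name, serialization schema and example timeout value
are assumptions, since they are not shown above):

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "...");
producerProps.setProperty("transaction.timeout.ms", "900000");  // example value, 15 minutes

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "playerEvents",                       // assumed target topic
        new MyKafkaSerializationSchema(),     // hypothetical KafkaSerializationSchema<String>
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);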

If I don't set that property, I instead get the following config when
starting the job:
11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig
 - ProducerConfig values:
   acks = 1
   [omitted for brevity]
   transaction.timeout.ms = 6
   transactional.id = Source: Read player events from Kafka -> Map
 Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not
needed for backfill -> Sink: Post events to playerEvents
Kafka-a15b4dd4812495cebdc94e33125ef858-1
   value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer

So I imagine the Producer is picking up the change but it still returns
errors when running the job.

Best regards,
Tim


On Tue, 10 Nov 2020 at 16:14, Aljoscha Krettek  wrote:

> On 10.11.20 11:53, Tim Josefsson wrote:
> > Also when checking my logs I see the following message:
> > 11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig
> >   - ProducerConfig values:
> > acks = 1
> > [omitted for brevity]
> > transaction.timeout.ms = 90
> > transactional.id = Source: Read player events from Kafka -> Map
> >   Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not
> > needed for backfill -> Sink: Post events to playerEvents
> > Kafka-a15b4dd4812495cebdc94e33125ef858-1
> > value.serializer = class
> > org.apache.kafka.common.serialization.ByteArraySerializer
>
> The interesting thing would be to figure out where that
> `transaction.timeout.ms = 90` is coming from. The default from Flink
> would be 6, if nothing is configured. Are you specifying that value,
> maybe from the commandline or in code?
>
> Maybe it's a funny coincidence, but our StreamingKafkaITCase also
> specifies that timeout value.
>
> Best,
> Aljoscha
>
>

-- 

*Tim Josefsson*
[image: Webstep GPtW] 
mobil   +46 (0) 707 81 91 12
telefon +46 (0) 8 21 40 70

tim.josefs...@webstep.se
*webstep.se *
Suttungs gränd 2
753 19 Uppsala
Stockholm | Uppsala | Malmö | Sundsvall | Oslo
Bergen | Stavanger | Trondheim | Kristiansand
[image: LinkedIn]  [image:
Facebook]  [image: Facebook]



Re: debug statefun

2020-11-10 Thread Tzu-Li (Gordon) Tai
Hi,

StateFun provides a Harness utility exactly for that, allowing you to test
a StateFun application in the IDE / setting breakpoints etc.
You can take a look at this example on how to use the harness:
https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
.
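
For orientation, the linked RunnerTest boils down to roughly the following (a sketch
based on that example project; the ingress/egress identifiers and the message
generator come from the example, not from your application):

Harness harness =
    new Harness()
        .withKryoMessageSerializer()
        .withSupplyingIngress(MyConstants.REQUEST_INGRESS, new MessageGenerator())
        .withPrintingEgress(MyConstants.RESULT_EGRESS);

// runs the StateFun application in-process, so breakpoints in your functions work
harness.start();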

Cheers,
Gordon

On Tue, Nov 10, 2020 at 5:04 AM Lian Jiang  wrote:

>
> Hi,
>
> I created a POC by mimicing statefun-greeter-example. However, it failed
> due to:
>
> Caused by: java.lang.IllegalStateException: There are no ingress defined.
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
> ~[statefun-flink-core.jar:2.2.0]
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71)
> ~[statefun-flink-core.jar:2.2.0]
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
> ~[statefun-flink-core.jar:2.2.0]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_265]
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_265]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_265]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:101)
> ~[statefun-flink-distribution.jar:2.2.0]
> at
> org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:194)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>
> I have confirmed that something is wrong in my application causing this
> error. However, it is hard to spot the issue visually and a little tricky
> to debug in IDE (e.g. intellij). For example, if I can create an
> application in Intellij and step through statefun library code and my code,
> it will be easier to find the root cause. Any guidance on how to set this
> up? Appreciate any hint. Thanks!
>


Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-10 Thread Aljoscha Krettek

On 10.11.20 11:53, Tim Josefsson wrote:

Also when checking my logs I see the following message:
11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig
  - ProducerConfig values:
acks = 1
[omitted for brevity]
transaction.timeout.ms = 90
transactional.id = Source: Read player events from Kafka -> Map
  Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not
needed for backfill -> Sink: Post events to playerEvents
Kafka-a15b4dd4812495cebdc94e33125ef858-1
value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer


The interesting thing would be to figure out where that 
`transaction.timeout.ms = 90` is coming from. The default from Flink 
would be 6, if nothing is configured. Are you specifying that value, 
maybe from the commandline or in code?


Maybe it's a funny coincidence, but our StreamingKafkaITCase also 
specifies that timeout value.


Best,
Aljoscha



FlinkSQL kafka->dedup->kafka

2020-11-10 Thread Laurent Exsteens
Hello,

I'm getting an error  in Flink SQL when reading from kafka, deduplicating
records and sending them back to Kafka.

The behavior I want is the following:

*input:*
| client_number | address |
| --- | --- |
| 1  | addr1 |
| 1  | addr1 |
| 1  | addr2 |
| 1  | addr2 |
| 1  | addr1 |
| 1  | addr1 |

*output:*
| client_number | address |
| --- | --- |
| 1  | addr1 |
| 1  | addr2 |
| 1  | addr1 |

The error seems to say that the type of stream created by the deduplication
query is of "update & delete" type, while kafka only supports append-only:

Unsupported query
Table sink 'vvp.default.sat_customers_address' doesn't support consuming
update and delete changes which is produced by node
Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
$2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])


--> Is there a way to create an append only query from this kind of
deduplication query (see my code here below)?
--> Would that work if I would use, say, a Postgres sink?

Bonus question: can we extract the Kafka ingestion date using Flink SQL?
(here I generated a processing date to allow ordering during deduplication)

P.S.: I'm on the Ververica Platform, but I guess this error is linked to
Flink SQL itself.

Thanks in advance for your help.

Best Regards,

Laurent.

---
-- Read from customers kafka topic
---
CREATE TEMPORARY TABLE customers (
`client_number` INT,
`name` VARCHAR(100),
`address` VARCHAR(100)
)
COMMENT ''
WITH (
'connector' = 'kafka',
'format' = 'csv',
'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
'properties.group.id' = 'flinkSQL',
'topic' = 'customers',
'csv.field-delimiter' = ';',
'scan.startup.mode' = 'earliest-offset'
);



---
-- Add metadata
---
CREATE TEMPORARY VIEW metadata AS
SELECT *
, sha256(cast(client_number as STRING)) AS customer_pk
, current_timestamp AS load_date
, 'Kafka topic: customers' AS record_source
FROM customers;



---
-- Deduplicate addresses
---
CREATE TEMPORARY VIEW dedup_address as
SELECT customer_pk
, client_number
, load_date
, address
FROM (
SELECT customer_pk
, client_number
, load_date
, record_source
, address
, ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address ORDER
BY load_date ASC) AS rownum
FROM metadata
) where rownum = 1;






---
-- Send to sat_customers_address kafka topic
---
CREATE TEMPORARY TABLE sat_customers_address (
`customer_pk` VARCHAR(64),
`client_number` INT,
`address` VARCHAR(100)
)
COMMENT ''
WITH (
'connector' = 'kafka',
'format' = 'csv',
'properties.bootstrap.servers' =
'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
'properties.group.id' = 'flinkSQL',
'topic' = 'sat_customers_address'
);

INSERT INTO sat_customers_address
SELECT customer_pk
, client_number
, address
FROM dedup_address;




-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

*euranova.eu *

*research.euranova.eu* 

-- 
♻ Be green, keep it on the screen


Re: Flink Kafka Table API for python with JAAS

2020-11-10 Thread Timo Walther

Hi,

are you using the SQL jars or do you build the dependency jar file 
yourself? It might be the case that the SQL jar for Kafka does not 
include this module as the exception indicates. You might need to build 
a custom Kafka jar with Maven and all dependencies you need. (including 
correct META-INF/services entries).


I hope this helps.

Regards,
Timo

On 10.11.20 05:11, Sweta Kalakuntla wrote:

Hi,

I am using Flink 1.11.2 version Python Table API to connect to Kafka 
Topic using SASL protocol but it fails with the following error. I tried 
the same properties in Flink java version, and I am able to connect. Has 
anyone faced this issue and how did you resolve it?


Error:

Caused by: javax.security.auth.login.LoginException: unable to find
LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule


Kafka connection:

kafka = Kafka() \
    .version("universal") \
    .topic("test_topic") \
    .property("group.id", "consumer_group") \
    .property("security.protocol", "SASL_PLAINTEXT") \
    .property("sasl.mechanism", "PLAIN") \
    .property("bootstrap.servers", ":9093") \
    .property("sasl.jaas.config",
              "org.apache.kafka.common.security.plain.PlainLoginModule required "
              "username=\"user\" "
              "password=\"abc\";") \
    .start_from_latest()



Thank you,
SKala

--

BandwidthBlue.png



Sweta Kalakuntla•Software Engineer

900 Main Campus Drive, Raleigh, NC 27606


m:216-702-1653

e: skalakun...@bandwidth.com 





Re: Changing the topology while upgrading a job submitted by SQL

2020-11-10 Thread Timo Walther

Hi,

unfortunately, we currently don't provide any upgrading guarantees for 
SQL. In theory we could add a possibility to add operator uids, however, 
this will not help much because the underlying SQL operators or better 
optimization rules that create a smarter pipeline could change the 
entire topology with every major Flink version upgrade.


Regards,
Timo

On 10.11.20 10:15, Yi Tang wrote:

Hi folks,

A question about changing the topology while upgrading a job submitted 
by SQL.

Is it possible for now?

Looks like if we want to recover a job from a savepoint, it requires the 
uid of the operator matches the corresponding one in the state. The 
automatically generated uid depends largely on the topology, right? So 
we would better assign the uid manually which can not be achieved using SQL?


Thanks




Job crash in job cluster mode

2020-11-10 Thread Tim Eckhardt
Hi there,

 

I have a problem with running a flink job in job cluster mode using flink 
1.11.1 (also tried 1.11.2).

The same job is running well using the session cluster mode as well as using 
flink 1.10.0 in job cluster mode.

 

The job starts and runs for quite some time, but it runs a lot slower than in 
session cluster mode and crashes after about an hour. I can observe in the 
Flink dashboard that the JVM heap stays constant at a high level and slowly 
creeps towards the limit (4.13 GB in my case), which it reaches shortly before 
the job crashes.

There is also some G1_Old_Generation garbage collection going on which I do not 
observe in session mode.

 

GC values after running for about 45min:

 

(Collector, Count, Time)

G1_Young_Generation   1,250  107,937

G1_Old_Generation  322  2,432,362

 

Compared to the GC values of the same job in session cluster mode (after the 
same runtime):

 

G1_Young_Generation   1,920  20,575

G1_Old_Generation  0  0

 

So my vague guess is that it has to be something memory-related, maybe 
configuration-wise.

 

To simplify the setup, only one jobmanager and one taskmanager are used. The 
taskmanager has a memory setting of taskmanager.memory.process.size: 1m, 
which should be totally fine for the server. The jobmanager has a defined 
heap_size of 1600m. 

 

Maybe somebody has experienced something like this before?

 

Also, is there a way to export the currently loaded configuration parameters of 
the job- and taskmanagers in a cluster? For example, I can’t see the current 
memory process size of the taskmanager in the Flink dashboard. That way I could 
compare the running and the crashing setups more easily (I am using Docker and 
environment variables for configuration at the moment, which makes it a bit 
harder to debug).

 

Thanks.





All taskmanagers are running on a single node

2020-11-10 Thread liya...@huimin100.cn
The program consumes data from one Kafka topic and writes it to another topic.
The submit command is: ./bin/flink run -m yarn-cluster -ys 2 -yjm 2048 -ytm 4096 
--parallelism=8...
I would like these four taskmanagers to be distributed across multiple nodes, but they all end up running on a single node. Any guidance would be much appreciated.


As shown in the screenshot: all taskmanagers are running on cdh3


liya...@huimin100.cn




Re: Re: Flink SQL writes to Kafka with the fixed partitioner, but all results end up in a single topic partition. What is the reason?

2020-11-10 Thread leiyanrui
My topic has 10 partitions and the sink parallelism is 25. Going by the modulo calculation, the output should not end up in only one partition.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL writes to Kafka with the fixed partitioner, but all results end up in a single topic partition. What is the reason?

2020-11-10 Thread hailongwang
Hi leiyanrui,


   When the sink parallelism is smaller than the number of Kafka partitions, and in particular when the sink parallelism is 1, the FixPartitioner 
will only ever send data to one partition.
See the implementation of FixPartitioner [1]: `parallelInstanceId` is the index of the subtask, 
starting from 0, and `partitions.length` is the number of partitions of the topic; 
the method returns the partition this subtask should write to.


[1] 
https://github.com/apache/flink/blob/d00941c77170b233c9fe599c7fb0003778eb3299/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java#L75
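
A condensed sketch of the mapping described above (not the real Flink class, 
just the formula it applies):

    public class FixedPartitionerSketch {
        // parallelInstanceId: index of the sink subtask, starting at 0
        // partitions: partition ids of the target topic
        static int targetPartition(int parallelInstanceId, int[] partitions) {
            return partitions[parallelInstanceId % partitions.length];
        }

        public static void main(String[] args) {
            int[] partitions = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; // e.g. a topic with 10 partitions
            System.out.println(targetPartition(0, partitions));  // a sink with parallelism 1 always hits partition 0
            System.out.println(targetPartition(13, partitions)); // subtask 13 of a wider sink hits partition 3
        }
    }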


Best,
hailong

At 2020-11-10 20:45:59, "leiyanrui" <1150693...@qq.com> wrote:
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-10 Thread Timo Walther

Hi Felipe,

with "non-deterministic" Jark meant that you never know whether the mini-batch 
timer (every 3 s) or the mini-batch threshold (e.g. 3 rows) fires the 
execution. This depends on how fast records arrive at the operator.


In general, processing time can be considered non-deterministic, because 
100 ms is not necessarily 100 ms. It depends on the CPU load and other tasks 
such as garbage collection. Only event time (and thus event-time 
windows), which works on the timestamps in the data instead of machine time, 
is deterministic.
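
As a minimal sketch of the deterministic variant (field names and sample data 
are made up): the window below fires based on timestamps carried in the 
records, so a replay of the same input produces the same result.

    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class EventTimeDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // (event timestamp in ms, driverId, count)
            env.fromElements(
                    Tuple3.of(1_000L, 42L, 1L),
                    Tuple3.of(2_000L, 43L, 1L),
                    Tuple3.of(7_000L, 42L, 1L))
                // The timestamp comes from the record itself, not from the machine clock.
                .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple3<Long, Long, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((t, ts) -> t.f0))
                .keyBy(t -> t.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(2) // rides per driver per 5 s event-time window
                .print();

            env.execute("event-time demo");
        }
    }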


Regards,
Timo


On 10.11.20 12:02, Felipe Gutierrez wrote:

Hi Jark,

thanks for your reply. Indeed, I forgot to write DISTINCT on the query
and now the query plan is splitting into two hash partition phases.

what do you mean by deterministic time? Why only the window aggregate
is deterministic? If I implement the ProcessingTimeCallback [1]
interface is it deterministic?

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
Thanks

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Nov 10, 2020 at 7:55 AM Jark Wu  wrote:


Hi Felipe,

The "Split Distinct Aggregation", i.e. the 
"table.optimizer.distinct-agg.split.enabled" option,
  only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).

However, the query in your example is using COUNT(driverId).
You can update it to COUNT(DISTINCT driverId), and it should show two hash 
phases.

Regarding "MiniBatch Aggregation", it is not the same as a processing-time 
window aggregation.

1) MiniBatch is just an optimization on unbounded aggregation, it buffers some 
input records in memory
  and processes them together to reduce the state accessing. But 
processing-time window is still a per-record
  state accessing style. Besides, the local aggregation also applies 
mini-batch, it only sends the accumulator
  of current this mini-batch to the downstream global aggregation, and this 
improves performance a lot.
2) The size of MiniBatch is not deterministic. It may be triggered by the number 
of records or a timeout.
   But a window aggregate is triggered by a deterministic time.


Best,
Jark


On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez  
wrote:


I realized that I forgot the image. Now it is attached.
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez
 wrote:


Hi community,

I am testing the "Split Distinct Aggregation" [1] consuming the taxi
ride data set. My sql query from the table environment is the one
below:

Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate,
COUNT(driverId) FROM TaxiRide GROUP BY startDate");

and I am enabling:
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
and finally
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");

I was expecting the query plan in the web UI to show two hash
phases, as presented in the image [1]. Instead, it shows
the same plan with one hash phase, as if I were deploying only one
Local aggregate and one Global aggregate (of course, taking the
parallel instances into consideration). Please see the attached query
execution plan image.

Is there something that I am missing when I config the Table API?
By the way, I am a bit confused with the "MiniBatch Aggregation" [2].
Is the "MiniBatch Aggregation" aggregating as a processing time window
on the operator after the hash phase? If it is, isn't it the same as a
window aggregation instead of an unbounded window as the example
presents?

Thanks!
Felipe

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com






Flink SQL writes to Kafka with the fixed partitioner, but all results end up in a single topic partition. What is the reason?

2020-11-10 Thread leiyanrui





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL query NULL value error

2020-11-10 Thread 丁浩浩
Thanks a lot!!!

> On Nov 10, 2020, at 8:22 PM, hailongwang <18868816...@163.com> wrote:
> 
> Hi,
> 
> 
> You need to cast the null to a concrete type, for example:
> if(type=1,2,cast(null as int))
> 
> 
> Best,
> Hailong
> At 2020-11-10 19:14:44, "丁浩浩" <18579099...@163.com> wrote:
>> Select 
>>  id,
>>  name,
>>  if(type=1,2,null) 
>> From
>>  user ;
>> When I execute the SQL above, it tells me:
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of ‘NULL’ 
>> Does this mean NULL cannot be displayed?
> 
> 
> 




Re: Table API, accessing nested fields

2020-11-10 Thread Dawid Wysakowicz
You should use the get method:

 val table = tenv
      .fromDataStream(stream)
      .select($"context".get("url"), $"name")

Best,

Dawid

On 10/11/2020 10:15, Ori Popowski wrote:
>
> How can I access nested fields e.g. in select statements?
>
> For example, this won't work:
>
>  val table = tenv
>       .fromDataStream(stream)
>       .select($"context.url", $"name")   
>
> What is the correct way?
>
> Thanks.




Re: Flink SQL query NULL value error

2020-11-10 Thread hailongwang
Hi,


You need to cast the null to a concrete type, for example:
if(type=1,2,cast(null as int))


Best,
Hailong
At 2020-11-10 19:14:44, "丁浩浩" <18579099...@163.com> wrote:
>Select 
>   id,
>   name,
>   if(type=1,2,null) 
>From
>   user ;
>When I execute the SQL above, it tells me:
>[ERROR] Could not execute SQL statement. Reason:
>org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of ‘NULL’ 
>Does this mean NULL cannot be displayed?





Flink SQL query NULL value error

2020-11-10 Thread 丁浩浩
Select 
id,
name,
if(type=1,2,null) 
From
user ;
When I execute the SQL above, it tells me:
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of ‘NULL’ 
Does this mean NULL cannot be displayed?

Re: Discussion: the impact of data types on data-transfer IO speed during shuffle (a difference of tens of times)

2020-11-10 Thread 赵一旦
I only see your bytes sent = xxx MB; at that order of magnitude, a parallelism of 1 already feels sufficient...

Husky Zeng <568793...@qq.com> wrote on Fri, Oct 30, 2020 at 4:16 PM:

> When the shuffle is placed earlier in the pipeline:
>
> 
>
>
>
> When the shuffle is placed later in the pipeline:
>
> 
>
>
>
> 
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-10 Thread Felipe Gutierrez
Hi Jark,

thanks for your reply. Indeed, I forgot to write DISTINCT on the query
and now the query plan is splitting into two hash partition phases.

what do you mean by deterministic time? Why only the window aggregate
is deterministic? If I implement the ProcessingTimeCallback [1]
interface is it deterministic?

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
Thanks

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Nov 10, 2020 at 7:55 AM Jark Wu  wrote:
>
> Hi Felipe,
>
> The "Split Distinct Aggregation", i.e. the 
> "table.optimizer.distinct-agg.split.enabled" option,
>  only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).
>
> However, the query in your example is using COUNT(driverId).
> You can update it to COUNT(DISTINCT driverId), and it should show two hash 
> phases.
>
> Regarding "MiniBatch Aggregation", it is not the same as a processing-time 
> window aggregation.
>
> 1) MiniBatch is just an optimization on unbounded aggregation, it buffers 
> some input records in memory
>  and processes them together to reduce the state accessing. But 
> processing-time window is still a per-record
>  state accessing style. Besides, the local aggregation also applies 
> mini-batch, it only sends the accumulator
>  of current this mini-batch to the downstream global aggregation, and this 
> improves performance a lot.
> 2) The size of MiniBatch is not deterministic. It may be triggered by the 
> number of records or a timeout.
>   But a window aggregate is triggered by a deterministic time.
>
>
> Best,
> Jark
>
>
> On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez  
> wrote:
>>
>> I realized that I forgot the image. Now it is attached.
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
>>
>> On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez
>>  wrote:
>> >
>> > Hi community,
>> >
>> > I am testing the "Split Distinct Aggregation" [1] consuming the taxi
>> > ride data set. My sql query from the table environment is the one
>> > below:
>> >
>> > Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate,
>> > COUNT(driverId) FROM TaxiRide GROUP BY startDate");
>> >
>> > and I am enabling:
>> > configuration.setString("table.exec.mini-batch.enabled", "true");
>> > configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
>> > configuration.setString("table.exec.mini-batch.size", "5000");
>> > configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
>> > and finally
>> > configuration.setString("table.optimizer.distinct-agg.split.enabled", 
>> > "true");
>> >
>> > I was expecting the query plan in the web UI to show two hash
>> > phases, as presented in the image [1]. Instead, it shows
>> > the same plan with one hash phase, as if I were deploying only one
>> > Local aggregate and one Global aggregate (of course, taking the
>> > parallel instances into consideration). Please see the attached query
>> > execution plan image.
>> >
>> > Is there something that I am missing when I config the Table API?
>> > By the way, I am a bit confused with the "MiniBatch Aggregation" [2].
>> > Is the "MiniBatch Aggregation" aggregating as a processing time window
>> > on the operator after the hash phase? If it is, isn't it the same as a
>> > window aggregation instead of an unbounded window as the example
>> > presents?
>> >
>> > Thanks!
>> > Felipe
>> >
>> > [1] 
>> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
>> > [2] 
>> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>> > --
>> > -- Felipe Gutierrez
>> > -- skype: felipe.o.gutierrez
>> > -- https://felipeogutierrez.blogspot.com


Re: Re: Can different operators share the same state?

2020-11-10 Thread 赵一旦
If that works for your case, just use a ConnectedStream.
In the operator that connects StreamA and StreamB, simply define the two MapState states (a sketch follows below).
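
A minimal sketch of that setup (all names and sample data are made up): 
connect the two streams, key them the same way, and keep one MapState per 
input inside a single co-process function.

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    public class SharedStateSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<String, Long>> streamA = env.fromElements(Tuple2.of("k1", 1L));
            DataStream<Tuple2<String, String>> streamB = env.fromElements(Tuple2.of("k1", "v"));

            streamA.connect(streamB)
                .keyBy(a -> a.f0, b -> b.f0)
                .process(new KeyedCoProcessFunction<String, Tuple2<String, Long>, Tuple2<String, String>, String>() {

                    private transient MapState<String, Long> stateA;
                    private transient MapState<String, String> stateB;

                    @Override
                    public void open(Configuration parameters) {
                        stateA = getRuntimeContext().getMapState(
                            new MapStateDescriptor<>("state-a", String.class, Long.class));
                        stateB = getRuntimeContext().getMapState(
                            new MapStateDescriptor<>("state-b", String.class, String.class));
                    }

                    @Override
                    public void processElement1(Tuple2<String, Long> in, Context ctx, Collector<String> out) throws Exception {
                        stateA.put(in.f0, in.f1);                       // written by the "op1" side
                        out.collect("A seen, B has: " + stateB.get(in.f0));
                    }

                    @Override
                    public void processElement2(Tuple2<String, String> in, Context ctx, Collector<String> out) throws Exception {
                        stateB.put(in.f0, in.f1);                       // written by the "op2" side
                        out.collect("B seen, A has: " + stateA.get(in.f0));
                    }
                })
                .print();

            env.execute("shared-state sketch");
        }
    }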

hl9...@126.com wrote on Wed, Nov 4, 2020 at 3:08 PM:

> Thanks. In my scenario op1 and op2 run in sequence, so the only option is to send op1's state downstream to op2 as well.
>
>
>
> hl9...@126.com
>
> From: Qi Kang
> Sent: 2020-11-04 14:53
> To: user-zh
> Subject: Re: Can different operators share the same state?
> Hi,
>
>
> Flink does not support sharing state between operators. If your op1 and op2 run in parallel, you can only share state data indirectly via external storage. If they run in sequence (op2 downstream of op1), you can also consider sending the state data produced by op1 to op2 as stream elements. Hope this helps.
>
>
> > On Nov 4, 2020, at 14:48, hl9...@126.com wrote:
> >
> > Hi, all:
> > I defined two flatmap operators (op1 and op2). op1 defines a MapState variable, and I would like to use that state directly inside op2. Is that possible?
> > My feeling is that it is not; I could not find a relevant API. Could someone please confirm?
> >
> >
> >
> > hl9...@126.com
>
>


Re: Flink side-output operator getting stuck

2020-11-10 Thread 赵一旦
The DAG description is not specific enough; I cannot see what the problem is from it.

ZT.Ren <18668118...@163.com> wrote on Mon, Nov 9, 2020 at 3:53 PM:

> Problem description: (the issue comes from a sibling team, so screenshots are not available, apologies~~)
>   1. Flink version: 1.6.0
>   2. Basic flow: Flink reads Kafka data -> JSON parsing -> (process with parallelism 6) sends data to 11 downstream pipelines
>   3. Symptom: after running for a while, the job gets stuck and the sink produces no data
>   4. Monitoring: backpressure shows up at the map->sideprocess operator, while the downstream window->sink shows no backpressure.
> The outputBufferPool task metric of the map->sideprocess operator occasionally becomes 1 and stays at 0 most of the time.
>  Current question: with process (parallelism 6) -> side outputs to 11 downstream branches, is side output supported in this scenario?
>
>
>


Re: How does keyBy map to physical partitions?

2020-11-10 Thread 赵一旦
Custom partitioning can do this.
Your remark that 1/2/3 all get added together is not specific enough; my guess is that you called sum directly on the DataStream, which is of course a global sum.
Per-key aggregation has to look like keyBy(...).sum(...).

However, partitionCustom returns a DataStream, and a subsequent keyBy would override the partitioner.

So you need to assemble the transformation yourself, which is not hard. Look at how the source code assembles it and copy that pattern. A rough sketch of the custom-partitioning part is below.
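
A rough sketch (the key-to-partition mapping is made up for illustration):

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CustomPartitionSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<Integer, Long>> input = env.fromElements(
                Tuple2.of(1, 10L), Tuple2.of(2, 20L), Tuple2.of(3, 30L), Tuple2.of(4, 40L));

            // ids 1/2/3 -> partition 0, everything else -> partition 1 (wrapped by the modulo)
            Partitioner<Integer> partitioner =
                (key, numPartitions) -> (key <= 3 ? 0 : 1) % numPartitions;

            input
                .partitionCustom(partitioner, t -> t.f0)
                // A keyBy() here would override the custom partitioner, so any per-key
                // aggregation after this point has to be done manually inside the operator.
                .map(t -> "partition-local record: " + t)
                .print();

            env.execute("custom partition sketch");
        }
    }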



zxyoung wrote on Fri, Nov 6, 2020 at 4:47 PM:

> Hi, a question for everyone:
>    My scenario has a keyBy operation, but I need a specific key to land on a specific physical partition.
>
>  I noticed that the KeySelector in keyBy is only a logical partitioning; physically it still partitions by hash, and there is no way to specify which key goes to which partition.
>
>  
> I tried the partitionCustom variant that takes a partitioner and a keySelector, but found that I cannot directly use aggregation functions such as sum; in practice, sum adds up the values of different keys that land in the same physical partition.
>
>  For example with Tuple2, ids 1/2/3 go to partition 0 and id 4 to partition 1; calling sum directly adds up the time values of ids 1/2/3.
>    Is there a way to make keyBy also control the physical partitioning? Or is the only option to add logic to a map operator after partitionCustom so that the accumulation is correct?


Re: Re: Flink TM CPU cores setting

2020-11-10 Thread Yangze Guo
Yours is missing a "v"; it should be yarn.containers.vcores

Best,
Yangze Guo

On Tue, Nov 10, 2020 at 3:43 PM zjfpla...@hotmail.com
 wrote:
>
> The JM logs contain
> Loading configuration property: yarn.containers.cores,4
>
>
>
> zjfpla...@hotmail.com
>
> From: zjfpla...@hotmail.com
> Sent: 2020-11-10 15:33
> To: user-zh
> Subject: Re: Re: Flink TM CPU cores setting
> When it is set to 5, TM CPU cores shows 4; when it is set to 4, TM CPU cores is still 4
>
>
>
> zjfpla...@hotmail.com
>
> From: Yangze Guo
> Sent: 2020-11-09 10:19
> To: user-zh
> Subject: Re: Re: Flink TM CPU cores setting
> How did you confirm that it has no effect? Could you share the JM logs?
> Also, whether this parameter actually takes effect depends on whether YARN's scheduler has CPU scheduling enabled.
>
> Best,
> Yangze Guo
>
> On Thu, Nov 5, 2020 at 1:50 PM zjfpla...@hotmail.com
>  wrote:
> >
> > Setting this in flink-conf.yaml had no effect
> >
> >
> >
> > zjfpla...@hotmail.com
> >
> > From: JasonLee
> > Sent: 2020-11-05 13:49
> > To: user-zh
> > Subject: Re: Flink TM CPU cores setting
> > Hi, setting the yarn.containers.vcores parameter should do it
> >
> >
> >
> > -
> > Best Wishes
> > JasonLee
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/


Table API, accessing nested fields

2020-11-10 Thread Ori Popowski
How can I access nested fields e.g. in select statements?

For example, this won't work:

 val table = tenv
  .fromDataStream(stream)
  .select($"context.url", $"name")

What is the correct way?

Thanks.


Changing the topology while upgrading a job submitted by SQL

2020-11-10 Thread Yi Tang
Hi folks,

A question about changing the topology while upgrading a job submitted by
SQL.
Is it possible for now?

It looks like recovering a job from a savepoint requires that the uid of each
operator match the corresponding one in the state. The automatically
generated uid depends largely on the topology, right? So we had better
assign the uids manually, which cannot be done using SQL?

Thanks


Re: Help needed to increase throughput of simple flink app

2020-11-10 Thread ashwinkonale
Hey,
I am reading messages with a schema id and using the Confluent schema registry to
deserialize them to GenericRecord. After this point, the pipeline will have these
objects moving across. Can you give me some examples of the `special handling of
avro messages` you mentioned?
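
For context, a minimal sketch of the kind of source setup described above
(topic name, registry URL, schema, and broker address are placeholders):

    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class GenericRecordSource {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Reader schema; in practice this usually comes from an .avsc file.
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092");
            props.setProperty("group.id", "my-group");

            env.addSource(new FlinkKafkaConsumer<>(
                    "input-topic",
                    ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://schema-registry:8081"),
                    props))
                .map(GenericRecord::toString)
                .print();

            env.execute("generic-record source");
        }
    }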



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Issues migrating SQL from 1.9 to 1.11

2020-11-10 Thread zilong xiao
Regarding getting the execution plan in 1.11, I filed an issue with the community:
https://issues.apache.org/jira/browse/FLINK-19687. I think this should be supported; feel free to follow it.

izual wrote on Fri, Oct 30, 2020 at 5:04 PM:

> Hi, Community:
>
>
> We are currently running SQL jobs on Flink 1.9.1 and mainly use the following interfaces:
> 1. sqlQuery / sqlUpdate: creating, querying, and writing tables
> 2. toAppendStream/toRetractStream: after converting a table to a stream, writing via DataStream.addSink(new
> RichSinkFunction)
> 3. registerDataStream: registering a stream as a table and then reading/writing it with sqlQuery/sqlUpdate
>
>
> Finally we execute via env.execute() or tableEnv.execute(): output is written to storage through RichSinkFunction.invoke or
> sqlUpdate(DML), and both forms of output may be invoked multiple times.
>
>
> The documentation says the behavior of these interfaces [1][2] has changed. After actually moving to 1.11, we have a few points of confusion:
>
>
> 1. If we expect to mix the SQL/DataStream interfaces, I currently follow the description in [3]: use sqlUpdate and then trigger output via tEnv.execute().
> Concretely, the program sets up two outputs, RichSinkFunction.invoke and sqlUpdate, and we observe that only sqlUpdate
> updates data while the RichSinkFunction never runs. If we want both outputs, do we have to reimplement the RichSinkFunction.invoke
> parts as StreamTableSinks, or is there a lower-cost way to migrate? Following 1.11's separation of env/tableEnv,
> what is the more reasonable way to implement this case?
> 2. In this case, env.getExecutionPlan only returns the DAG of the DataStream calls; if we want the DAG of the Table
> operations, which tableEnv interface should we use to get it?
>
>
> 1.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html#corrected-execution-behavior-of-tableenvironmentexecute-and-streamtableenvironmentexecute-flink-16363
> 2.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> 3.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query
>
>