Re: Developing Beam applications using Flink checkpoints

2020-05-20 Thread Eleanore Jin
Hi Ivan,

Beam coders are wrapped in Flink's TypeSerializers. So I don't think it
will result in double serialization.

Thanks!
Eleanore
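
A minimal sketch of the wrapping idea, assuming simplified stand-in types: `SketchCoder`, `Utf8StringCoder` and `CoderBackedSerializer` are hypothetical names for illustration, not the real Beam or Flink classes. The point it shows is that a serializer that only delegates to a coder produces the coder's bytes once, with no second encoding on top.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a Beam coder: turns a value into bytes and back.
interface SketchCoder<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
}

// Example coder for strings, using UTF-8.
final class Utf8StringCoder implements SketchCoder<String> {
    @Override
    public byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Hypothetical stand-in for a runner-side serializer that wraps a coder.
final class CoderBackedSerializer<T> {
    private final SketchCoder<T> coder;

    CoderBackedSerializer(SketchCoder<T> coder) {
        this.coder = coder;
    }

    // Pure delegation: the only encoding step is the coder's own.
    byte[] serialize(T value) {
        return coder.encode(value);
    }

    T deserialize(byte[] bytes) {
        return coder.decode(bytes);
    }
}

final class CoderWrappingSketch {
    public static void main(String[] args) {
        CoderBackedSerializer<String> serializer =
                new CoderBackedSerializer<>(new Utf8StringCoder());
        byte[] bytes = serializer.serialize("beam");
        System.out.println(bytes.length + " bytes, round trip: " + serializer.deserialize(bytes));
    }
}
```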

On Tue, May 19, 2020 at 4:04 AM Ivan San Jose 
wrote:

> Perfect, thank you so much Arvid. I'd expect more people to be using Beam
> on top of Flink, but it seems it is not so popular.
>
> On Tue, 2020-05-19 at 12:46 +0200, Arvid Heise wrote:
> > Hi Ivan,
> >
> > I fear that only a few mailing list users actually have deeper Beam
> > experience. I only used it briefly 3 years ago. Most users here use
> > Flink directly to avoid these kinds of double-abstraction issues.
> >
> > It might be better to switch to the Beam mailing list if you have
> > Beam-specific questions including how the Flink runner actually
> > translates the Beam program to Flink.
> >
> > On Tue, May 19, 2020 at 11:38 AM Ivan San Jose <
> > isanj...@theworkshop.com> wrote:
> > > Actually, I'm also wondering how Beam coders relate to the runner's
> > > serialization. In Beam you specify a coder for each Java type so that
> > > Beam can serialize/deserialize that type. But does the runner under
> > > the hood then serialize/deserialize the result again, so that there is
> > > a double serialization? Or how does it work?
> > >
> > > On Tue, 2020-05-19 at 08:54 +, Ivan San Jose wrote:
> > > > Yep, sorry if I'm bothering you, but I think I'm still not getting
> > > > this. How could I tell Beam to tell Flink to use that serializer
> > > > instead of the standard Java one? I think Beam abstracts us from the
> > > > Flink checkpointing mechanism, so I'm afraid that if we use the Flink
> > > > API directly we might break other things that Beam is hiding from
> > > > us...
> > > >
> > > > On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:
> > > > > Hi Ivan,
> > > > >
> > > > > The easiest way is to use some implementation that's already
> > > > > there [1]. I already mentioned Avro and would strongly recommend
> > > > > giving it a go. If you make sure to provide a default value for as
> > > > > many fields as possible, you can always remove them later, giving
> > > > > you great flexibility. I can give you more hints if you decide to
> > > > > go this route.
> > > > >
> > > > > If you want to have a custom implementation, I'd start by looking
> > > > > at one of the simpler implementations like MapSerializerSnapshot
> > > > > [2].
> > > > >
> > > > > [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html
> > > > > (see known implementing classes).
> > > > > [2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
> > > > >
> > > > > On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <
> > > > > isanj...@theworkshop.com> wrote:
> > > > > > Thanks for your complete answer Arvid. We will try to address
> > > > > > everything you mentioned, but keep in mind that we are using
> > > > > > Beam on top of Flink, so, to be honest, I don't know how we could
> > > > > > implement the custom serialization (
> > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > > > > ) there. Could you please give us some hints? Thanks
> > > > > >
> > > > > > On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > > > > > > Hi Ivan,
> > > > > > >
> > > > > > > First let's address the issue with idle partitions. The
> > > > > > > solution is to use a watermark assigner that also emits a
> > > > > > > watermark after some idle timeout [1].
> > > > > > >
> > > > > > > Now the second question, on why Kafka offsets are committed
> > > > > > > for in-flight, checkpointed data. The basic idea is that you
> > > > > > > do not lose data while avoiding duplicated output.
> > > > > > > If you committed offsets only after data had been fully
> > > > > > > processed, then upon a crash the same data point would be
> > > > > > > reprocessed together with the restored in-flight data, so you
> > > > > > > would get duplicate messages in your system.
> > > > > > > To avoid duplicates, data would need to be more or less
> > > > > > > completely flushed out of the system before a checkpoint is
> > > > > > > performed. That would produce a huge downtime.
> > > > > > > Instead, we assume that we can always resume from the
> > > > > > > checkpoints.
> > > > > > > Which leads to the last question on what to do when your
> > > > > > > pipeline has breaking changes.
> > > > > > > 

Re: How do I get the IP of the master and slave files programmatically in Flink?

2020-05-20 Thread Yangze Guo
Hi, Felipe

Do you mean that you want to get the host and port of the task executor on
which your operator is actually running?

If that is the case, IIUC, the two components that could contain this
information are RuntimeContext and the Configuration parameter of
RichFunction#open. After reading the relevant code path, it seems you
cannot get it at the moment.

Best,
Yangze Guo
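
One possible workaround, sketched under the assumption that it is enough to know the address of the machine on which the operator's JVM runs: plain JDK networking calls can be used from inside the operator (for example in its open method). `LocalHostInfo` is a hypothetical helper name, and the result depends on how host names resolve on that machine, so it may still be a loopback address.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class LocalHostInfo {
    // Returns the address the JVM resolves for its own host name, or the
    // loopback address if the host name cannot be resolved.
    static InetAddress localAddress() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            return InetAddress.getLoopbackAddress();
        }
    }

    public static void main(String[] args) {
        InetAddress address = localAddress();
        System.out.println(address.getHostName() + " -> " + address.getHostAddress());
    }
}
```

This only describes the local machine; it does not reveal the JobManager address or the port of the task executor.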



On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov
 wrote:
>
> Hi Felipe,
>
> Could you clarify in a bit more detail what you are trying to achieve?
>
> Best regards,
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> +49 1514 6265796
>
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji 
> (Tony) Cheng
>
>
>
>
> On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez 
>  wrote:
>>
>> Hi all,
>>
>> I have my own operator that extends the AbstractUdfStreamOperator
>> class and I want to issue some messages to it. Sometimes the operator
>> instances are deployed on different TaskManagers and I would like to
>> set some attributes like the master and slave IPs on it.
>>
>> I am trying to use these values but they only return localhost, not
>> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address:
>> 192.168.56.1).
>>
>> ConfigOption<String> restAddressOption = ConfigOptions
>>     .key("rest.address")
>>     .stringType()
>>     .noDefaultValue();
>> System.out.println("DefaultJobManagerRunnerFactory rest.address: " +
>>     jobMasterConfiguration.getConfiguration().getValue(restAddressOption));
>> System.out.println("rpcService: " + rpcService.getAddress());
>>
>>
>> Thanks,
>> Felipe
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com


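Re: How can Flink read files under HDFS using a regex?

2020-05-20 Thread Jingsong Li
Hi,

Zhihua (志华),
At the DataStream layer, you can use FileInputFormat.setFilesFilter to set a
file filter.
The Table layer does not currently support filters natively; you could
consider writing your own table connector.
However, the more recommended approach is to handle this with partitions
instead, which is supported more naturally.

jimandlice,
- On 1.10 or earlier, you need to write a DataStream job with
StreamingFileSink to write to Hive, and Parquet is the only supported
columnar format. [1]
- On 1.11 (currently being tested for release), the Table/SQL layer supports
the streaming file sink natively; the related documentation is being written.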

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html

Best,
Jingsong Lee
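
A minimal sketch of the matching logic such a file filter could use, written in plain Java so it runs on its own. The file naming scheme (a `yyyyMMddHH` stamp somewhere in the name) and the class name `TimeRangeFileNameFilter` are assumptions for illustration. As far as I know, Flink's FilePathFilter treats a true result from filterPath as "skip this path", so this predicate would need to be negated there; please check the Javadoc for your version.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TimeRangeFileNameFilter {
    // Assumed naming scheme: exactly ten digits (yyyyMMddHH) inside the file name.
    private static final Pattern STAMP = Pattern.compile("(?<!\\d)(\\d{10})(?!\\d)");

    private final String fromInclusive;
    private final String toExclusive;

    TimeRangeFileNameFilter(String fromInclusive, String toExclusive) {
        this.fromInclusive = fromInclusive;
        this.toExclusive = toExclusive;
    }

    // True when the name carries a stamp inside [fromInclusive, toExclusive).
    boolean accepts(String fileName) {
        Matcher matcher = STAMP.matcher(fileName);
        if (!matcher.find()) {
            return false;
        }
        String stamp = matcher.group(1);
        // Fixed-width numeric stamps sort lexicographically in time order.
        return stamp.compareTo(fromInclusive) >= 0 && stamp.compareTo(toExclusive) < 0;
    }

    public static void main(String[] args) {
        TimeRangeFileNameFilter filter = new TimeRangeFileNameFilter("2020052000", "2020052100");
        System.out.println(filter.accepts("events-2020052010.log"));
        System.out.println(filter.accepts("events-2020052100.log"));
    }
}
```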

On Thu, May 21, 2020 at 10:59 AM jimandlice  wrote:

> What is the approach for writing to Hive from Flink using the API?
>
> jimandlice
> Email: jimandl...@163.com
>
> On 2020-05-21 at 10:57, 阿华田 wrote:
> How can Flink read the files in a directory by regex, for example files
> whose names (in a time format) fall within a certain time range?
>
> 王志华
> a15733178...@163.com
>



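Re: flink sql jdbc: any statement containing GROUP BY fails with "No operations allowed after statement closed."

2020-05-20 Thread Rui Li
Judging from the exception, this looks like it is caused by a connection
timeout. There is a related JIRA: https://issues.apache.org/jira/browse/FLINK-16681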

On Thu, May 21, 2020 at 11:41 AM shao.hongxiao <17611022...@163.com> wrote:

> |INSERT INTO pvuv_sink
> |SELECT
> | user_id,item_id,category_id,count(1) as cnt
> |FROM user_log
> |group by user_id,item_id,category_id
> The above is the SQL that fails.
> 11:21:57,157 ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat
> - JDBC executeBatch error, retry times = 1
> java.sql.SQLException: No operations allowed after statement closed.
>
>
> The SQL that works and can insert into MySQL:
> |INSERT INTO pvuv_sink
> |SELECT
> | user_id,item_id,category_id,1
> |FROM user_log
>
> 邵红晓
> Email: 17611022...@163.com
>
> On 2020-05-21 at 11:39, Rui Li wrote:
> Hi, I can't see the SQL you pasted. Could you try sending it as plain text?
>
> On Thu, May 21, 2020 at 11:25 AM shao.hongxiao <17611022...@163.com>
> wrote:
>
> SQL that fails
> SQL that runs normally
>
> Why is this? It's very strange.
> 邵红晓
> Email: 17611022...@163.com
>
> --
> Best regards!
> Rui Li
>




Re: flink sql jdbc: any statement containing GROUP BY fails with "No operations allowed after statement closed."

2020-05-20 Thread shao.hongxiao
|INSERT INTO pvuv_sink
|SELECT
| user_id,item_id,category_id,count(1) as cnt
|FROM user_log
|group by user_id,item_id,category_id
The above is the SQL that fails.
11:21:57,157 ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat
 - JDBC executeBatch error, retry times = 1
java.sql.SQLException: No operations allowed after statement closed.


The SQL that works and can insert into MySQL:
|INSERT INTO pvuv_sink
|SELECT
| user_id,item_id,category_id,1
|FROM user_log

邵红晓
Email: 17611022...@163.com

On 2020-05-21 at 11:39, Rui Li wrote:
Hi, I can't see the SQL you pasted. Could you try sending it as plain text?

On Thu, May 21, 2020 at 11:25 AM shao.hongxiao <17611022...@163.com> wrote:

SQL that fails
SQL that runs normally

Why is this? It's very strange.
邵红晓
Email: 17611022...@163.com

--
Best regards!
Rui Li


Re: flink sql jdbc: any statement containing GROUP BY fails with "No operations allowed after statement closed."

2020-05-20 Thread Rui Li
Hi, I can't see the SQL you pasted. Could you try sending it as plain text?

On Thu, May 21, 2020 at 11:25 AM shao.hongxiao <17611022...@163.com> wrote:

> SQL that fails
> SQL that runs normally
>
> Why is this? It's very strange.
> 邵红晓
> Email: 17611022...@163.com
>
>


-- 
Best regards!
Rui Li


flink sql jdbc: any statement containing GROUP BY fails with "No operations allowed after statement closed."

2020-05-20 Thread shao.hongxiao
SQL that fails
SQL that runs normally


Why is this? It's very strange.

邵红晓
Email: 17611022...@163.com

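Re: Job fails to run when there are too many TMs

2020-05-20 Thread Xintong Song
Could it be that the pod IPs have run out, or that the IP table on the pods
limits the number of entries, or something like that?

Thank you~

Xintong Song



On Wed, May 20, 2020 at 6:44 PM  wrote:

> Hi Xintong,
>
> From what I have observed, the system logs contain no record of a kernel
> OOM kill. After the job is cancelled, the TMs that lost contact reconnect,
> and the pods are not killed. My initial suspicion is a network-level
> problem; it feels like the CNI imposes some limit.
>
> Thanks~
>
> a511955993
> Email: a511955...@163.com
>
> On 2020-05-20 at 17:56, Xintong Song wrote:
> Hi,
>
> Judging from the logs, the root cause of the error is that a TM died, which
> caused its pod to be removed, so the other TMs could no longer find the
> address of the dead TM. Could you confirm whether a TM died or restarted
> when the error occurred?
>
> As for why the TM died, you need to find a way to get the logs of the
> failed TM. According to your earlier description, the cluster starts
> without problems and the issue only appears when the job runs. My current
> suspicion is that a resource problem caused by job execution made the TM
> hit an OOM, or that it was killed by Kubernetes for exceeding its memory.
> While changing the number of TMs and slots, did you adjust the resource
> size of the TMs? Even if you did not, the resources consumed by the job
> itself will change. For example, with more TMs each TM needs to establish
> more network connections, which consumes more memory. The details still
> need to be analyzed from the logs.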
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, May 20, 2020 at 4:50 PM  wrote:
>
> > Hi Xintong, the stack trace is as follows.
> >
> > 2020-05-20 16:46:20
> > org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition 66c378b86c3e100e4a2d34927c4b7281@bb397f70ad4474d2beac18d484d726af not reachable.
> >  at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
> >  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:240)
> >  at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:218)
> >  at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
> >  at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:864)
> >  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:624)
> >  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> >  at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.45.128.4:35285' has failed. This might indicate that the remote task manager has been lost.
> >  at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
> >  at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
> >  at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:86)
> >  at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
> >  at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
> >  ... 7 more
> > Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.45.128.4:35285' has failed. This might indicate that the remote task manager has been lost.
> >  at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
> >  at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
> >  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
> >  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
> >  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
> >  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
> >  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
> >  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
> >  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
> >  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:323)
> >  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:339)
> >  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
> >  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
> >  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
> >  at
> >
> 

Re: How can Flink read files under HDFS using a regex?

2020-05-20 Thread jimandlice
What is the approach for writing to Hive from Flink using the API?

jimandlice
Email: jimandl...@163.com

On 2020-05-21 at 10:57, 阿华田 wrote:
How can Flink read the files in a directory by regex, for example files
whose names (in a time format) fall within a certain time range?

王志华
a15733178...@163.com



How can Flink read files under HDFS using a regex?

2020-05-20 Thread 阿华田
How can Flink read the files in a directory by regex, for example files
whose names (in a time format) fall within a certain time range?

王志华
a15733178...@163.com



Re: Stream Iterative Matching

2020-05-20 Thread Guowei Ma
Hi, Marc

I think the window operator might fulfill your needs. You can find a
detailed description here [1].
In general, I think you could choose the appropriate type of window and use
a `ProcessWindowFunction` to emit the elements that best match the expected
sum.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/windows.html

Best,
Guowei
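
A minimal sketch of the selection step that such a `ProcessWindowFunction` could run per sensor, written as plain Java so it runs on its own. It assumes the readings of one sensor are already sorted by timestamp; `BestPrefixMatch`, `Reading` and `bestPrefixLength` are hypothetical names, and the choice of window that collects the readings is left open.

```java
import java.util.Arrays;
import java.util.List;

final class BestPrefixMatch {
    // One reading of a single sensor: timestamp in seconds and measured value.
    static final class Reading {
        final long timestamp;
        final int value;

        Reading(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Distance from a sum to the closest expected value.
    static int distance(int sum, int[] expected) {
        int best = Integer.MAX_VALUE;
        for (int e : expected) {
            best = Math.min(best, Math.abs(sum - e));
        }
        return best;
    }

    // Returns how many leading readings to emit. Only in-order prefixes that
    // stay within maxSpanSeconds of the first reading are considered.
    static int bestPrefixLength(List<Reading> readings, long maxSpanSeconds, int[] expected) {
        if (readings.isEmpty()) {
            return 0;
        }
        long first = readings.get(0).timestamp;
        int sum = 0;
        int bestLength = 0;
        int bestDistance = Integer.MAX_VALUE;
        for (int i = 0; i < readings.size(); i++) {
            Reading reading = readings.get(i);
            if (reading.timestamp - first > maxSpanSeconds) {
                break;
            }
            sum += reading.value;
            int d = distance(sum, expected);
            if (d < bestDistance) {
                bestDistance = d;
                bestLength = i + 1;
            }
        }
        return bestLength;
    }

    public static void main(String[] args) {
        // Sensor 1 from the example in the question: sums are 20, 51 and 68.
        List<Reading> sensor1 = Arrays.asList(
                new Reading(100, 20), new Reading(105, 31), new Reading(140, 17));
        System.out.println(bestPrefixLength(sensor1, 90, new int[] {50, 100}));
    }
}
```

The readings after the returned prefix would stay buffered (for example in keyed state) so they can be matched with later data; how to do that depends on the window and state design chosen.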


On Wed, May 20, 2020 at 9:58 PM, ba wrote:

> Hi All,
>
> I'm new to Flink but am trying to write an application that processes data
> from internet connected sensors.
>
> My problem is as follows:
>
> - Data arrives in the format: [sensor id] [timestamp in seconds] [sensor
> value]
> - Data can arrive out of order (between sensor IDs) by up to 5 minutes.
> - So a stream of data could be:
> [1] [100] [20]
> [2] [101] [23]
> [1] [105] [31]
> [1] [140] [17]
>
> - Each sensor can sometimes split its measurements, and I'm hoping to 'put
> them back together' within Flink. For data to be 'put back together' it
> must have a timestamp within 90 seconds of the timestamp on the first piece
> of data. The data must also be put back together in order: in the example
> above, for sensor 1 you could only have combinations of (A) the first
> reading on its own (time 100), (B) the first and third items (times 100 and
> 105) or (C) the first, third and fourth items (times 100, 105, 140). The
> second item is from a different sensor so it is not considered in this
> exercise.
>
> - I would like to write something that tries different 'sum' combinations
> within the 90 second limit and outputs the best 'match' to expected values.
> In the example above let's say the expected sum values are 50 or 100. Of the
> three combinations I mentioned for sensor 1, the sum would be 20, 51, or
> 68. Therefore the 'best' combination is 51 as it is closest to 50 or 100,
> so it would output two data items: [1] [100] [20] and [1] [105] [31], with
> the last item left in the stream and matched with any other data points
> that arrive after.
>
> I am thinking of some sort of iterative function that does this, but am not
> sure how to emit the values I want and keep other values that were
> considered (but not used) in the stream.
>
> Any ideas or help would be really appreciated.
>
> Thanks,
> Marc
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: This kind of complex data is parsed directly to null

2020-05-20 Thread Leonard Xu
+ Adding the documentation link [1], plus a link to an issue for a potential
problem you may run into [2]:

[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/types.html#%E7%BB%93%E6%9E%84%E5%8C%96%E7%9A%84%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B

[2] https://issues.apache.org/jira/browse/FLINK-17847


> On 2020-05-21 at 00:01, Leonard Xu  wrote:
> 
> Hi, 
>> Statements:
>> CREATE TABLE A (
>> w_data  STRING,
>> w_table  STRING,
>> w_ts TIMESTAMP(3)
> 
> If the w_data field needs to be a JSON array, you must declare the
> corresponding structured data type when declaring the table [1].
> 
> That is, your table A needs to be declared as:
> create table json_table(
>   w_es BIGINT, 
>   w_type STRING, 
>   w_isDdl BOOLEAN, 
>   w_data ARRAY<ROW<pay_info STRING, online_fee DOUBLE, sign STRING, account_pay_fee DOUBLE>>, 
>   w_ts TIMESTAMP(3), 
>   w_table STRING) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = '0.11',
>  'connector.topic' = 'json-test',
>  'connector.properties.zookeeper.connect' = 'localhost:2181',
>  'connector.properties.bootstrap.servers' = 'localhost:9092',
>  'connector.properties.group.id' = 'test-jdbc',
>  'connector.startup-mode' = 'earliest-offset',
>  'format.type' = 'json',
>  'format.derive-schema' = 'true'
> )
> 
> 
> Best,
> Leonard Xu
> 
>> On 2020-05-20 at 18:06, guaishushu1...@163.com wrote:
>> 
>> Statements:
>> CREATE TABLE A (
>> w_data  STRING,
>> w_table  STRING,
>> w_ts TIMESTAMP(3)
>> 
>> 
>> CREATE TABLE B (
>> w_ts TIMESTAMP(3),
>> city1_id  STRING,
>> cate3_id  STRING,
>> pay_order_id  STRING
>> )
>> 
>> insert into B
>> select w_ts,
>> 
>> 'test' as city1_id,
>> 
>> ArrayIndexOf(w_data, 0) AS cate3_id,
>> w_data as pay_order_id
>> from A
>> 
>> Sample data
>> A
>> {"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82=89.0=0=0=02=4=89.0=success=32590183789575=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"111"}
>> 
>> B
>> {"w_ts":"2020-05-20T13:58:37.131Z","city1_id":"test","cate3_id":null,"pay_order_id":""}
>> 
>> 
>> 
>> guaishushu1...@163.com
>> 
>> From: Leonard Xu
>> Sent: 2020-05-20 16:03
>> To: user-zh
>> Subject: Re: Flink 1.10 SQL problem parsing complex JSON
>> Hi, guaishushu
>> Please paste the query or a link to an image host. JSON parsing support in
>> Flink SQL is fairly complete [1]. Could you paste the JSON schema and the
>> problematic data?
>> A unit test should be enough to reproduce the problem.
>> 
>> Best,
>> Leonard
>> 
>> [1] 
>> https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
>> 
>> 
>>> On 2020-05-20 at 15:51, guaishushu1...@163.com wrote:
>>> 
>>> Writing Kafka data to Kafka: Flink 1.10 SQL parses a field in the complex
>>> JSON as a string, which causes data loss.
>>> 
>>> guaishushu1...@163.com 
> 



Re: flink1.10.x problem parsing array

2020-05-20 Thread Jingsong Li
Thanks for the answer, Benchao.

Although there is a workaround, this looks like something the blink planner
should support.
I'll create a JIRA to track it: https://issues.apache.org/jira/browse/FLINK-17855

Best,
Jingsong Lee

On Wed, May 20, 2020 at 8:02 PM 了不起的盖茨比 <573693...@qq.com> wrote:

> Thank you, I finally got it working. Thanks.
> public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
>     return new RowTypeInfo(Types.OBJECT_ARRAY(Types.ROW(Types.STRING, Types.STRING))).getFieldTypes();
> }
>
> -- Original message --
> From: "Benchao Li"
> Sent: Wednesday, May 20, 2020, 7:39 PM
> To: "user-zh"
> Subject: Re: flink1.10.x problem parsing array
>
> That's not what I meant. You need to override the
> `ScalarFunction#getParameterTypes(Class<?>[] signature)` method and
> explicitly specify the types of your input data.
> For example, if it is Row[] as you said, you need to specify
> Types.OBJECT_ARRAY(Types.ROW(Types.INT, Types.STRING...)), filling in your
> real types for the fields inside the Row.
>
> On Wed, May 20, 2020 at 7:24 PM, 了不起的盖茨比 <573693...@qq.com> wrote:
>
> > The parameter type declared by the UDF is org.apache.flink.types.Row[];
> > that is what I defined.
> >
> > -- Original message --
> > From: "Benchao Li"
> > Sent: Wednesday, May 20, 2020, 6:51 PM
> > To: "user-zh"
> > Subject: Re: flink1.10.x problem parsing array
> >
> > You can have your UDF specify the types of its input parameters. Without
> > them, the wrong type may be inferred, so you do not get the real data.
> >
> > On Wed, May 20, 2020 at 4:25 PM, 了不起的盖茨比 <573693...@qq.com> wrote:
> >
> > > 1. With the blink planner, when the DDL defines an array field,
> > >    selecting that field parses it correctly.
> > > 2. With the blink planner, when I define my own function, the array has
> > >    a length but no elements; Flink simply skips parsing them.
> > > 3. With the flink planner the result is correct.
> > >
> > > CREATE TABLE sourceTable (
> > >  event_time_line array<ROW (
> > >   `rule_name` VARCHAR,
> > >   `count` VARCHAR
> > >  )>
> > > ) WITH (
> > >  'connector.type' = 'kafka',
> > >  'connector.version' = 'universal',
> > >  'connector.startup-mode' = 'earliest-offset',
> > >  'connector.topic' = 'topic_test_1',
> > >  'connector.properties.zookeeper.connect' = 'localhost:2181',
> > >  'connector.properties.bootstrap.servers' = 'localhost:9092',
> > >  'update-mode' = 'append',
> > >  'format.type' = 'json',
> > >  'format.derive-schema' = 'true'
> > > );
> > >
> > > -- this returns data
> > > select event_time_line from sourceTable ;
> > >
> > > -- with my own function, the parameter values are not passed through,
> > > -- but the array size is
> > > select type_change(event_time_line) from sourceTable ;
> > >
> > > public class TypeChange extends ScalarFunction {
> > >     /**
> > >      * The value is null, but the array has a length; the fields inside
> > >      * the array are not recognized. With the default planner the values
> > >      * are returned correctly.
> > >      * @param rows
> > >      * @return
> > >      */
> > >     public String eval(Row[] rows) {
> > >         return JSONObject.toJSONString(rows);
> > >     }
> > > }
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking
> > University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
> --
>
> Benchao Li





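Re: Array index out of bounds

2020-05-20 Thread Leonard Xu
Hi,  allanqinjy

Could you paste the query? I ran into this problem today while investigating
another issue and created an issue to track it [1]; I'd like to see whether
the cause is the same.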


Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-17847 
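
The thread notes that some UDFs use indices starting from 1 and that index validity does not seem to be checked. A minimal sketch of a defensive lookup that user code could apply in the meantime; `SafeArrayAccess` and `elementAtOneBased` are hypothetical names, not Flink APIs.

```java
final class SafeArrayAccess {
    // Returns the element at a 1-based index, or null when the array is null
    // or the index falls outside 1..array.length.
    static <T> T elementAtOneBased(T[] array, int index) {
        if (array == null || index < 1 || index > array.length) {
            return null;
        }
        return array[index - 1];
    }

    public static void main(String[] args) {
        String[] values = {"a", "b"};
        System.out.println(elementAtOneBased(values, 1));
        System.out.println(elementAtOneBased(values, 0));
    }
}
```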


> On 2020-05-18 at 19:52, Leonard Xu  wrote:
> 
> Hi, allanqinjy
> 
> Throwing ArrayIndexOutOfBoundsException at runtime is not expected; it
> looks like a bug.
> If it can be reproduced, could you provide the SQL and data that reproduce
> it?
> 
> Best,
> Leonard Xu
> 
> 
>> On 2020-05-18 at 17:37, Benchao Li  wrote:
>> 
>> The array length is a runtime matter; the length of an array is not known
>> at compile time. Also, it seems there is currently no check that the index
>> is valid (for example, that it must be greater than 0). We used to run
>> into this kind of problem often as well.
>> 
>> On Mon, May 18, 2020 at 5:15 PM, allanqinjy  wrote:
>> 
>>> I think that if indexing starts from 1, an exception should be raised at
>>> compile time, rather than when the submitted job runs.
>>> 
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 22369621
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.table.runtime.util.SegmentsUtil.getByteMultiSegments(SegmentsUtil.java:598)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.table.runtime.util.SegmentsUtil.getByte(SegmentsUtil.java:590)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.table.runtime.util.SegmentsUtil.bitGet(SegmentsUtil.java:534)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.table.dataformat.BinaryArray.isNullAt(BinaryArray.java:117)
>>> 18-05-2020 16:27:14 CST INFO -  at BatchCalc$822.processElement(Unknown Source)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:748)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:734)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>> 18-05-2020 16:27:14 CST INFO -  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>> 
>>> On 2020-05-18 at 16:38:16, "1048262223" <1048262...@qq.com> wrote:
>>>> I can't see the image.
>>>> Flink's built-in UDFs differ from Hive UDFs; some UDFs use indices that
>>>> start from 1.
>>>> 
>>>>> Hi all,
>>>>> 
>>>>> On Flink 1.10, when running a Flink batch SQL job, the syntax check
>>>>> passes without problems, but running the script fails with the error
>>>>> below. The same script runs fine on Hive. I don't know why Flink
>>>>> reports an array index out of bounds. What is the problem?
>>> 
>> 
>> 
>> -- 
>> 
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
> 



Re: This kind of complex data is parsed directly to null

2020-05-20 Thread Leonard Xu
Hi, 
> Statements:
> CREATE TABLE A (
> w_data  STRING,
> w_table  STRING,
> w_ts TIMESTAMP(3)

If the w_data field needs to be a JSON array, you must declare the
corresponding structured data type when declaring the table [1].

That is, your table A needs to be declared as:
create table json_table(
w_es BIGINT, 
w_type STRING, 
w_isDdl BOOLEAN, 
w_data ARRAY<ROW<pay_info STRING, online_fee DOUBLE, sign STRING, account_pay_fee DOUBLE>>, 
w_ts TIMESTAMP(3), 
w_table STRING) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'json-test',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'test-jdbc',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)


Best,
Leonard Xu

> On 2020-05-20 at 18:06, guaishushu1...@163.com wrote:
> 
> Statements:
> CREATE TABLE A (
> w_data  STRING,
> w_table  STRING,
> w_ts TIMESTAMP(3)
> 
> 
> CREATE TABLE B (
> w_ts TIMESTAMP(3),
> city1_id  STRING,
> cate3_id  STRING,
> pay_order_id  STRING
> )
> 
> insert into B
> select w_ts,
> 
> 'test' as city1_id,
> 
> ArrayIndexOf(w_data, 0) AS cate3_id,
> w_data as pay_order_id
> from A
> 
> Sample data
> A
> {"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82=89.0=0=0=02=4=89.0=success=32590183789575=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"111"}
> 
> B
> {"w_ts":"2020-05-20T13:58:37.131Z","city1_id":"test","cate3_id":null,"pay_order_id":""}
> 
> 
> 
> guaishushu1...@163.com
> 
> From: Leonard Xu
> Sent: 2020-05-20 16:03
> To: user-zh
> Subject: Re: Flink 1.10 SQL problem parsing complex JSON
> Hi, guaishushu
> Please paste the query or a link to an image host. JSON parsing support in
> Flink SQL is fairly complete [1]. Could you paste the JSON schema and the
> problematic data?
> A unit test should be enough to reproduce the problem.
> 
> Best,
> Leonard
> 
> [1] 
> https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
> 
> 
>> On 2020-05-20 at 15:51, guaishushu1...@163.com wrote:
>> 
>> Writing Kafka data to Kafka: Flink 1.10 SQL parses a field in the complex
>> JSON as a string, which causes data loss.
>> 
>> guaishushu1...@163.com 



Re: Flink Dashboard UI Tasks hard limit

2020-05-20 Thread Vijay Balakrishnan
Hi,
I have increased the number of slots available, but the job does not use all
the slots and still runs into this limit of approximately 18000 tasks.
Looking into the source code, it seems to be opening a file:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
So, do I have to tune the ulimit or something similar at the Ubuntu OS
level to increase the number of tasks available? What confuses me is that
the ulimit is per machine but the ExecutionGraph spans many machines.
Please pardon my ignorance here. Does the number of tasks equate to the
number of open files? I am using 15 slots per TaskManager on AWS m5.4xlarge,
which has 16 vCPUs.

TIA.

On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan 
wrote:

> Hi,
>
> The Flink Dashboard UI seems to show a hard limit of around 18000 in the
> Tasks column on an Ubuntu Linux box.
> I kept increasing the number of slots per task manager to 15 and the number
> of slots increased to 705, but the number of tasks stayed at around 18000.
> Below 18000 tasks, the Flink job is able to start up.
> Even though I increased the number of slots, it still works when 312 slots
> are being used.
>
> taskmanager.numberOfTaskSlots: 15
>
> What knob can I tune to increase the number of tasks?
>
> Please find attached the Flink Dashboard UI.
>
> TIA,
>
>


Re: How do I get the IP of the master and slave files programmatically in Flink?

2020-05-20 Thread Alexander Fedulov
Hi Felipe,

Could you clarify in a bit more detail what you are trying to achieve?

Best regards,

--

Alexander Fedulov | Solutions Architect




On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi all,
>
> I have my own operator that extends the AbstractUdfStreamOperator
> class and I want to issue some messages to it. Sometimes the operator
> instances are deployed on different TaskManagers and I would like to
> set some attributes like the master and slave IPs on it.
>
> I am trying to use these values but they only return localhost, not
> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address:
> 192.168.56.1).
>
> ConfigOption<String> restAddressOption = ConfigOptions
>     .key("rest.address")
>     .stringType()
>     .noDefaultValue();
> System.out.println("DefaultJobManagerRunnerFactory rest.address: " +
> jobMasterConfiguration.getConfiguration().getValue(restAddressOption));
> System.out.println("rpcService: " + rpcService.getAddress());
>
>
> Thanks,
> Felipe
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>


Flink Weekly | Weekly Community Update - 2020/05/20

2020-05-20 Thread 王雷
Flink Development Progress

1.Release

■ Piotr Nowojski announced the freeze of the release-1.11 branch.
[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNECE-release-1-11-branch-cut-td41668.html

■ 1.10.1 has been released successfully; see the link below for the release notes.
[2]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346891

■ After the 1.10.1 release, Seth Wiesman found that FLINK-16684 changed the API
of StreamingFileSink (@PublicEvolving), which causes a binary incompatibility
between 1.10.0 and 1.10.1.
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Breaking-API-Change-in-1-10-1-td41377.html

2.Dev

■ When users submit jobs in per-job mode, the current History Server cannot
show these jobs in one aggregated view. Gyula reworked the History Server and
implemented a dashboard that aggregates jobs from different clusters.
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Rework-History-Server-into-Global-Dashboard-td41393.html

3.FLIP

■ [Runtime] Aljoscha Krettek announced that the vote on FLIP-126 has passed.
FLIP-126 aims to refactor the Watermark Assigners.
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-126-FLIP-126-Unify-and-separate-Watermark-Assigners-td41349.html

4.Discuss

■ [Config] Stephan Ewen started a discussion on raising the default value of
state.backend.fs.memory-threshold from 1K to 100K in order to reduce the number
of small files. Participants discussed whether the change could make state
larger and thereby lead to OOM.
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-increase-quot-state-backend-fs-memory-threshold-quot-from-1K-to-100K-td41475.html

■ [Develop] A vote is in progress on requiring classes annotated with
@PublicEvolving to stay API- and binary-compatible across bug-fix releases of
the same minor version.
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Guarantee-that-PublicEvolving-classes-are-API-and-binary-compatible-across-bug-fix-releases-x-y-td41543.html

■ [Doc] A discussion about building a platform for Flink learning materials is
under way; see the email for details.
[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-a-material-web-page-under-quot-https-flink-apache-org-quot-td41298.html

■ [Doc] The current release process can leave download links on dist.apache.org
unstable. Chesnay Schepler updated the release guide so that old versions may
only be removed from dist.apache.org after a release has been completed.
[9]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/NOTICE-Deletion-of-older-releases-from-dist-apache-org-td41422.html

5.Other

■ [Security] Chesnay Schepler announced the security vulnerability
CVE-2020-1960. An attacker can exploit it with crafted requests to mount a
man-in-the-middle attack, compromise connections established with a process
via JMX, and obtain the transmitted data. Affected versions: 1.1.0 to
1.1.5, 1.2.0 to 1.2.1, 1.3.0 to 1.3.3, 1.4.0 to 1.4.2, 1.5.0 to 1.5.6, 1.6.0 to
1.6.4, 1.7.0 to 1.7.2, 1.8.0 to 1.8.3, 1.9.0 to 1.9.2, 1.10.0. See the email
for the fix.
[10]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/CVE-2020-1960-Apache-Flink-JMX-information-disclosure-vulnerability-td41437.html

■ [CI] The resources used by end-to-end test jobs for PRs have reached the limit
of Flink's Azure Pipelines account, so Robert Metzger had to cancel some
end-to-end tests manually. He is actively looking for a solution.
[11]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/NOTICE-Azure-Pipelines-Status-td41455.html

Mailing List Q&A

■ [Debug] 李佳宸 found that Pushgateway still held metrics data after the cluster
was shut down. 杨纲 explained that when a job is stopped via yarn kill, the
metrics cached in Pushgateway's memory are not cleaned up.
[12]
http://apache-flink.147419.n8.nabble.com/Prometheus-Pushgateway-Flink-td3041.html

■ [Runtime] 1101300123 hit an array-index-out-of-bounds error in
AppendOnlyTopNFunction. 云邪 confirmed that it is a bug and created issue
FLINK-17625.
[13]
http://apache-flink.147419.n8.nabble.com/1-10-flinkSQL-row-number-top1-td3056.html

■ [Debug] Jeff wanted to monitor memory usage at the task level. Because tasks
run as threads in the same JVM process, analyzing the memory cost of individual
threads is expensive and unsuitable for real-time computing, so task-level
memory monitoring is not supported.
[14]http://apache-flink.147419.n8.nabble.com/task-td3075.html

■ [Stateful Function] Annemarie Burger wanted to use windowing in Stateful
Functions. Igal Shilman said that Stateful Functions currently does not support
windows and offered DelayedMessage as a workaround.
[15]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-processing-in-Stateful-Functions-td34966.html

■ [Debug] Jacky Du got an error when debugging with JITWatch on AWS EMR.
Xintong Song pointed out that quotes must not be used when configuring the
JITWatch parameters in flink-conf.yaml.
[16]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Memory-analyze-on-AWS-EMR-td35036.html

■ [Runtime] Ken Krugler wanted to customize the restart strategy. Zhu Zhu said
that since version 1.10, RestartStrategy has been replaced by
RestartBackoffTimeStrategy, which does not yet support custom restart strategies.
[17]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-restart-strategy-on-specific-exception-td24665.html

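Events / Blog Posts / Other

■ Flink Forward Global 2020 is tentatively scheduled to take place online on
October 19-21; the call for presentations is open.
[18]https://www.flink-forward.org/global-2020/call-for-presentations

■ The Apache Flink Hangzhou online Meetup has concluded; the recording is
available here:
[19]https://developer.aliyun.com/live/2772

■ An analysis of fine-grained resource management in Flink 1.10
[20]https://mp.weixin.qq.com/s/NZXtKlRNnWdWDNtU7cml2Q

■ The breaking-in period between Flink and Hive
[21]https://mp.weixin.qq.com/s/TH3TXKebXJ0nAKUh8wfxUw

■ How to handle the entire machine learning workflow with a single engine?
[22]https://mp.weixin.qq.com/s/c5bZy_v15FtT1oJGW0UAWQ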


Stream Iterative Matching

2020-05-20 Thread ba
Hi All,

I'm new to Flink but am trying to write an application that processes data
from internet connected sensors.

My problem is as follows:

-Data arrives in the format: [sensor id] [timestamp in seconds] [sensor
value]
-Data can arrive out of order (between sensor IDs) by up to 5 minutes.
-So a stream of data could be:
[1] [100] [20]
[2] [101] [23]
[1] [105] [31]
[1] [140] [17]

-Each sensor can sometimes split its measurements, and I'm hoping to 'put
them back together' within Flink. For data to be 'put back together' it must
have a timestamp within 90 seconds of the timestamp on the first piece of
data. The data must also be put back together in order, in the example above
for sensor 1 you could only have combinations of (A) the first reading on
its own (time 100), (B) the first and third item (time 100 and 105) or (C)
the first, third and fourth item (time 100, 105, 140). The second item is a
different sensor so not considered in this exercise.

-I would like to write something that tries different 'sum' combinations
within the 90 second limit and outputs the best 'match' to expected values.
In the example above let's say the expected sum values are 50 or 100. Of the
three combinations I mentioned for sensor 1, the sum would be 20, 51, or 68.
Therefore the 'best' combination is 51 as it is closest to 50 or 100, so it
would output two data items: [1] [100] [20] and [1] [105] [31], with the
last item left in the stream and matched with any other data points that
arrive after.
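
The selection rule described above can be sketched in plain Java, independent
of Flink. This is only an illustration under stated assumptions: the class and
method names (BestPrefixMatch, bestPrefixLength) are hypothetical, the readings
of one sensor are assumed to be already sorted by timestamp, and ties are
resolved in favour of the shorter combination.

```java
class BestPrefixMatch {

    /**
     * Returns how many leading readings of ONE sensor should be combined.
     * Only readings within maxGapSeconds of the first reading are considered,
     * and the prefix whose sum is closest to any expected sum wins.
     */
    static int bestPrefixLength(long[] timestamps, int[] values,
                                long maxGapSeconds, int[] expectedSums) {
        int bestLength = 0;
        long bestDistance = Long.MAX_VALUE;
        long sum = 0;
        for (int i = 0; i < timestamps.length; i++) {
            // Stop once a reading is too far from the first one.
            if (timestamps[i] - timestamps[0] > maxGapSeconds) {
                break;
            }
            sum += values[i];
            for (int expected : expectedSums) {
                long distance = Math.abs(sum - expected);
                if (distance < bestDistance) {
                    bestDistance = distance;
                    bestLength = i + 1;
                }
            }
        }
        return bestLength;
    }

    public static void main(String[] args) {
        // Sensor 1 from the example: sums are 20, 51 and 68.
        long[] timestamps = {100, 105, 140};
        int[] values = {20, 31, 17};
        int length = bestPrefixLength(timestamps, values, 90, new int[] {50, 100});
        System.out.println(length); // prints 2: emit the first two readings
    }
}
```

The readings after the returned prefix would stay buffered and become the start
of the next candidate combination.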

I am thinking some sort of iterative function that does this, but am not
sure how to emit the values I want and keep other values that were
considered (but not used) in the stream.

Any ideas or help would be really appreciated.

Thanks,
Marc



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Question about My Flink Application

2020-05-20 Thread Alexander Fedulov
Hi Sara,

do you have logs? Any exceptions in them?

Best,

--

Alexander Fedulov | Solutions Architect




On Tue, May 19, 2020 at 11:28 PM Sara Arshad 
wrote:

> Hi,
>
> I have been using Flink with kinesis analytics.
> I have a stream of data, and I also need a cache that I update every 300
> seconds.
> To share the cache data with the Kinesis stream elements, I used a
> broadcast stream: I implemented a SourceFunction that reads the data from
> the DB and broadcasts it to the next operator, which is a
> KeyedBroadcastProcessFunction.
> Since adding the broadcast stream (the previous version had no cache and
> used a KeyedProcessFunction for the Kinesis stream), the application keeps
> restarting about every 20 minutes when I run it in Kinesis Analytics.
> Could you please help me figure out what the issue could be?
>
> Best regards,
> Sara Arshad
>


Re: Testing process functions

2020-05-20 Thread Alexander Fedulov
Hi Manas,

I would recommend using TestHarnesses for testing. You could also use them
prior to 1.10. Here is an example of setting the dependencies:
https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/build.gradle#L113

You can see some examples of tests for a demo application here:
https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/test/java/com/ververica/field/dynamicrules/RulesEvaluatorTest.java
Hope this helps.
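
On the serialization point raised in the quoted part of this thread, a small
Flink-free sketch may make the effect visible. It assumes plain Java
serialization and a single JVM (as in a local test run); CollectingSink and
roundTrip are hypothetical names for illustration, not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SerializedSinkDemo {

    /** Stand-in for a test sink; NOT a Flink SinkFunction. */
    static class CollectingSink implements Serializable {
        // Static field: shared by every copy inside the same JVM.
        static final List<String> SHARED =
                Collections.synchronizedList(new ArrayList<>());
        // Instance field: every deserialized copy gets its own list.
        final List<String> local = new ArrayList<>();

        void invoke(String value) {
            local.add(value);
            SHARED.add(value);
        }
    }

    /** Serializes and deserializes an object, as happens on job submission. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CollectingSink original = new CollectingSink();
        CollectingSink usedByJob = roundTrip(original);
        usedByJob.invoke("a");
        System.out.println(original.local.size());        // prints 0
        System.out.println(CollectingSink.SHARED.size()); // prints 1
    }
}
```

The copy that receives the records is not the instance the test holds, which is
why assertions on the original instance see nothing while a static field does.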

Best regards,

--

Alexander Fedulov | Solutions Architect




On Mon, May 18, 2020 at 1:18 PM Manas Kale  wrote:

> I see, I had not considered the serialization; that was the issue.
> Thank you.
>
> On Mon, May 18, 2020 at 12:29 PM Chesnay Schepler 
> wrote:
>
>> We don't publish sources for test classes.
>>
>> Have you considered that the sink will be serialized on job submission,
>> meaning that your myTestSink instance is not the one actually used by
>> the job? This typically means that you have to store the results in a
>> static field instead.
>> Alternatively, depending on the number of elements
>> org.apache.flink.streaming.api.datastream.DataStreamUtils#collect might
>> be worth a try.
>>
>> On 15/05/2020 12:49, Manas Kale wrote:
>> > Hi,
>> > How do I test process functions? I tried by implementing a sink
>> > function that stores myProcessFunction's output in a list. After
>> > env.execute(), I use assertions.
>> > If I set a breakpoint in the myTestSink's invoke() method, I see that
>> > that method is being called correctly. However, after env.execute()
>> > returns, all data in sink functions is wiped clean.
>> >
>> > TestSink myTestSink = new myTestSink();
>> > testStream.process(new myProcessFunction()).addSink(myTestSink);
>> > env.execute("test");
>> > assertEquals(expectedOutput, myTestSink.actual);
>> >
>> > What am I doing wrong?
>> >  Also, I see that a ProcessFunctionTestHarnesses has been added in
>> > 1.10. I wasn't able to download its sources to understand how I could
>> > use that. Have the sources not been added to maven or is it a problem
>> > at my end?
>> >
>> > Regards,
>> > Manas
>>
>>
>>


Re: [EXTERNAL] Re: Memory growth from TimeWindows

2020-05-20 Thread Slotterback, Chris
What I've noticed is that heap memory ends up growing linearly with time 
indefinitely (past 24 hours) until it hits the roof of the allocated heap for 
the task manager, which leads me to believe I am leaking somewhere. All of my 
windows have an allowed lateness of 5 minutes, and my watermarks are pulled 
from time embedded in the records using 
BoundedOutOfOrdernessTimestampExtractors. My TumblingEventTimeWindows and 
SlidingEventTimeWindow all use AggregateFunctions, and my intervalJoins use 
ProcessJoinFunctions.

I expect this app to use a significant amount of memory at scale due to the 288 
5-minute intervals in 24 hours, and records being put in all 288 window states, 
and as the application runs for 24 hours memory would increase as all 
288(*unique key) windows build with incoming records, but then after 24 hours 
the memory should stop growing, or at least grow at a different rate?

Also of note, we are using a FsStateBackend configuration, and plan to move to 
RocksDBStateBackend, but from what I can tell, this would only reduce memory 
and delay hitting the heap memory capacity, not stall it forever?

Thanks
Chris


On 5/18/20, 7:29 AM, "Aljoscha Krettek"  wrote:

On 15.05.20 15:17, Slotterback, Chris wrote:
> My understanding is that while all these windows build their memory 
state, I can expect heap memory to grow for the 24 hour length of the 
SlidingEventTimeWindow, and then start to flatten as the t-24hr window frames 
expire and release back to the JVM. What is actually happening is when a 
constant data source feeds the stream, the heap memory profile grows linearly 
past the 24 hour mark. Could this be a result of a misunderstanding of how the 
window’s memory states are kept, or is my assumption correct, and it is more 
likely I have a leak somewhere?

Will memory keep growing indefinitely? That would indicate a bug? What
sort of lateness/watermark settings do you have? What window function do
you use? ProcessWindowFunction, or sth that aggregates?

Side note: with sliding windows of 24h/5min you will have a "write
amplification" of 24*60/5=288, each record will be in 288 windows, which
will each be kept in separate state?
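
The arithmetic 24*60/5=288 can be checked with a short, Flink-free sketch that
enumerates the start times of all sliding windows containing one timestamp. It
is an illustration only: SlidingWindowFanOut and windowStarts are hypothetical
names, window starts are assumed to be aligned to multiples of the slide (no
offset), and this is not presented as Flink's own assigner code.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowFanOut {

    /** Start times (ms) of all windows [start, start + size) containing the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 24L * 60 * 60 * 1000; // 24 hours
        long slide = 5L * 60 * 1000;      // 5 minutes
        long timestamp = 1_589_970_000_000L; // arbitrary event time in ms
        System.out.println(windowStarts(timestamp, size, slide).size()); // prints 288
    }
}
```

Each of those 288 windows holds its own state entry per key, which is the
write amplification referred to above.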

Best,
Aljoscha




Re: Flink operator throttle

2020-05-20 Thread Alexander Fedulov
Hi Chen,

just a small comment regarding your proposition: this would work well when
one does a complete message passthrough. If there is some filtering in the
pipeline, which could be dependent on the incoming stream data itself, the
output throughput (the goal of the throttling) would be hard to control
precisely.
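
For readers who want to see what active throttling at the source amounts to,
here is a minimal, Flink-free sketch of a fixed-interval rate limiter. It is an
assumption-laden illustration: SimpleRateLimiter, reserve and acquire are
hypothetical names, and this is neither Guava's RateLimiter nor Flink's
FlinkConnectorRateLimiter.

```java
import java.util.concurrent.TimeUnit;

class SimpleRateLimiter {
    private final long intervalNanos; // minimum spacing between two permits
    private long nextFreeNanos;       // earliest time the next permit is free

    SimpleRateLimiter(double permitsPerSecond, long startNanos) {
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
        this.nextFreeNanos = startNanos;
    }

    /** Reserves one permit and returns how many nanoseconds the caller must wait. */
    synchronized long reserve(long nowNanos) {
        long wait = Math.max(0L, nextFreeNanos - nowNanos);
        nextFreeNanos = Math.max(nextFreeNanos, nowNanos) + intervalNanos;
        return wait;
    }

    /** Blocks until a permit is available, e.g. before emitting one record. */
    void acquire() throws InterruptedException {
        long waitNanos = reserve(System.nanoTime());
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }

    public static void main(String[] args) {
        SimpleRateLimiter limiter = new SimpleRateLimiter(10.0, 0L);
        System.out.println(limiter.reserve(0L)); // prints 0
        System.out.println(limiter.reserve(0L)); // prints 100000000
    }
}
```

Calling acquire() once per emitted record caps the source rate; as noted above,
any filtering downstream means the sink rate can still be lower than this cap.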

Best,

--

Alexander Fedulov | Solutions Architect




On Mon, May 18, 2020 at 7:55 AM Chen Qin  wrote:

> Hi Ray,
>
> From a somewhat abstract point of view, you can always throttle the source
> to get proper throughput control at the sink.
> One approach might be to override the base KafkaFetcher and use the shaded
> Guava rate limiter.
>
>
> https://github.com/apache/flink/blob/59714b9d6addb1dbf2171cab937a0e3fec52f2b1/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L347
>
> Best,
>
> Chen
>
>
> On Sat, May 16, 2020 at 11:48 PM Benchao Li  wrote:
>
>> Hi,
>>
>> > If I want to use the rate limiter in other connectors, such as Kafka
>> sink, ES sink, I need to do some more work on these connectors.
>> Yes, you can do this by changing Kafka/ES sink, actually, this is how we
>> did internally.
>>
>> > I'd like to know if the community has a plan to make a lower-level
>> implementation for all connectors, also for table API and SQL?
>> In my understanding, there is no on-going work on this. And usually we
>> should leverage the back-pressure feature to do this.
>> We can hear more from others whether this is a valid need.
>>
>> 王雷  wrote on Sun, May 17, 2020 at 2:32 PM:
>>
>>> Hi Benchao
>>>
>>> Thanks for your answer!
>>>
>>> According to your answer, I found `GuavaFlinkConnectorRateLimiter` which
>>> is the implementation of the `FlinkConnectorRateLimiter`.
>>>
>>> If I want to use the rate limiter in other connectors, such as Kafka
>>> sink, ES sink, I need to do some more work on these connectors.
>>>
>>> I'd like to know if the community has a plan to make a lower-level
>>> implementation for all connectors, also for table API and SQL?
>>>
>>> Thanks
>>> Ray
>>>
>>> Benchao Li  wrote on Thu, May 14, 2020 at 5:49 PM:
>>>
>>>> AFAIK, `FlinkKafkaConsumer010#setRateLimiter` can configure the Kafka
>>>> source to have a rate limiter.
>>>> (I assume you use Kafka.)
>>>> However, it only exists in the Kafka 0.10 DataStream connector, not in
>>>> other versions or in the Table API.
>>>>
>>>> 王雷  wrote on Thu, May 14, 2020 at 5:31 PM:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Does Flink support rate limiting?
>>>>> How can we limit the rate when the external database connected to the
>>>>> sink operator has a throughput limit?
>>>>> Instead of relying on passive back pressure after reaching the limit of
>>>>> the external database, we want to limit the rate actively.
>>>>>
>>>>> Thanks
>>>>> Ray
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Benchao Li
>>>> School of Electronics Engineering and Computer Science, Peking University
>>>> Tel:+86-15650713730
>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn


>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>


Re: flink1.10.x array parsing problem

2020-05-20 Thread 了不起的盖茨比

public TypeInformation

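Re: flink1.10.x array parsing problem

2020-05-20 Thread Benchao Li
That's not what I meant. You need to override the
`ScalarFunction#getParameterTypes(Class[] signature)` method and explicitly
specify the type of your input data.
For example, for the Row[] you mention, you need to specify
Types.OBJECT_ARRAY(Types.ROW(Types.INT, Types.STRING...)), filling in the
actual field types of your Row.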

了不起的盖茨比 <573693...@qq.com> wrote on Wed, May 20, 2020 at 7:24 PM:

> The UDF's declared parameter type is org.apache.flink.types.Row[]; that is
> also what I defined.
>
>
> -- Original message --
> From: "Benchao Li"
> Sent: Wednesday, May 20, 2020, 6:51 PM
> To: "user-zh"
> Subject: Re: flink1.10.x array parsing problem
>
>
> You can have your UDF specify the types of its input parameters. If they are
> not specified, the wrong type may be inferred, so the actual data cannot be
> retrieved.
>
> 了不起的盖茨比 <573693...@qq.com> wrote on Wed, May 20, 2020 at 4:25 PM:
>
> > 1. With blink_planner, when the DDL defines an array field, a plain SELECT
> >    of that field parses it correctly.
> > 2. With blink_planner, when I define my own function, the array has a
> >    length but no elements; Flink simply skips parsing them.
> > 3. With flink-planner, the result is correct.
> >
> > CREATE TABLE sourceTable (
> >   event_time_line array<ROW(
> >     `rule_name` VARCHAR,
> >     `count` VARCHAR
> >   )>
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.version' = 'universal',
> >   'connector.startup-mode' = 'earliest-offset',
> >   'connector.topic' = 'topic_test_1',
> >   'connector.properties.zookeeper.connect' = 'localhost:2181',
> >   'connector.properties.bootstrap.servers' = 'localhost:9092',
> >   'update-mode' = 'append',
> >   'format.type' = 'json',
> >   'format.derive-schema' = 'true'
> > );
> >
> > -- this returns data
> > select event_time_line from sourceTable ;
> >
> > -- with my own function, the element values are not passed through, but
> > -- the array size is
> > select type_change(event_time_line) from sourceTable ;
> >
> > public class TypeChange extends ScalarFunction {
> >
> >     /**
> >      * The elements are null although the array has a length; the fields
> >      * inside the array are not recognized. After switching to the default
> >      * planner, the values are returned correctly.
> >      *
> >      * @param rows
> >      * @return
> >      */
> >     public String eval(Row[] rows) {
> >         return JSONObject.toJSONString(rows);
> >     }
> > }
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink1.10.x array parsing problem

2020-05-20 Thread 了不起的盖茨比
The UDF's declared parameter type is org.apache.flink.types.Row[]; that is
also what I defined.


-- Original message --
From: "Benchao Li"

How do I get the IP of the master and slave files programmatically in Flink?

2020-05-20 Thread Felipe Gutierrez
Hi all,

I have my own operator that extends the AbstractUdfStreamOperator
class and I want to issue some messages to it. Sometimes the operator
instances are deployed on different TaskManagers and I would like to
set some attributes like the master and slave IPs on it.

I am trying to use these values but they only return localhost, not
the IP configured at flink-conf.yaml file. (jobmanager.rpc.address:
192.168.56.1).

ConfigOption<String> restAddressOption = ConfigOptions
    .key("rest.address")
    .stringType()
    .noDefaultValue();
System.out.println("DefaultJobManagerRunnerFactory rest.address: " +
jobMasterConfiguration.getConfiguration().getValue(restAddressOption));
System.out.println("rpcService: " + rpcService.getAddress());


Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: CoFlatMap has high back pressure

2020-05-20 Thread sundar
Thanks a lot for all the help!
I was able to figure out the bug just now. 
I had some extra code in the coFlatMap function(emitting stats) which was
inefficient and causing high GC usage. Fixing that fixed the issue. 




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink1.10.x array parsing problem

2020-05-20 Thread Benchao Li
You can have your UDF specify the types of its input parameters. If they are
not specified, the wrong type may be inferred, so the actual data cannot be
retrieved.

了不起的盖茨比 <573693...@qq.com> wrote on Wed, May 20, 2020 at 4:25 PM:

> 1. With blink_planner, when the DDL defines an array field, a plain SELECT of
>    that field parses it correctly.
> 2. With blink_planner, when I define my own function, the array has a length
>    but no elements; Flink simply skips parsing them.
> 3. With flink-planner, the result is correct.
>
> CREATE TABLE sourceTable (
>   event_time_line array<ROW(
>     `rule_name` VARCHAR,
>     `count` VARCHAR
>   )>
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.startup-mode' = 'earliest-offset',
>   'connector.topic' = 'topic_test_1',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'update-mode' = 'append',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true'
> );
>
> -- this returns data
> select event_time_line from sourceTable ;
>
> -- with my own function, the element values are not passed through, but the
> -- array size is
> select type_change(event_time_line) from sourceTable ;
>
> public class TypeChange extends ScalarFunction {
>
>     /**
>      * The elements are null although the array has a length; the fields
>      * inside the array are not recognized. After switching to the default
>      * planner, the values are returned correctly.
>      *
>      * @param rows
>      * @return
>      */
>     public String eval(Row[] rows) {
>         return JSONObject.toJSONString(rows);
>     }
> }



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


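Re: This kind of complex data is parsed straight to null

2020-05-20 Thread Benchao Li
Flink parses JSON directly with Jackson. If you declare the field as varchar, it
calls JsonNode.asText() directly, which I believe returns an empty string for
container types (that is, complex types).

guaishushu1...@163.com  wrote on Wed, May 20, 2020 at 6:06 PM:

> Statements: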
> CREATE TABLE A (
> w_data  STRING,
> w_table  STRING,
> w_ts TIMESTAMP(3)
>
>
> CREATE TABLE B (
> w_ts TIMESTAMP(3),
> city1_id  STRING,
> cate3_id  STRING,
> pay_order_id  STRING
> )
>
> insert into B
> select w_ts,
>
> 'test' as city1_id,
>
> ArrayIndexOf(w_data, 0) AS cate3_id,
> w_data as pay_order_id
> from A
>
> Sample data
> A
>
> {"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82=89.0=0=0=02=4=89.0=success=32590183789575=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"111"}
>
> B
>
> {"w_ts":"2020-05-20T13:58:37.131Z","city1_id":"test","cate3_id":null,"pay_order_id":""}
>
>
>
> guaishushu1...@163.com
>
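> From: Leonard Xu
> Sent: 2020-05-20 16:03
> To: user-zh
> Subject: Re: Flink 1.10 SQL problem parsing complex JSON
> Hi, guaishushu
> Please paste the query or an image-hosting link. JSON parsing support in
> flink-sql is fairly complete [1]; could you post the JSON schema and the
> problematic data? A unit test should be enough to reproduce the problem.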
>
> Best,
> Leonard
>
> [1]
> https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
> <
> https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
> >
>
> > On May 20, 2020, at 15:51, guaishushu1...@163.com wrote:
> >
> > Writing Kafka data to Kafka: Flink 1.10 SQL parses a field in complex JSON
> > as string, which causes data loss.
> >
> >
> >
> >
> >
> >
> >
> > guaishushu1...@163.com 
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


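Re: Too many TMs, job fails to run

2020-05-20 Thread a511955993
Hi Xintong,

From what I observe, the system logs contain no record of a kernel OOM kill.
After the job is cancelled, the lost TMs reconnect, and the pods were not
killed. My initial suspicion is a network-level problem; it feels like the CNI
imposes some limit.

thanks~

a511955993

On May 20, 2020 at 17:56, Xintong Song wrote:
Hi,

Judging from the logs, the root cause of the error is that a TM went down, so
its pod was removed and the other TMs could no longer find the address of the
failed TM. Please check whether a TM crashed or restarted when the error
occurred.

As for why the TM went down, you need to find a way to get the logs of the
failed TM. According to your earlier description, the cluster starts fine and
the problem appears only when the job runs. My current suspicion is that
resource problems caused by job execution made the TM hit an OOM, or that it
was killed by Kubernetes for exceeding its memory limit. While changing the
number of TMs and slots, did you also adjust the TM resource sizes? Even if you
did not, the resources consumed by the job itself will change; for example,
with more TMs, each TM has to establish more network connections and therefore
consumes more memory. The details still have to be analyzed from the logs.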
Thank you~

Xintong Song



On Wed, May 20, 2020 at 4:50 PM  wrote:

> Hi Xintong, the stack trace is as follows.
>
> 2020-05-20 16:46:20
> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
> Connection for partition
> 66c378b86c3e100e4a2d34927c4b7281@bb397f70ad4474d2beac18d484d726af not
> reachable.
>  at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>  at org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:240)
>  at org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:218)
>  at
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>  at
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:864)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:624)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Connecting the channel failed: Connecting
> to remote task manager + '/10.45.128.4:35285' has failed. This might
> indicate that the remote task manager has been lost.
>  at org.apache.flink.runtime.io
> .network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>  at org.apache.flink.runtime.io
> .network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
>  at org.apache.flink.runtime.io
> .network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:86)
>  at org.apache.flink.runtime.io
> .network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
>  at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>  ... 7 more
> Caused by: 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connecting to remote task manager + '/10.45.128.4:35285' has failed. This
> might indicate that the remote task manager has been lost.
>  at org.apache.flink.runtime.io
> .network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>  at org.apache.flink.runtime.io
> .network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:323)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:339)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  ... 1 more
> Caused by:
> 

This kind of complex data is parsed straight to null

2020-05-20 Thread guaishushu1...@163.com
Statements:
CREATE TABLE A (
w_data  STRING,
w_table  STRING,
w_ts TIMESTAMP(3)


CREATE TABLE B (
w_ts TIMESTAMP(3),
city1_id  STRING,
cate3_id  STRING,
pay_order_id  STRING
)

insert into B
select w_ts,

'test' as city1_id,

ArrayIndexOf(w_data, 0) AS cate3_id,
w_data as pay_order_id
from A

Sample data
A
{"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82=89.0=0=0=02=4=89.0=success=32590183789575=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"111"}

B
{"w_ts":"2020-05-20T13:58:37.131Z","city1_id":"test","cate3_id":null,"pay_order_id":""}



guaishushu1...@163.com
 
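From: Leonard Xu
Sent: 2020-05-20 16:03
To: user-zh
Subject: Re: Flink 1.10 SQL problem parsing complex JSON
Hi, guaishushu
Please paste the query or an image-hosting link. JSON parsing support in
flink-sql is fairly complete [1]; could you post the JSON schema and the
problematic data? A unit test should be enough to reproduce the problem.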
 
Best,
Leonard
 
[1] 
https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
 

 
> On May 20, 2020, at 15:51, guaishushu1...@163.com wrote:
>
> Writing Kafka data to Kafka: Flink 1.10 SQL parses a field in complex JSON
> as string, which causes data loss.
> 
> 
> 
> 
> 
> 
> 
> guaishushu1...@163.com 


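Re: Too many TMs, job fails to run

2020-05-20 Thread Xintong Song
Hi,

Judging from the logs, the root cause of the error is that a TM went down, so
its pod was removed and the other TMs could no longer find the address of the
failed TM. Please check whether a TM crashed or restarted when the error
occurred.

As for why the TM went down, you need to find a way to get the logs of the
failed TM. According to your earlier description, the cluster starts fine and
the problem appears only when the job runs. My current suspicion is that
resource problems caused by job execution made the TM hit an OOM, or that it
was killed by Kubernetes for exceeding its memory limit. While changing the
number of TMs and slots, did you also adjust the TM resource sizes? Even if you
did not, the resources consumed by the job itself will change; for example,
with more TMs, each TM has to establish more network connections and therefore
consumes more memory. The details still have to be analyzed from the logs.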
Thank you~

Xintong Song



On Wed, May 20, 2020 at 4:50 PM  wrote:

> Hi Xintong, the stack trace is as follows.
>
> 2020-05-20 16:46:20
> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
> Connection for partition
> 66c378b86c3e100e4a2d34927c4b7281@bb397f70ad4474d2beac18d484d726af not
> reachable.
>  at org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>  at org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:240)
>  at org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:218)
>  at
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>  at
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:864)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:624)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Connecting the channel failed: Connecting
> to remote task manager + '/10.45.128.4:35285' has failed. This might
> indicate that the remote task manager has been lost.
>  at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>  at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
>  at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:86)
>  at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
>  at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>  ... 7 more
> Caused by: 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connecting to remote task manager + '/10.45.128.4:35285' has failed. This
> might indicate that the remote task manager has been lost.
>  at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>  at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:323)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:339)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
>  at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>  at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  ... 1 more
> Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException:
> No route to host: /10.45.128.4:35285
> Caused by: java.net.NoRouteToHostException: No route to host
>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>  at 

Re: Job fails to run with too many TMs

2020-05-20 Thread a511955993
hi, xintong, the stack trace is as follows.

2020-05-20 16:46:20
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
 Connection for partition 
66c378b86c3e100e4a2d34927c4b7281@bb397f70ad4474d2beac18d484d726af not reachable.
 at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
 at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:240)
 at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:218)
 at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
 at 
org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:864)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:624)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to 
remote task manager + '/10.45.128.4:35285' has failed. This might indicate that 
the remote task manager has been lost.
 at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
 at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
 at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:86)
 at 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
 at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
 ... 7 more
Caused by: 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connecting to remote task manager + '/10.45.128.4:35285' has failed. This might 
indicate that the remote task manager has been lost.
 at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
 at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:323)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:339)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
 at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 ... 1 more
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException:
 No route to host: /10.45.128.4:35285
Caused by: java.net.NoRouteToHostException: No route to host
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:336)
 at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
 at 

[DISCUSS] Remove dependency shipping through nested jars during job submission.

2020-05-20 Thread Kostas Kloudas
Hi all,

I would like to bring the discussion in
https://issues.apache.org/jira/browse/FLINK-17745 to the dev mailing
list, just to hear the opinions of the community.

In a nutshell, in the early days of Flink, users could submit their
jobs as fat-jars that had a specific structure. More concretely, the
user could put the dependencies of the submitted job in a lib/ folder
within his/her jar and Flink would search within the user's jar for
such a folder, and if this existed, it would extract the nested jars,
ship them independently and add them to the classpath. Finally, it
would also ship the fat-jar itself so that the user-code is available
at the cluster (for details see [1]).
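
To make the lookup described above concrete, here is a minimal sketch that uses only the JDK's java.util.jar API. It is an illustration of the idea and not the code in PackagedProgram [1]; the class name NestedJarLister and the method nestedJars are made up for this example, and the rule "entry name starts with lib/ and ends with .jar" is a simplifying assumption.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper: lists the jars nested under lib/ inside a fat-jar.
final class NestedJarLister {

    static List<String> nestedJars(File fatJar) throws IOException {
        List<String> result = new ArrayList<>();
        try (JarFile jar = new JarFile(fatJar)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                // Assumed rule: anything under lib/ that ends in .jar is a nested dependency.
                if (name.startsWith("lib/") && name.endsWith(".jar")) {
                    result.add(name);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java NestedJarLister path/to/job.jar
        for (String name : nestedJars(new File(args[0]))) {
            System.out.println(name);
        }
    }
}
```

Running this against a job jar shows whether it relies on the nested lib/ layout, which may help users judge how much the removal would affect them.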

This way of submission was NOT documented anywhere and it has the
obvious shortcoming that the "nested" jars will be shipped twice. In
addition, it makes the codebase a bit more difficult to maintain, as
this constitutes another way of submitting stuff.

Given the above, I would like to propose to remove this codepath. But
given that there are users using the hidden feature, I would like to
discuss 1) how many such users exist, 2) how difficult it is for them
to "migrate" to a different way of submitting jobs, and 3) if the rest
of the community agrees on removing it.

I post this on both dev and user ML so that we have better coverage.

Looking forward to a fruitful discussion,
Kostas

[1] 
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java#L222


flink1.10.x ???? arrar ????

2020-05-20 Thread ??????????????
1.blink_planner ddlarray??select 
 2.blink_planner 
flink


3.flink-planner



CREATE TABLE sourceTable (
 
event_time_line array

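Re: Job fails to run with too many TMs

2020-05-20 Thread Xintong Song
hi

It would be best to post the complete logs and the error stack.
This error is usually caused by a network connectivity problem between the machines/pods that run the TMs. It may be related to the Kubernetes configuration, but that is hard to determine from the information available so far.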

Thank you~

Xintong Song



On Wed, May 20, 2020 at 3:50 PM  wrote:

>
> hi, all
>
> Cluster information:
> The Flink version is 1.10.1, deployed on Kubernetes.
>
> Symptom:
> The job needs 200 slots. With 40 TMs and 4 slots per TM, the job runs normally.
> With 200 TMs and 1 slot per TM, the cluster comes up normally and the UI shows
> Available Task Slots as 200, but submitting the job fails with the following error:
>
> Caused by: java.net.NoRouteToHostException: No route to host.
>
> That is the scenario I am running into. I would appreciate any replies or explanations. Thank you very much.
>
> Looking forward to your reply and help.
>
> Best
>
> a511955993
> Email: a511955...@163.com


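Flink 1.10 SQL: problem parsing complex JSON

2020-05-20 Thread guaishushu1...@163.com
Reading data from Kafka and writing it back to Kafka: Flink 1.10 SQL parses a field inside complex JSON as STRING, which causes data loss.

Statements: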
CREATE TABLE A (
w_data  STRING,
w_table  STRING,
w_ts TIMESTAMP(3)


CREATE TABLE B (
w_ts TIMESTAMP(3),
city1_id  STRING,
cate3_id  STRING,
pay_order_id  STRING
) 

insert into B
select w_ts,

'test' as city1_id,

ArrayIndexOf(w_data, 0) AS cate3_id,
w_data as pay_order_id
from A

Sample data
A
{"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82=89.0=0=0=02=4=89.0=success=32590183789575=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"111"}

B
{"w_ts":"2020-05-20T13:58:37.131Z","city1_id":"test","cate3_id":null,"pay_order_id":""}









guaishushu1...@163.com


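Question about passwords when defining an Elasticsearch index in the SQL client

2020-05-20 Thread naturalfree
I defined a table pointing to an ES index in the Flink SQL client configuration file and found no properties for setting a username and password. Does the current ES connector support security authentication?

naturalfree
Email: naturalf...@126.com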

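Question about updating only some fields of a sink table

2020-05-20 Thread naturalfree
I have an ES index and would like to update some of its fields by primary key through Flink SQL. Is there a feasible way to do this?

naturalfree
Email: naturalf...@126.com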

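Re: Flink 1.10 SQL: problem parsing complex JSON

2020-05-20 Thread Leonard Xu
Hi, guaishushu
Please paste the query or a link from an image-hosting service. JSON parsing support in flink-sql is fairly complete [1]. Could you paste the JSON schema and the problematic data?
A unit test should be enough to reproduce the problem.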

Best,
Leonard

[1] 
https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
 


> On May 20, 2020, at 15:51, guaishushu1...@163.com wrote:
> 
> Reading data from Kafka and writing it back to Kafka: Flink 1.10 SQL parses a field inside complex JSON as STRING, which causes data loss.
> 
> 
> 
> 
> 
> 
> 
> guaishushu1...@163.com 


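Flink 1.10 SQL: problem parsing complex JSON

2020-05-20 Thread guaishushu1...@163.com
Reading data from Kafka and writing it back to Kafka: Flink 1.10 SQL parses a field inside complex JSON as STRING, which causes data loss.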




guaishushu1...@163.com


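Job fails to run with too many TMs

2020-05-20 Thread a511955993

hi, all

Cluster information:
The Flink version is 1.10.1, deployed on Kubernetes.

Symptom:
The job needs 200 slots. With 40 TMs and 4 slots per TM, the job runs normally.
With 200 TMs and 1 slot per TM, the cluster comes up normally and the UI shows
Available Task Slots as 200, but submitting the job fails with the following error:

Caused by: java.net.NoRouteToHostException: No route to host.

That is the scenario I am running into. I would appreciate any replies or explanations. Thank you very much.

Looking forward to your reply and help.

Best

a511955993
Email: a511955...@163.com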

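Re:Re: How can I get a flink-shaded-hadoop package for Flink 1.10 that supports Hadoop 3.2.1?

2020-05-20 Thread Jeff
OK. I just checked some other sources as well: Flink does not support Hadoop 3 yet, but the 2.x package works too, as long as you do not use APIs that are specific to Hadoop 3.

On 2020-05-20 09:22:39, "刘大龙" wrote:
>Hi,
>You can take a look at these two links:
>1: https://www.mail-archive.com/dev@flink.apache.org/msg37293.html
>2: https://issues.apache.org/jira/browse/FLINK-11086
>> -----Original Message-----
>> From: Jeff
>> Sent: 2020-05-20 10:09:10 (Wednesday)
>> To: flink-zh
>> Cc:
>> Subject: How can I get a flink-shaded-hadoop package for Flink 1.10 that supports Hadoop 3.2.1?
>>
>> hi all,
>> I could not find a flink-shaded-hadoop package that supports Hadoop 3.2.1 on mvnrepository,
>> and I could not find a corresponding hadoop module in the separate flink-shaded project either. How can I get this package?
>
>
>--
>刘大龙
>
>Zhejiang University, Department of Control, Institute of Intelligent Systems and Control, New Industrial Control Building 217
>Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
>Tel:18867547281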

Re: Flink-1.10-SQL TopN syntax question

2020-05-20 Thread Leonard Xu
+ user-zh

> On May 20, 2020, at 15:27, Leonard Xu wrote:
> 
> Hi, guaishushu
> 
> First, apologies for the late reply. I went through your SQL. The problem is that in 1.10
> the primary key of an upsert sink is inferred from the query, and for some queries it cannot
> be inferred. Your query happens to be one of those, so you get: Exception in thread "main" 
> org.apache.flink.table.api.TableException: UpsertStreamTableSink requires 
> that Table has a full primary keys if it is updated.
> 
> A temporary workaround in 1.10 is to add a GROUP BY layer so that the primary key can be inferred from the query:
> INSERT INTO test_mysql_2
> SELECT vid, rss, start_time FROM (
>   SELECT vid, rss, start_time FROM (
>     SELECT vid, rss, start_time,
>       ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY rss desc) AS rownum
>     FROM (
>       SELECT vid,
>         DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '5' MINUTE),'-MM-dd HH:00') AS start_time,
>         SUM(response_size) AS rss
>       FROM user_log
>       GROUP BY vid, TUMBLE(rowtime, INTERVAL '5' MINUTE)
>     )
>   )
>   WHERE rownum <= 10
> ) GROUP BY vid, rss, start_time
> 
> This is indeed a rough edge. The upcoming 1.11 already has a cleaner solution: you can declare the primary key on the sink table instead of having the PK inferred from the query [1].
> 
> 
> Best,
> Leonard
> [1] 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java#L184
>  
> 
> 
> 
>> On May 11, 2020, at 10:26, guaishushu1...@163.com wrote:
>> 
>> CREATE TABLE test_mysql_2 (
>>  vid string,
>>  rss BIGINT,
>>  start_time string
>> ) with ( 
>>  'connector.type' = 'jdbc',
>>  'connector.url' = 
>> 'jdbc:mysql://xxx/xxx?useUnicode=true=utf8=true=false'
>>  
>> ,
>>  'connector.username' = 'xxx',
>>  'connector.password' = 'xxx',
>>  'connector.table' = 'task_flink_table_3',
>>  'connector.write.flush.max-rows' = '100'
>> );
>> 
>> 
>> 
>> 
>> INSERT INTO test_mysql_2
>> SELECT vid,rss, start_time FROM (
>>   SELECT  vid,rss, start_time,
>> ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY rss desc) AS rownum
>>   FROM (
>> SELECT vid,
>>  DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '5' 
>> MINUTE),'-MM-dd HH:00') AS start_time,
>>  SUM(response_size) AS rss
>>  FROM user_log
>>  GROUP BY vid, TUMBLE(rowtime, INTERVAL '5' MINUTE)
>>)
>> )
>> WHERE rownum <= 10;
>> 
>> guaishushu1...@163.com 
>>  
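>> From: Leonard Xu
>> Sent: 2020-05-09 14:15
>> To: user-zh
>> Subject: Re: Flink-1.10-SQL TopN syntax question
>> Hi
>>  
>> The image is broken. Could you link it through an image-hosting tool, or paste the query?
>>  
>> Best,
>> Leonard Xu
>>  
>> > On May 9, 2020, at 13:51, guaishushu1...@163.com wrote:
>> >
>> > hi all, when using the TopN syntax to write data to MySQL, I found that rownum must be set as the primary key in the database, otherwise an error is reported. Has anyone run into this?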
>> >
>> > guaishushu1...@163.com  
>> > >
> 



Re: Flink convert Table to DataSet[Row]

2020-05-20 Thread Jingsong Li
That may be problematic as well; the flink planner may no longer support the Hive connector.

On Wed, May 20, 2020 at 2:57 PM 张锴  wrote:

> I am using Flink 1.10. Does that mean I can only use the flink planner?
>
> Jingsong Li wrote on Wed, May 20, 2020 at 2:55 PM:
>
> > The blink planner does not support conversion to and from DataSet.
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, May 20, 2020 at 2:49 PM 张锴  wrote:
> >
> > >   def main(args: Array[String]): Unit = {
> > > val tableEnvSettings = EnvironmentSettings.newInstance()
> > >   .useBlinkPlanner()
> > >   .inBatchMode()
> > >   .build()
> > >
> > > val tableEnv: TableEnvironment =
> > > TableEnvironment.create(tableEnvSettings)
> > >
> > > val catalog = new HiveCatalog(
> > >   "myhive", // catalog name
> > >   "mydatabase", // default database
> > >   "D:\\data\\conf", // Hive config (hive-site.xml) directory
> > >   "3.1.2" // Hive version
> > > )
> > >
> > > tableEnv.registerCatalog("myhive", catalog)
> > > tableEnv.useCatalog("myhive")
> > > tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> > > tableEnv.listTables().foreach(println)
> > >
> > > import org.apache.flink.table.api.scala._
> > > import org.apache.flink.api.scala._
> > >
> > >
> > > val mytable = tableEnv.from("mytable")
> > > val result = mytable
> > >   .groupBy("pfid")
> > >   .select("nv_mv", "pfid")
> > >   .toDataSet[Row] // conversion to DataSet
> > >   .print()
> > >
> > >   }
> > >
> > >   Exception in thread "main"
> > > org.apache.flink.table.api.ValidationException: Only tables that
> > originate
> > > from Scala DataSets can be converted to Scala DataSets.
> > > at
> > >
> > >
> >
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:56)
> > > at HiveService$.main(HiveService.scala:40)
> > > at HiveService.main(HiveService.scala)
> > >
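> > > Jingsong Li wrote on Wed, May 20, 2020 at 2:06 PM:
> > >
> > > > Sorry,
> > > >
> > > > I still cannot see your images. Please consider copying the exception stack trace.
> > > >
> > > > May I ask whether the follow-up metric calculations cannot be done with Table/SQL?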
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > > > On Wed, May 20, 2020 at 1:52 PM 张锴  wrote:
> > > >
> > > > > [image: 微信图片_20200520132244.png]
> > > > > [image: 微信图片_20200520132343.png]
> > > > >
> > > > > Jingsong Li wrote on Wed, May 20, 2020 at 1:30 PM:
> > > > >
> > > > >> Hi,
> > > > >>
> > > > >> I do not see any attachment. Why do you need to convert to a DataSet? What is it that cannot be done with Table?
> > > > >>
> > > > >> Best,
> > > > >> Jingsong Lee
> > > > >>
> > > > >> On Wed, May 20, 2020 at 1:26 PM 张锴  wrote:
> > > > >>
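> > > > >> > While testing the conversion of data queried from Hive into a DataSet[Row], I ran into
> > > > >> > some problems I could not solve. The code and the exception are in the attachment; could you please take a look?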
> > > > >> >
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Best, Jingsong Lee
> > > > >>
> > > > >
> > > >
> > > > --
> > > > Best, Jingsong Lee
> > > >
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


-- 
Best, Jingsong Lee


Re: CoFlatMap has high back pressure

2020-05-20 Thread Arvid Heise
Hi Sundar,

in general, you wouldn't load the static data in the driver, but upon
opening the map on the processing nodes. If your processing nodes could
hold the data, it might be the easiest to switch to this pattern. You
usually load it once per node across all subtasks by using some kind of
static map. I can go into details if you are interested.
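
A minimal sketch of that "load once per node" pattern in plain Java, for illustration only. It assumes that all subtasks of one TaskManager share the same JVM and class loader; the names StaticLookup, get and load are made up, and the single hard-coded entry stands in for reading the real file.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder for static lookup data shared by all subtasks in one JVM.
final class StaticLookup {
    // Counts how often the data was actually loaded (only for demonstration).
    static final AtomicInteger LOADS = new AtomicInteger();

    private static volatile Map<String, String> cache;

    // Intended to be called from each subtask's open(); only the first caller loads.
    static Map<String, String> get() {
        Map<String, String> local = cache;
        if (local == null) {
            synchronized (StaticLookup.class) {
                local = cache;
                if (local == null) {
                    local = load();
                    cache = local;
                }
            }
        }
        return local;
    }

    private static Map<String, String> load() {
        LOADS.incrementAndGet();
        Map<String, String> data = new ConcurrentHashMap<>();
        data.put("id-1", "static value 1"); // placeholder for parsing the real file
        return data;
    }

    public static void main(String[] args) {
        // Two "subtasks" asking for the data share one map and one load.
        System.out.println(get() == get());
        System.out.println(LOADS.get());
    }
}
```

With this pattern the map function would call StaticLookup.get() in open() and do a plain map lookup per record, so the static data never travels through the network or keyed state.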

Now for the actual question.

1. As soon as you use Co* functions, your pipeline cannot be chained
anymore and needs to go over the network. I suspect that this is where the
lag comes from. You can try to tweak the network configurations if you use
a high degree of parallelism [1].
2. Not that I know of.
3. See above.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#configuring-the-network-buffers

On Wed, May 20, 2020 at 4:55 AM Guowei Ma  wrote:

> Hi Sundar,
> 1. Could you check the GC status of the process? or you could increase the
> memory size of your TM. (I find that you use the value state and I assume
> that you use the MemoryStatebackend)
> 2. AFAIK there is no performance limitation in using the `connect`
> operator for mixing the bounded/unbounded stream.
>
> Best,
> Guowei
>
>
> sundar  于2020年5月20日周三 上午9:54写道:
>
>> Hi Guowei,
>> Here is what the code for my pipeline looks like.
>>
>> // open() and keyed state require the Rich variant of the function.
>> class CoFlatMapFunc extends
>>     RichCoFlatMapFunction<FileInput, KafkaInput, KafkaOutput> {
>>    ValueState<FileInput> cache;
>>
>>    @Override
>>    public void open(Configuration parameters) {
>>      // initialize cache
>>    }
>>
>>    // read element from file and update cache.
>>    @Override
>>    public void flatMap1(FileInput fileInput, Collector<KafkaOutput> collector)
>>        throws Exception {
>>      cache.update(fileInput);
>>    }
>>
>>    // read element from kafka, look up cache and output tuple.
>>    @Override
>>    public void flatMap2(KafkaInput kafkaInput, Collector<KafkaOutput> collector)
>>        throws Exception {
>>      collector.collect(new KafkaOutput(kafkaInput, cache.value()));
>>    }
>> }
>>
>>
>> // Old pipeline that works fine.
>> class OldFlinkPipeline {
>>
>>    public SingleOutputStreamOperator<KafkaOutput>
>>        generateOutput(StreamExecutionEnvironment env) {
>>
>>      DataStream<KafkaInput> kafkaStream = env
>>          .addSource(new KafkaSourceFunction());
>>
>>      return kafkaStream
>>          .map(kafkaInput ->
>>              new KafkaOutput(kafkaInput, null /*fileInput*/));
>>    }
>> }
>>
>> // New pipeline that is consuming more than 4X the resources.
>> class NewFlinkPipeline {
>>
>>    public SingleOutputStreamOperator<KafkaOutput>
>>        generateOutput(StreamExecutionEnvironment env) {
>>
>>      // The key type (String here) is an assumption; it is whatever getId() returns.
>>      KeyedStream<KafkaInput, String> kafkaStream = env
>>          .addSource(new KafkaSourceFunction())
>>          .keyBy(kafkaInput -> kafkaInput.getId());
>>
>>      KeyedStream<FileInput, String> fileStream = env
>>          .readTextFile("file.txt")
>>          // (parsing each text line into a FileInput is omitted in the original)
>>          .keyBy(fileInput -> fileInput.getId());
>>
>>      return fileStream
>>          .connect(kafkaStream)
>>          .flatMap(new CoFlatMapFunc());
>>    }
>> }
>>
>> Please do let me know if this is the recommended way to connect a bounded
>> stream with an unbounded stream, or if I am doing something obviously
>> expensive here.
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

-- 

Arvid Heise | Senior Java Developer





Re: Flink convert Table to DataSet[Row]

2020-05-20 Thread 张锴
I am using Flink 1.10. Does that mean I can only use the flink planner?

Jingsong Li wrote on Wed, May 20, 2020 at 2:55 PM:

> The blink planner does not support conversion to and from DataSet.
>
> Best,
> Jingsong Lee
>
> On Wed, May 20, 2020 at 2:49 PM 张锴  wrote:
>
> >   def main(args: Array[String]): Unit = {
> > val tableEnvSettings = EnvironmentSettings.newInstance()
> >   .useBlinkPlanner()
> >   .inBatchMode()
> >   .build()
> >
> > val tableEnv: TableEnvironment =
> > TableEnvironment.create(tableEnvSettings)
> >
> > val catalog = new HiveCatalog(
> >   "myhive", // catalog name
> >   "mydatabase", // default database
> >   "D:\\data\\conf", // Hive config (hive-site.xml) directory
> >   "3.1.2" // Hive version
> > )
> >
> > tableEnv.registerCatalog("myhive", catalog)
> > tableEnv.useCatalog("myhive")
> > tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> > tableEnv.listTables().foreach(println)
> >
> > import org.apache.flink.table.api.scala._
> > import org.apache.flink.api.scala._
> >
> >
> > val mytable = tableEnv.from("mytable")
> > val result = mytable
> >   .groupBy("pfid")
> >   .select("nv_mv", "pfid")
> >   .toDataSet[Row] // conversion to DataSet
> >   .print()
> >
> >   }
> >
> >   Exception in thread "main"
> > org.apache.flink.table.api.ValidationException: Only tables that
> originate
> > from Scala DataSets can be converted to Scala DataSets.
> > at
> >
> >
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:56)
> > at HiveService$.main(HiveService.scala:40)
> > at HiveService.main(HiveService.scala)
> >
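> > Jingsong Li wrote on Wed, May 20, 2020 at 2:06 PM:
> >
> > > Sorry,
> > >
> > > I still cannot see your images. Please consider copying the exception stack trace.
> > >
> > > May I ask whether the follow-up metric calculations cannot be done with Table/SQL?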
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, May 20, 2020 at 1:52 PM 张锴  wrote:
> > >
> > > > [image: 微信图片_20200520132244.png]
> > > > [image: 微信图片_20200520132343.png]
> > > >
> > > > Jingsong Li wrote on Wed, May 20, 2020 at 1:30 PM:
> > > >
> > > >> Hi,
> > > >>
> > > >> I do not see any attachment. Why do you need to convert to a DataSet? What is it that cannot be done with Table?
> > > >>
> > > >> Best,
> > > >> Jingsong Lee
> > > >>
> > > >> On Wed, May 20, 2020 at 1:26 PM 张锴  wrote:
> > > >>
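> > > >> > While testing the conversion of data queried from Hive into a DataSet[Row], I ran into
> > > >> > some problems I could not solve. The code and the exception are in the attachment; could you please take a look?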
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >> Best, Jingsong Lee
> > > >>
> > > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: Flink convert Table to DataSet[Row]

2020-05-20 Thread Jingsong Li
The blink planner does not support conversion to and from DataSet.

Best,
Jingsong Lee

On Wed, May 20, 2020 at 2:49 PM 张锴  wrote:

>   def main(args: Array[String]): Unit = {
> val tableEnvSettings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inBatchMode()
>   .build()
>
> val tableEnv: TableEnvironment =
> TableEnvironment.create(tableEnvSettings)
>
> val catalog = new HiveCatalog(
>   "myhive", // catalog name
>   "mydatabase", // default database
>   "D:\\data\\conf", // Hive config (hive-site.xml) directory
>   "3.1.2" // Hive version
> )
>
> tableEnv.registerCatalog("myhive", catalog)
> tableEnv.useCatalog("myhive")
> tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> tableEnv.listTables().foreach(println)
>
> import org.apache.flink.table.api.scala._
> import org.apache.flink.api.scala._
>
>
> val mytable = tableEnv.from("mytable")
> val result = mytable
>   .groupBy("pfid")
>   .select("nv_mv", "pfid")
>   .toDataSet[Row] // conversion to DataSet
>   .print()
>
>   }
>
>   Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Only tables that originate
> from Scala DataSets can be converted to Scala DataSets.
> at
>
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:56)
> at HiveService$.main(HiveService.scala:40)
> at HiveService.main(HiveService.scala)
>
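> Jingsong Li wrote on Wed, May 20, 2020 at 2:06 PM:
>
> > Sorry,
> >
> > I still cannot see your images. Please consider copying the exception stack trace.
> >
> > May I ask whether the follow-up metric calculations cannot be done with Table/SQL?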
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, May 20, 2020 at 1:52 PM 张锴  wrote:
> >
> > > [image: 微信图片_20200520132244.png]
> > > [image: 微信图片_20200520132343.png]
> > >
> > > Jingsong Li wrote on Wed, May 20, 2020 at 1:30 PM:
> > >
> > >> Hi,
> > >>
> > >> I do not see any attachment. Why do you need to convert to a DataSet? What is it that cannot be done with Table?
> > >>
> > >> Best,
> > >> Jingsong Lee
> > >>
> > >> On Wed, May 20, 2020 at 1:26 PM 张锴  wrote:
> > >>
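> > >> > While testing the conversion of data queried from Hive into a DataSet[Row], I ran into some problems I could not solve. The code and the exception are in the attachment; could you please take a look?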
> > >> >
> > >>
> > >>
> > >> --
> > >> Best, Jingsong Lee
> > >>
> > >
> >
> > --
> > Best, Jingsong Lee
> >
>


-- 
Best, Jingsong Lee


Re: About the problem of too many slots in Flink SQL

2020-05-20 Thread 111
Hi,
Thanks for the reply. I will study the referenced material. Looking forward to 1.11.
Best,
Xinghalo

Re: Flink convert Table to DataSet[Row]

2020-05-20 Thread 张锴
  def main(args: Array[String]): Unit = {
    val tableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()

    val tableEnv: TableEnvironment =
      TableEnvironment.create(tableEnvSettings)

    val catalog = new HiveCatalog(
      "myhive", // catalog name
      "mydatabase", // default database
      "D:\\data\\conf", // Hive config (hive-site.xml) directory
      "3.1.2" // Hive version
    )

    tableEnv.registerCatalog("myhive", catalog)
    tableEnv.useCatalog("myhive")
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.listTables().foreach(println)

    import org.apache.flink.table.api.scala._
    import org.apache.flink.api.scala._

    val mytable = tableEnv.from("mytable")
    val result = mytable
      .groupBy("pfid")
      .select("nv_mv", "pfid")
      .toDataSet[Row] // conversion to DataSet
      .print()

  }

  Exception in thread "main"
org.apache.flink.table.api.ValidationException: Only tables that originate
from Scala DataSets can be converted to Scala DataSets.
at
org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:56)
at HiveService$.main(HiveService.scala:40)
at HiveService.main(HiveService.scala)

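Jingsong Li wrote on Wed, May 20, 2020 at 2:06 PM:

> Sorry,
>
> I still cannot see your images. Please consider copying the exception stack trace.
>
> May I ask whether the follow-up metric calculations cannot be done with Table/SQL?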
>
> Best,
> Jingsong Lee
>
> On Wed, May 20, 2020 at 1:52 PM 张锴  wrote:
>
> > [image: 微信图片_20200520132244.png]
> > [image: 微信图片_20200520132343.png]
> >
> > Jingsong Li wrote on Wed, May 20, 2020 at 1:30 PM:
> >
> >> Hi,
> >>
> >> I do not see any attachment. Why do you need to convert to a DataSet? What is it that cannot be done with Table?
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Wed, May 20, 2020 at 1:26 PM 张锴  wrote:
> >>
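> >> > While testing the conversion of data queried from Hive into a DataSet[Row], I ran into some problems I could not solve. The code and the exception are in the attachment; could you please take a look?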
> >> >
> >>
> >>
> >> --
> >> Best, Jingsong Lee
> >>
> >
>
> --
> Best, Jingsong Lee
>


Re: Watermarks and parallelism

2020-05-20 Thread Arvid Heise
Hi Gnanasoundari,

there are two things that you need to choose:
* maxOutOfOrderness [1], which determines how long you wait until you close
the windows for the first time
* and allowedLateness [2], which allows late events to be processed and
cause update events

In general, you have to pick some assumptions. Even without Flink, you need
to find a point in time where you kick off your calculation for the first
time. Then you can choose to update the results if more data comes in.
However, you still need to find a maximum wait time to handle disconnected
assets and to eventually free the resources.
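
To illustrate how the two settings interact, here is a simplified model in plain Java. It is not Flink code: it assumes the watermark is simply the highest timestamp seen minus maxOutOfOrderness, and that a window stops accepting events once the watermark passes its end plus allowedLateness. The names LatenessSketch, Fate, watermark and classify are made up for this sketch.

```java
// Simplified model of maxOutOfOrderness and allowedLateness (not Flink API).
final class LatenessSketch {
    enum Fate { ON_TIME, LATE_UPDATE, DROPPED }

    // Watermark derived from the highest event timestamp seen so far.
    static long watermark(long maxSeenTimestamp, long maxOutOfOrderness) {
        return maxSeenTimestamp - maxOutOfOrderness;
    }

    // What happens to an event that belongs to the window ending at windowEnd,
    // given the watermark at the moment the event arrives.
    static Fate classify(long windowEnd, long currentWatermark, long allowedLateness) {
        if (currentWatermark < windowEnd) {
            return Fate.ON_TIME;      // window has not fired yet
        }
        if (currentWatermark < windowEnd + allowedLateness) {
            return Fate.LATE_UPDATE;  // window fired, but an update is still emitted
        }
        return Fate.DROPPED;          // window state has already been freed
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long wm = watermark(5 * minute, 2 * minute);
        System.out.println(classify(4 * minute, wm, 0L));
        System.out.println(classify(3 * minute, wm, minute));
        System.out.println(classify(1 * minute, wm, minute));
    }
}
```

The larger maxOutOfOrderness is, the longer the first result is delayed; the larger allowedLateness is, the longer window state has to be kept around for possible updates.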

For your questions:
1) If assets may be down, you probably want to introduce an idle timeout in
your watermark assigner, like this
https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
.
2) That's where allowedLateness comes in. You can configure Flink to ignore
late events (default) or to perform recalculation and emit update events.
Note that your downstream pipelines need to be able to handle such update
events.
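
The idea behind an idle timeout can be sketched in plain Java as follows. This is a toy model and not the linked training solution or a Flink API: it assumes one watermark per asset, ignores assets that have been silent for longer than the timeout when taking the minimum, and never lets the combined watermark move backwards. The class IdleAwareWatermark and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: combined watermark = minimum over non-idle sources, never decreasing.
final class IdleAwareWatermark {
    private final long idleTimeoutMs;
    private final Map<String, Long> lastWatermark = new HashMap<>();
    private final Map<String, Long> lastSeenAt = new HashMap<>();
    private long emitted = Long.MIN_VALUE;

    IdleAwareWatermark(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Record an event from a source: eventTs is event time, nowMs is processing time.
    void onEvent(String source, long eventTs, long nowMs) {
        lastWatermark.merge(source, eventTs, Long::max);
        lastSeenAt.put(source, nowMs);
    }

    // Combined watermark at processing time nowMs.
    long combined(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : lastWatermark.entrySet()) {
            boolean idle = nowMs - lastSeenAt.get(e.getKey()) > idleTimeoutMs;
            if (idle) {
                continue; // an idle source no longer holds the watermark back
            }
            min = Math.min(min, e.getValue());
            anyActive = true;
        }
        if (anyActive) {
            emitted = Math.max(emitted, min); // watermarks never go backwards
        }
        return emitted;
    }
}
```

In this model an asset that reconnects with old data, like asset 5 in the scenario, does not pull the watermark back, so its events count as late; whether they are dropped or cause updates then depends on allowedLateness.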

I also recommend checking out retract streams of the table API [3].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#with-periodic-watermarks
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/windows.html#allowed-lateness
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion

On Wed, May 20, 2020 at 6:25 AM Gnanasoundari Soundarajan <
gnanasoundari.soundara...@man-es.com> wrote:

> Thanks Arvid for the explanation.
>
>
>
> Assume, we have 5 assets.
>
> Asset 1, 2 sending current timestamp May 17th 00:00
>
> Asset 3  went down on May 15th 00:00 and it restarted on May 17th 00:00
> and it started sending data from May 15th 00:00
>
> Asset 4 went down on May 16th 00:00 and it is still down
>
> Asset 5 went down on May 14th 00:00 and it restarted on May 17th 01:00
> and it started sending data from May 14th 00:00
>
>
>
> In the above scenario,
>
>- Due to the uncertainty on the asset connectivity, I can’t predict
>out of orderness duration.
>- Not all the asset will communicate all the time.
>
>
>
> I have a kafka topic as source and sink to flink job and I have event time
> window of 1min.
>
>
>
> *Query:*
>
>    1. As I have 5 assets, if I set a parallelism of 5 at the window operator
>    level, will it not run into issues with watermark progression when asset 4 is
>    not communicating? Assume I use keyBy on the asset id (1 to 5).
>    2. Assume the window operator chose the watermark May 15th 00:00 of
>    asset 3, as it is the minimum event time across the subtasks of the window. If
>    asset 5 sends data for May 14th 00:00, will the data of asset 5 not be dropped
>    as late data?
>
>
>
> Regards,
>
> Gnana
>
>
>
> *From: *Arvid Heise 
> *Date: *Monday, 18 May 2020 at 4:59 PM
> *To: *Gnanasoundari Soundarajan 
> *Cc: *Alexander Fedulov , "user@flink.apache.org"
> 
> *Subject: *Re: Watermarks and parallelism
>
>
>
> Hi Gnanasoundari,
>
>
>
> Your use case is very typical and pretty much the main motivation for
> event time and watermarks. It's supported out of the box. I recommend
> reading again the first resource of Alex.
>
>
>
> To make it clear, let's have a small example:
>
>
>
> Source 1 -\
>            +--> Window --> Sink
> Source 2 -/
>
>
>
> Consider source 1 being 1s ahead of source 2. Then its watermarks are also
> 1s ahead. At the window level, however, the watermark only advances to the
> minimum across its inputs. That's why no data is lost in your case.
>
>
>
> In particular, consider the following events that pop up once per second
> that just need to be summed up per minute.
>
> Source1: (00:01, s1_event1), ..., (00:59, s1_event59), (01:00,
> s1_event60), (01:01, s1_event61), ...
>
> Source2: (00:00, s2_event1), ..., (00:58, s2_event59), (00:59,
> s2_event60), (01:00, s2_event61), ...
>
> For simplicity, assume that watermark = event timestamp.
>
>
>
> Then consider a window [00:00, 00:59]. This window will only close,
> perform the aggregation, and fire the result once the watermarks from both
> sources have reached 01:00 (that is, when the event with that timestamp occurs).
>
> It will contain 59 events from Source1 and 60 events from Source2.
>
> In particular, when event s1_event60 arrives at 01:00, it goes into
> the next window [01:00, 01:59], while the previous window is still open for
> events from Source2. Only after receiving s2_event61 at 01:00 will the first
> window produce an output event.
>
>
>
> Of course, that also means that data from fast sources needs to stay in
> main memory (or actually the state backend) for as long as it takes the
> slowest source to catch up.
>
>
>
> On Fri, May 15, 2020 at 7:16 PM Gnanasoundari Soundarajan <
> gnanasoundari.soundara...@man-es.com> wrote:
>
> 
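
The minimum-watermark rule from the quoted explanation above can be sketched in plain Java with no Flink dependency. The class `MinWatermarkWindow` and its methods are hypothetical names for illustration, not Flink API. Like the quoted example, the sketch assumes watermark = event timestamp and that a window fires once the combined watermark reaches its end; it also assumes that events for an already-fired window are dropped (allowed lateness of zero).

```java
import java.util.Arrays;

// Hypothetical, simplified model of one tumbling window fed by several
// sources. Timestamps are seconds; the window covers [start, end).
class MinWatermarkWindow {
    private final long start;
    private final long end;
    private final long[] watermarks; // latest watermark seen per source
    private int count = 0;
    private boolean fired = false;

    MinWatermarkWindow(long start, long end, int numSources) {
        this.start = start;
        this.end = end;
        this.watermarks = new long[numSources];
        Arrays.fill(watermarks, Long.MIN_VALUE); // silent sources hold the watermark back
    }

    // The operator's watermark is the minimum over all input watermarks.
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : watermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    // Processes one event; returns true if this event made the window fire.
    boolean onEvent(int source, long timestamp) {
        if (!fired && timestamp >= start && timestamp < end) {
            count++; // event belongs to the still-open window
        }
        // Simplification from the thread: watermark = event timestamp.
        watermarks[source] = Math.max(watermarks[source], timestamp);
        if (!fired && combinedWatermark() >= end) {
            fired = true;
            return true;
        }
        return false;
    }

    int count() { return count; }

    boolean hasFired() { return fired; }

    public static void main(String[] args) {
        MinWatermarkWindow w = new MinWatermarkWindow(0, 60, 2);
        for (long t = 1; t <= 60; t++) w.onEvent(0, t); // Source1: 00:01 .. 01:00
        for (long t = 0; t <= 60; t++) w.onEvent(1, t); // Source2: 00:00 .. 01:00
        System.out.println("fired=" + w.hasFired() + ", events=" + w.count());
    }
}
```

In this model the window stays open while only Source1 has reached 01:00 and fires on Source2's 01:00 event, holding 59 + 60 events as in the quoted example. A source that never sends anything keeps the combined watermark at its initial value, which is the situation question 1 asks about.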

Re: Constructing a Remote Env with the Blink Planner

2020-05-20 Thread Jark Wu
Hi,

The Blink planner does not support
org.apache.flink.table.api.java.BatchTableEnvironment, so it cannot be hooked
up to ExecutionEnvironment.
The Blink planner's batch mode currently supports only TableEnvironment, but
you can also use StreamTableEnvironment through a hack: you need to construct
StreamTableEnvironmentImpl directly:

StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.createRemoteEnvironment(...);
StreamTableEnvironmentImpl tEnv = new StreamTableEnvironmentImpl(..
execEnv, .., false); // for the constructor arguments, see the implementation of StreamTableEnvironmentImpl#create

Best,
Jark

On Tue, 19 May 2020 at 15:27, jun su  wrote:

> hi all,
>
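> In the past, to connect to a remote Flink cluster from an IDE, you could
> use ExecutionEnvironment.createRemoteEnvironment().
>
> The way to build a Blink environment shown on the official site is:
>
> val bbSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
> val bbTableEnv = TableEnvironment.create(bbSettings)
>
>
> How do I connect to a remote cluster in this case?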
>
> --
> Best,
> Jun Su
>


Re: Flink SQL dimension table join fails with: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'

2020-05-20 Thread Leonard Xu
Hi,

As Jingsong said, this is a HiveCatalog bug. This is normally supported, and it will be fixed in 1.11.

Best,
Leonard


> On May 20, 2020, at 13:57, wind.fly@outlook.com wrote:
> 
> Hi,
>    The version I use is 1.10.0, and x.log.yanfa_log is a normal table format. My demo uses a Hive catalog:
>    Catalog myCatalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
>    tEnv.registerCatalog("x", myCatalog);
> 
> Best,
> Junbao Zhang
> 
> 
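> From: Leonard Xu 
> Sent: May 20, 2020 11:51
> To: user-zh 
> Subject: Re: Flink SQL dimension table join fails with: Temporal table join currently only supports 'FOR 
> SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
> 
> Hi, wind
> 
> I did not hit a similar error with your SQL. Could you confirm that the version is 1.10.x?
> Also, I do not recommend a table name such as x.log.yanfa_log that contains the special character ".".
> It conflicts with the fully qualified table name catalogName.databaseName.tableName, so creating the table
> should fail because catalog x does not exist. I could not reproduce the unsupported proctime field problem.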
> 
> Best,
> Leonard
> 
>> On May 20, 2020, at 11:01, wind.fly@outlook.com wrote:
>> 
>> Hi,
>>   The CREATE TABLE statement is:
>>   CREATE TABLE x.log.yanfa_log (
>>   dt TIMESTAMP(3),
>>   conn_id STRING,
>>   sequence STRING,
>>   trace_id STRING,
>>   span_info STRING,
>>   service_id STRING,
>>   msg_id STRING,
>>   servicename STRING,
>>   ret_code STRING,
>>   duration STRING,
>>   req_body MAP,
>>   res_body MAP,
>>   extra_info MAP,
>>   proctime AS PROCTIME(),
>>   WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
>> ) WITH (
>>   'connector.type' = 'kafka',
>>   'connector.version' = '0.11',
>>   'connector.topic' = 'x-log-yanfa_log',
>>   'connector.properties.bootstrap.servers' = '*:9092',
>>   'connector.properties.zookeeper.connect' = '*:2181',
>>   'connector.startup-mode' = 'latest-offset',
>>   'update-mode' = 'append',
>>   'format.type' = 'json',
>>   'format.fail-on-missing-field' = 'true'
>> );
>> 
>> The join SQL can be found earlier in the thread history.
>> 
>> Best,
>> Junbao Zhang
>> 
>> From: Leonard Xu 
>> Sent: May 20, 2020 10:50
>> To: user-zh 
>> Subject: Re: Flink SQL dimension table join fails with: Temporal table join currently only supports 'FOR 
>> SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
>> 
>> Hi,
>> 
>> Both 1.10 and 1.10.1 support declaring the proctime column as a computed column when creating a table,
>> and temporal tables support both JOIN and LEFT JOIN. I verified this when 1.10 was released [1]; you can use it as a reference.
>> Could you paste your complete SQL?
>> 
>> 
>> Best,
>> Leonard Xu
>> [1]
>> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql
>>  
>> 
>> 
>> 
>>> On May 19, 2020, at 09:23, wind.fly@outlook.com wrote:
>>> 
>>> I followed exactly that syntax, but my experiment did not succeed. Could you paste the CREATE TABLE and query SQL statements you tested? Thanks.
>>> 
>>> From: 祝尚 <17626017...@163.com>
>>> Sent: May 19, 2020 0:02
>>> To: user-zh@flink.apache.org 
>>> Subject: Re: Flink SQL dimension table join fails with: Temporal table join currently only supports 'FOR 
>>> SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
>>> 
>>> It should work. Jark's example at
>>> http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/
>>> uses it the same way, and I have tried it in both the SQL client and the Table API without problems.
>>> 
>>> 
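>>>> On May 18, 2020, at 4:43 PM, wind.fly@outlook.com wrote:
>>>> 
>>>> Hi,
>>>> After trying it, generating the proctime field by calling the PROCTIME() function in the SELECT does work. Thanks.
>>>> 
>>>> From: 111 
>>>> Sent: May 18, 2020 16:07
>>>> To: user-zh@flink.apache.org 
>>>> Subject: Reply: Flink SQL dimension table join fails with: Temporal table join currently only supports 'FOR 
>>>> SYSTEM_TIME AS OF' left table's proctime field, doesn't support 
>>>> 'PROCTIME()'
>>>> 
>>>> Hi,
>>>> 
>>>> 
>>>> I tested it: proctime cannot be created at table-creation time. It has to be generated in the SELECT with the PROCTIME() function.
>>>> 
>>>> 
>>>> For example:
>>>> Select …., PROCTIME() AS proctime from xxx;
>>>> Select * from xxx t1 left join yyy for system_time as of t1.proctime as t2 
>>>> on t1.id = t2.id;
>>>> Only this way works.
>>>> 
>>>> 
>>>> Best,
>>>> Xinghalo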
>>> 
>> 
> 



Re: Protection against huge values in RocksDB List State

2020-05-20 Thread Congxian Qiu
Great to hear that.

Best,
Congxian
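
For readers of the archive, the pre-filter Robin describes in the quoted message below can be sketched roughly as follows. This is a plain-Java stand-in, not Flink code: a `HashMap` plays the role of the per-key state, and the class name `SizeLimitPreFilter`, the `accept`/`clear` methods, and the choice not to count rejected records are all assumptions for illustration. The limit itself is a free parameter; per the thread it would need to stay well below the 2^31-byte JNI limit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: track the total bytes accepted per key and reject
// records that would push a key over a configured limit.
class SizeLimitPreFilter {
    private final long maxBytesPerKey;
    // Stand-in for keyed state; in a real job this would live in Flink state.
    private final Map<String, Long> bytesSeen = new HashMap<>();

    SizeLimitPreFilter(long maxBytesPerKey) {
        this.maxBytesPerKey = maxBytesPerKey;
    }

    // Returns true if the record may be forwarded to the window operator.
    boolean accept(String key, long recordSizeBytes) {
        long updated = bytesSeen.getOrDefault(key, 0L) + recordSizeBytes;
        if (updated > maxBytesPerKey) {
            return false; // rejected records are not added to the running total
        }
        bytesSeen.put(key, updated);
        return true;
    }

    // Called when the downstream window for this key is purged.
    void clear(String key) {
        bytesSeen.remove(key);
    }

    public static void main(String[] args) {
        SizeLimitPreFilter filter = new SizeLimitPreFilter(10);
        System.out.println(filter.accept("a", 4));
        System.out.println(filter.accept("a", 6));
        System.out.println(filter.accept("a", 1));
    }
}
```

The counter has to be cleared whenever the downstream list state is cleared, otherwise a key stays blocked after its session window has fired.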


Robin Cassan  wrote on Wed, May 20, 2020 at 12:18 AM:

> Hi Yun and Congxian!
> I have implemented a pre-filter that uses a keyed state (
> AggregatingState[Long]) that computes the size of all records seen for
> each key, which lets me filter out records that would be too big for the
> RocksDB JNI bridge. This seems to make our job behave better! Thanks for
> your help guys, this was really helpful :)
>
> Robin
>
> On Sat, May 16, 2020 at 09:05, Congxian Qiu  wrote:
>
>> Hi
>>
>> From what you described, I'm not sure whether MapState can help you in
>> this case. MapState serializes each entry separately, so it
>> would not run into the same problem as ListState.
>>
>> When using MapState, you need to decide how to set the mapKey. If the
>> whole state is cleared after processing, you can use a monotonically
>> increasing integer as the mapKey and store the highest mapKey used so far
>> in a value state.
>>
>>
>> Best,
>> Congxian
>>
>>
>> Yun Tang  wrote on Fri, May 15, 2020 at 10:31 PM:
>>
>>> Hi Robin
>>>
>>> I think you could record the size of your list under currentKey with
>>> another value state or operator state (store a Map from each key to its
>>> list length, and store the whole map in a list when snapshotting). If you
>>> do not have many key-by keys, operator state is a good choice as it is
>>> on-heap and lightweight.
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Robin Cassan 
>>> *Sent:* Friday, May 15, 2020 20:59
>>> *To:* Yun Tang 
>>> *Cc:* user 
>>> *Subject:* Re: Protection against huge values in RocksDB List State
>>>
>>> Hi Yun, thanks for your answer! And sorry, I didn't see this limitation
>>> in the documentation; it makes sense!
>>> In our case, we are merging too many elements (since each element is
>>> limited to 4 MiB in our Kafka topic). I agree we do not want our state to
>>> contain really big values, this is why we are trying to find a way to put a
>>> limit on the number (or total size) of elements that are aggregated in the
>>> state of the window.
>>> We have found a way to do this by using another sessionWindow that is
>>> set before the other one, which will store the number of messages for each
>>> key and reject new messages if we have reached a limit, but we are
>>> wondering if there is a better way to achieve that without creating another
>>> state.
>>>
>>> Thanks again,
>>> Robin
>>>
>>> On Thu, May 14, 2020 at 19:38, Yun Tang  wrote:
>>>
>>> Hi Robin
>>>
>>> First of all, the root cause is not that RocksDB cannot store a large
>>> list state when you merge, but the JNI limitation of 2^31 bytes [1].
>>> Moreover, RocksDB Java does not return anything when you call the merge [2]
>>> operator.
>>>
>>> Did you merge too many elements, or just merge elements that are too big?
>>> Last but not least, even if you could merge a large list, I think getting
>>> a value larger than 2^31 bytes would not behave well.
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>>> [2]
>>> https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Robin Cassan 
>>> *Sent:* Friday, May 15, 2020 0:37
>>> *To:* user 
>>> *Subject:* Protection against huge values in RocksDB List State
>>>
>>> Hi all!
>>>
>>> I cannot seem to find any setting to limit the number of records
>>> appended in a RocksDBListState that is used when we use SessionWindows with
>>> a ProcessFunction.
>>> It seems that, for each incoming element, the new element will be
>>> appended to the value with the RocksDB `merge` operator, without any
>>> safeguard to make sure that it doesn't grow infinitely. RocksDB merge seems
>>> to support returning false in case of error, so I guess we could implement
>>> a limit by returning false in the merge operator, but since Flink seems to
>>> use the "stringappendtest" merge operator (
>>> https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc
>>>  ),
>>> we always return true no matter what.
>>>
>>> This is troublesome for us because it would make a lot of sense to
>>> specify an acceptable limit to how many elements can be aggregated under a
>>> given key, and because when we happen to have too many elements we get an
>>> exception from RocksDB:
>>> ```
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while
>>> retrieving data from RocksDB
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
>>> 

About the problem of too many slots in Flink SQL

2020-05-20 Thread 111
Hi,


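Hello everyone. Recently, while using Flink SQL for offline (batch) processing, we ran into excessive resource usage:
1. Previously each TaskManager was configured with multiple slots, which led to slots competing for memory and overflowing, so we later configured each TaskManager with a single slot.
2. Some SQL is very complex and needs to read multiple Hive sources; we enabled Hive parallelism inference and set the maximum parallelism to 10.
3. When multiple operator nodes run in parallel, an ordinary job may need to request hundreds of slots,
which directly exhausts the YARN cluster's resources.


I would like to know whether there is a slot-related setting that limits the maximum number of slots requested, so that different tasks can share slots.


Also, I have a question about slotSharingGroup: does a slot sharing group apply only to a single source and its downstream functions? For example:
Source[4] -> map[4] -> reduce[3]: these three tasks can share slots, so the maximum number of slots needed is 4.
But if there are two sources, the two sources cannot share slots, so the maximum number of slots needed is 8.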


Best,
xinghalo
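
As a reference point for the arithmetic in the question above, here is a small sketch of how the slot count is usually estimated for a streaming job: operators in the same slot sharing group can share slots, so each group needs as many slots as its highest parallelism, and the job needs the sum over groups. The class `SlotEstimate`, its method, and the group names "a" and "b" are hypothetical, and the sketch assumes the documented default where every operator is in one `default` group unless assigned otherwise. Whether the Blink batch planner on 1.10 behaves this way is not confirmed in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: estimate required slots as the sum, over slot sharing
// groups, of the maximum operator parallelism inside each group.
class SlotEstimate {
    static int requiredSlots(String[] groups, int[] parallelism) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (int i = 0; i < groups.length; i++) {
            // Keep the highest parallelism seen for this group.
            maxPerGroup.merge(groups[i], parallelism[i], Math::max);
        }
        int total = 0;
        for (int max : maxPerGroup.values()) {
            total += max;
        }
        return total;
    }

    public static void main(String[] args) {
        // Source[4] -> map[4] -> reduce[3], all in the default group.
        System.out.println(requiredSlots(
                new String[] {"default", "default", "default"},
                new int[] {4, 4, 3}));
        // Two sources of parallelism 4 placed in separate groups "a" and "b".
        System.out.println(requiredSlots(
                new String[] {"a", "b", "a", "a"},
                new int[] {4, 4, 4, 3}));
    }
}
```

Under this assumption, two sources that both stay in the default group would still need only 4 slots; the figure of 8 arises only if the sources are placed in different slot sharing groups.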

Re: Flink convert Table to DataSet[Row]

2020-05-20 Thread Jingsong Li
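Sorry,

I still can't see your images; please consider copying the exception stack trace instead.

May I ask whether the subsequent metric computation can't be done with Table/SQL?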

Best,
Jingsong Lee

On Wed, May 20, 2020 at 1:52 PM 张锴  wrote:

> [image: 微信图片_20200520132244.png]
> [image: 微信图片_20200520132343.png]
>
> Jingsong Li  wrote on Wed, May 20, 2020 at 1:30 PM:
>
>> Hi,
>>
>> I don't see any attachment. Why do you need to convert to a DataSet? What can't be handled with Table?
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, May 20, 2020 at 1:26 PM 张锴  wrote:
>>
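>> > While testing the conversion of data queried from Hive into a DataSet[Row], I ran into some problems I could not solve. The code and exception are in the attachment; could you please take a look?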
>> >
>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best, Jingsong Lee


Re: Flink SQL dimension table join fails with: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'

2020-05-20 Thread Jingsong Li
Hi Junbao, Xinghalo,

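Sorry, there is currently a bug in how HiveCatalog stores proctime fields [1]. So, as you said, proctime cannot be created at table-creation time; as a workaround it has to be generated in the SELECT with the PROCTIME() function.

A fix is in progress. You can also apply the patch and try it, or wait for the 1.11.0 or 1.10.2 release.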

[1]https://issues.apache.org/jira/browse/FLINK-17189

Best,
Jingsong Lee

On Wed, May 20, 2020 at 1:58 PM wind.fly@outlook.com <
wind.fly@outlook.com> wrote:

> Hi,
> The version I use is 1.10.0, and x.log.yanfa_log is a normal table format. My demo uses a Hive catalog:
>    Catalog myCatalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
>    tEnv.registerCatalog("x", myCatalog);
>
> Best,
> Junbao Zhang
>
> 
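> From: Leonard Xu 
> Sent: May 20, 2020 11:51
> To: user-zh 
> Subject: Re: Flink SQL dimension table join fails with: Temporal table join currently only supports 'FOR
> SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
>
> Hi, wind
>
> I did not hit a similar error with your SQL. Could you confirm that the version is 1.10.x?
> Also, I do not recommend a table name such as x.log.yanfa_log that contains the special character ".".
> It conflicts with the fully qualified table name catalogName.databaseName.tableName, so creating the table
> should fail because catalog x does not exist. I could not reproduce the unsupported proctime field problem.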
>
> Best,
> Leonard
>
> > On May 20, 2020, at 11:01, wind.fly@outlook.com wrote:
> >
> > Hi,
> >   The CREATE TABLE statement is:
> >   CREATE TABLE x.log.yanfa_log (
> >   dt TIMESTAMP(3),
> >   conn_id STRING,
> >   sequence STRING,
> >   trace_id STRING,
> >   span_info STRING,
> >   service_id STRING,
> >   msg_id STRING,
> >   servicename STRING,
> >   ret_code STRING,
> >   duration STRING,
> >   req_body MAP,
> >   res_body MAP,
> >   extra_info MAP,
> >   proctime AS PROCTIME(),
> >   WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.version' = '0.11',
> >   'connector.topic' = 'x-log-yanfa_log',
> >   'connector.properties.bootstrap.servers' = '*:9092',
> >   'connector.properties.zookeeper.connect' = '*:2181',
> >   'connector.startup-mode' = 'latest-offset',
> >   'update-mode' = 'append',
> >   'format.type' = 'json',
> >   'format.fail-on-missing-field' = 'true'
> > );
> >
> > The join SQL can be found earlier in the thread history.
> >
> > Best,
> > Junbao Zhang
> > 
> > From: Leonard Xu 
> > Sent: May 20, 2020 10:50
> > To: user-zh 
> > Subject: Re: Flink SQL dimension table join fails with: Temporal table join currently only supports
> > 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support
> > 'PROCTIME()'
> >
> > Hi,
> >
> > Both 1.10 and 1.10.1 support declaring the proctime column as a computed column when creating a table,
> > and temporal tables support both JOIN and LEFT JOIN. I verified this when 1.10 was released [1]; you can use it as a reference.
> > Could you paste your complete SQL?
> >
> >
> > Best,
> > Leonard Xu
> > [1]
> >
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql
> <
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql
> >
> >
> >
> >> On May 19, 2020, at 09:23, wind.fly@outlook.com wrote:
> >>
> >> I followed exactly that syntax, but my experiment did not succeed. Could you paste the CREATE TABLE and query SQL statements you tested? Thanks.
> >> 
> >> From: 祝尚 <17626017...@163.com>
> >> Sent: May 19, 2020 0:02
> >> To: user-zh@flink.apache.org 
> >> Subject: Re: Flink SQL dimension table join fails with: Temporal table join currently only supports
> >> 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support
> >> 'PROCTIME()'
> >>
> >> It should work. Jark's example at
> >> http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/
> >> uses it the same way, and I have tried it in both the SQL client and the Table API without problems.
> >>
> >>> On May 18, 2020, at 4:43 PM, wind.fly@outlook.com wrote:
> >>>
> >>> Hi,
> >>> After trying it, generating the proctime field by calling the PROCTIME() function in the SELECT does work. Thanks.
> >>> 
> >>> From: 111 
> >>> Sent: May 18, 2020 16:07
> >>> To: user-zh@flink.apache.org 
> >>> Subject: Reply: Flink SQL dimension table join fails with: Temporal table join currently only supports
> >>> 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support
> >>> 'PROCTIME()'
> >>>
> >>> Hi,
> >>>
> >>>
> >>> I tested it: proctime cannot be created at table-creation time. It has to be generated in the SELECT with the PROCTIME() function.
> >>>
> >>>
> >>> For example:
> >>> Select …., PROCTIME() AS proctime from xxx;
> >>> Select * from xxx t1 left join yyy for system_time as of t1.proctime
> >>> as t2 on t1.id = t2.id;
> >>> Only this way works.
> >>>
> >>>
> >>> Best,
> >>> Xinghalo
> >>
> >
>
>

-- 
Best, Jingsong Lee