Re: About using timers on a non-keyed stream

2021-02-17 Thread yidan zhao
No - that would lead to imbalance, and the data has to be balanced.
The three requirements are: 1. balance, 2. batching, 3. timeout.
All three are hard to achieve with Flink's existing mechanisms. Of course, not
every scenario needs all of them; for MySQL, for example, deadlocks also have to
be considered, but for sinks that don't need to worry about locks it doesn't
matter whether records with the same key are routed together, so a keyedStream
isn't required. That way 1 and 2 can be guaranteed, but not 3.
Using a keyedStream with a (very) random key guarantees 1 and 3, but not 2,
because the keys are so random that each key carries too little data.
Using a keyedStream with randomKey % N guarantees 3; whether 1 and 2 hold depends
on the data distribution, the data volume, and the size of N. In some scenarios
it is hard to find parameters that guarantee all three.
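For what it's worth, one way to get both batch-size and timeout flushing on a
non-keyed stream without Flink's timer service is to do the batching inside the
sink itself with a plain scheduled executor. A rough sketch only (the class name,
the writeBatch hook and the flush-on-checkpoint choice are my own assumptions;
error handling is omitted):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BatchingSink<T> extends RichSinkFunction<T> implements CheckpointedFunction {

    private final int batchSize;
    private final long flushIntervalMs;
    private transient List<T> buffer;
    private transient ScheduledExecutorService scheduler;

    public BatchingSink(int batchSize, long flushIntervalMs) {
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>();
        scheduler = Executors.newSingleThreadScheduledExecutor();
        // timeout path: flush whatever has accumulated every flushIntervalMs
        scheduler.scheduleAtFixedRate(this::flush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public synchronized void invoke(T value, Context context) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {   // batch-size path
            flush();
        }
    }

    private synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        writeBatch(buffer);                 // hypothetical hook: write the batch to the external store
        buffer.clear();
    }

    protected void writeBatch(List<T> batch) {
        // implement the actual write here
    }

    @Override
    public synchronized void snapshotState(FunctionSnapshotContext context) {
        flush();                            // simplest choice: flush on every checkpoint instead of storing the buffer
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // nothing to restore when flushing on every checkpoint
    }

    @Override
    public void close() {
        flush();
        scheduler.shutdown();
    }
}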

Kenyore Woo  wrote on Mon, Feb 15, 2021 at 1:09 PM:

> What about using a constant field as the key? Wouldn't that give you the
> effect you want?
>
> yidan zhao  wrote on Tue, Feb 9, 2021 at 3:24 PM:
>
> > Of course, with something like randomKey % 30 the end result is roughly
> > the same, but the 30 sink batches may then all concentrate on just a few
> > parallel instances.
> >
> > yidan zhao  wrote on Tue, Feb 9, 2021 at 3:22 PM:
> >
> > > The biggest problem with introducing a random key is that the whole
> > > point is to sink in batches, and if the key is too random there is
> > > simply nothing to batch.
> > >
> >
> > Bucketing with randomKey % 1024 doesn't work either: each bucket gets too
> > little data, so sinks are almost always triggered by timeout rather than
> > by reaching batchSize. In other words, every flush produces 1024
> > concurrent sink invocations.
> > > The backing store may not want that much concurrency; the whole point
> > > of batching is to reduce the number of sink invocations.
> > >
> > >
> >
> > What I want is to sink according to the parallelism (say 30), i.e. 30
> > sink invocations each time (or more than 30, since the buffer may exceed batchSize, if all ...
> >
> > > yidan zhao  wrote on Tue, Feb 9, 2021 at 3:04 PM:
> > >
> > >> As the subject says, Flink currently doesn't support using timers on a
> > >> non-keyed stream. Is there any workaround?
> > >>
> > >> I'm implementing a sink with a timeout and would like to use the timer
> > >> service, but it isn't supported.
> > >> At the same time I don't want to use a keyedStream, because that leads
> > >> to data skew.
> > >>
> > >> Is there any approach other than introducing a random key?
> > >>
> > >
> >
>


Re: Sharding of Operators

2021-02-17 Thread yidan zhao
Actually, we only need to ensure that all records belonging to the same key are
forwarded to the same operator instance i; we do not need to guarantee that this
i is the same as the i in previous savepoints. When the job is restarted, the
rule 'records with the same key end up together' still holds, and the additional
slots will certainly be useful, since each slot (operator instance) will be
responsible for fewer keys and therefore fewer records.
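As a side note for anyone reading the archives: the key-to-subtask mapping in
Flink is deterministic and goes through key groups, so it can be checked directly
with Flink's internal KeyGroupRangeAssignment utility. A small sketch (the key
and the parallelism values are made-up examples):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyToSubtask {
    public static void main(String[] args) {
        Object key = "user-42";      // made-up example key
        int maxParallelism = 128;    // the default max parallelism for small jobs
        // key group = murmurHash(key.hashCode()) % maxParallelism, stable as long
        // as maxParallelism does not change
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        // subtask index = keyGroup * parallelism / maxParallelism
        int subtaskAt4 = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 4, keyGroup);
        int subtaskAt8 = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 8, keyGroup);
        System.out.printf("keyGroup=%d, subtask@p=4: %d, subtask@p=8: %d%n", keyGroup, subtaskAt4, subtaskAt8);
    }
}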

Tripathi,Vikash  wrote on Thu, Feb 18, 2021 at 12:09 AM:

> Hi there,
>
>
>
> I wanted to know how re-partitioning of keys per operator instance would
> happen when the current operator instances are scaled up or down and we are
> restarting our job from a previous savepoint which had a different number
> of parallel instances of the same operator.
>
>
>
> My main concern is whether the re-distribution would lead to mapping of
> same keys to same operator instances as was done earlier but if this
> happens then there would be no added advantage of adding new task slots for
> the same operator because they would remain less used or not used at all if
> all possible key values have been seen earlier and if we go by the other
> way around of evenly distributing out keys (based on the hash function) to
> the new parallel slots as well, won't this cause issues in terms of
> processing consistent results based on the state of operator as was
> provided by previous savepoint of application.
>
>
>
> Is there a guarantee given by the hash function as in attached snippet,
> that same keys which landed earlier on an operator instance will land back
> again to the same operator instance once the job is restarted with new set
> of parallelism configuration?
>
>
>
> Thanks,
>
> Vikash
>
>
>
>
>


Re: flink on yarn per-job container issue

2021-02-17 Thread zhiyezou
Hi


1. The state backend in question is RocksDB.
2. When state.backend.rocksdb.localdir is left at its default (none), Flink
falls back to the NodeManager's ${LOCAL_DIRS} (Flink's io.tmp.dirs), e.g.
${LOCAL_DIRS}/usercache/xx/appcache/application_xx/flink-io-860fdbc2-f321-4f08-bce6-88db85477e1e/

(The rest of this message and the quoted earlier replies, which referenced
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247490197&idx=1&sn=b0893a9bf12fbcae76852a156302de95
regarding state TTL and managed memory, could not be recovered from the
mis-encoded archive.)


Re: FlinkKafka Consumer can't dynamically discover partition updates

2021-02-17 Thread 张云云
Thanks for the reply!

Here is how I configured it:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
ConfigureManager.getString(CmConfigConstants.SOURCE_KAFKA_SERVERS,
null, kafkaName));
properties.setProperty("group.id",
ConfigureManager.getString(CmConfigConstants.SOURCE_KAFKA_GROUPID,
null, kafkaName));
properties.setProperty("flink.partition-discovery.interval-millis", "5000");

String[] topics = ConfigureManager
.getArrayString(CmConfigConstants.SOURCE_KAFKA_TOPICS, ",", null,
kafkaName);
Preconditions.checkNotNull(topics, "topics can't be null");
FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>(
Arrays.asList(topics),
new SimpleStringSchema(), properties);


After adding a partition there was no effect, and there is no partition-discovery
information in the program logs.

Data is being written to the topic, but I haven't yet confirmed whether the new
partition receives any; I'll check that later.
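Side note, in case it helps: if I understand correctly, the WARN from
org.apache.kafka.clients.consumer.ConsumerConfig is expected, because the
property is read by the Flink consumer and merely passed through to the Kafka
client, which does not recognize it. A small sketch of setting it via the
connector constant instead of the raw string (assuming the flink-connector-kafka
version in use exposes FlinkKafkaConsumerBase):

// equivalent to setting "flink.partition-discovery.interval-millis" by hand
properties.setProperty(
    FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "5000");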


On Thu, Feb 18, 2021 at 2:38 PM Robin Zhang 
wrote:

> Hi, 张云云
> 1. flink.partition-discovery.interval-millis is a Kafka-related configuration
> parameter; I'm not sure whether you set it through the Kafka Properties (kafkaProp).
> 2. Use the shell to check whether the topic partitions were actually added and
> whether data is being written to them.
>
> Best,
> Robin
>
>
> 张云云 wrote
> > When start the job, occurs WARN log like below:
> >
> > WARN  org.apache.kafka.clients.consumer.ConsumerConfig  - The
> > configuration 'flink.partition-discovery.interval-millis' was supplied
> > but isn't a known config.
> >
> >
> >
> >
> > And I try to change the kafka partion with command, partition number from
> > 3
> > to 4
> >
> > ./kafka-topics.sh --alter --zookeeper 10.0.10.21:15311 --topic
> > STRUCTED_LOG --partitions 4
> >
> > it dosen't work.
> >
> >
> >
> > How can I do with this problem. Thanks a lot
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: blob server: file not found

2021-02-17 Thread Alex_gao
It was a permissions issue.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Performance issues when RocksDB block cache is full

2021-02-17 Thread Yun Tang
Hi Yaroslav,

Unfortunately, RocksDB does not have such a TTL-based block cache. If you really
have only very few active keys, the current LRU implementation should work well,
since only the most recently useful entries are kept in the cache.
What behavior do you see once the cache reaches its maximum? Have you noticed
anything different in the RocksDB metrics?
Perhaps you are hitting the problem of write buffers being flushed too early [1],
and a partitioned index [2] might help.

[1] https://issues.apache.org/jira/browse/FLINK-19238
[2] https://issues.apache.org/jira/browse/FLINK-20496

Best
Yun Tang
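For reference, a rough flink-conf.yaml sketch of the managed-memory knobs that
control how the shared RocksDB budget is split between write buffers and the
block cache (the values below are just the defaults, not recommendations):

state.backend.rocksdb.memory.managed: true
# fraction of the managed memory given to write buffers (default 0.5)
state.backend.rocksdb.memory.write-buffer-ratio: 0.5
# fraction of the block cache reserved for index and filter blocks (default 0.1)
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1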



From: Dawid Wysakowicz
Sent: Monday, February 15, 2021 17:55
To: Yaroslav Tkachenko; user@flink.apache.org
Cc: Yun Tang
Subject: Re: Performance issues when RocksDB block cache is full


Hey Yaroslav,

Unfortunately I don't have enough knowledge to give you an educated reply. The 
first part certainly does make sense to me, but I am not sure how to mitigate 
the issue. I am ccing Yun Tang who worked more on the RocksDB state backend (It 
might take him a while to answer though, as he is on vacation right now).

Best,

Dawid

On 14/02/2021 06:57, Yaroslav Tkachenko wrote:
Hello,

I observe throughput degradation when my pipeline reaches the maximum of the 
allocated block cache.

The pipeline is consuming from a few Kafka topics at a high rate (100k+ rec/s). 
Almost every processed message results in a (keyed) state read with an optional 
write. I've enabled native RocksDB metrics and noticed that everything stays 
stable until the block cache usage reaches maximum. If I understand correctly, 
this makes sense: this cache is used for all reads and cache misses could mean 
reading data on disk, which is much slower (I haven't switched to SSDs yet). 
Does it make sense?

One thing I know about the messages I consume: I expect very few keys to be 
active simultaneously, most of them can be treated as cold. So I'd love RocksDB 
block cache to have a TTL option (say, 30 minutes), which, I imagine, could 
solve this issue by guaranteeing to only keep active keys in memory. I don't 
feel like LRU is doing a very good job here... I couldn't find any option like 
that, but I'm wondering if someone could recommend something similar.

Thank you!

--
Yaroslav Tkachenko
sap1ens.com


Re: Flink real-time statistics: results fluctuate between large and small

2021-02-17 Thread Robin Zhang
Hi flink2021,
   First check the business scenario: can the order data actually decrease? If
not, then the problem is in the logic or the code.

Best,
Robin


flink2021 wrote
> My data source is Kafka. I aggregate order data and write the results to MySQL.
> While the data is backlogged, the aggregated results jump up and down. Has
> anyone run into a similar problem? Which settings need to be adjusted? (The
> data pipeline is a bit complex; the state backend is RocksDB, which reports
> errors, and no TTL is set.)
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkKafka Consumer can't dynamically discover partition updates

2021-02-17 Thread Robin Zhang
Hi, 张云云
1. flink.partition-discovery.interval-millis is a Kafka-related configuration
parameter; I'm not sure whether you set it through the Kafka Properties (kafkaProp).
2. Use the shell to check whether the topic partitions were actually added and
whether data is being written to them.

Best,
Robin


张云云 wrote
> When start the job, occurs WARN log like below:
> 
> WARN  org.apache.kafka.clients.consumer.ConsumerConfig  - The
> configuration 'flink.partition-discovery.interval-millis' was supplied
> but isn't a known config.
> 
> 
> 
> 
> And I try to change the kafka partion with command, partition number from
> 3
> to 4
> 
> ./kafka-topics.sh --alter --zookeeper 10.0.10.21:15311 --topic
> STRUCTED_LOG --partitions 4
> 
> it dosen't work.
> 
> 
> 
> How can I do with this problem. Thanks a lot





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Best practices around checkpoint intervals and sizes?

2021-02-17 Thread Dan Hill
Hi.  I'm playing around with optimizing our checkpoint intervals and sizes.

Are there any best practices around this?  I have ~7 sequential joins and
a few sinks.  I'm curious what would result in the better throughput and
latency trade-offs.  I'd assume less frequent checkpointing would increase
throughput (but constrained by how frequently I want checkpointed sinks
written).
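Not claiming best practices, but for reference these are the knobs I would expect
to matter for that trade-off (the concrete values are arbitrary examples):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // less frequent checkpoints generally mean less overhead but more reprocessing on failure
        env.enableCheckpointing(60_000);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // leave room for actual processing between two checkpoints
        checkpointConfig.setMinPauseBetweenCheckpoints(30_000);
        // fail slow checkpoints instead of letting them pile up
        checkpointConfig.setCheckpointTimeout(10 * 60_000);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
    }
}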


flink cdc: encountered "Heartbeat of TaskManager with id timed out"

2021-02-17 Thread william
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
eaffacbed6a9d6025a362df2738d5299 timed out.
at
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_171]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.11-1.11.2.jar:1.11.2]



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink cdc: akka RemoteRpcInvocation issue

2021-02-17 Thread william
Error log:
2021-02-16 11:43:49,351 WARN  akka.remote.ReliableDeliverySupervisor
  
[] - Association with remote system [akka.tcp://flink@xx:45578] has
failed, address is now gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink@xx:45578]] Caused by: [java.net.ConnectException:
Connection refused: /xx:45578]

2021-02-16 11:43:50,431 WARN  akka.remote.transport.netty.NettyTransport
  
[] - Remote connection to [null] failed with java.net.ConnectException:
Connection refused: /xx:45578

2021-02-16 12:48:47,130 ERROR akka.remote.EndpointWriter  [] - dropping
message [class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]
for non-local recipient
[Actor[akka.tcp://flink@xx:45578/user/rpc/taskmanager_0#1283715547]]
arriving at [akka.tcp://flink@xx:45578] inbound addresses are
[akka.tcp://flink@localhost:6123]



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Adding proctime column to table api

2021-02-17 Thread Rex Fenley
Following from that, I'm not really sure why I need to provide a proctime
timestamp. There should never be any late data with proctime; when a record
arrives, it should just be put into whatever the current window is. So why
is there any requirement to specify a time column in this case?

Thanks!

On Wed, Feb 17, 2021 at 9:33 PM Rex Fenley  wrote:

> Also, as an example, I've tried
> table.window(Tumble over 1.seconds on proctime() as $"w")...
> and it failed.
>
> On Wed, Feb 17, 2021 at 9:30 PM Rex Fenley  wrote:
>
>> Hi,
>>
>> When using streaming api, if I want a tumbling window on proctime all I
>> have to do is the following:
>> table.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))...
>> I don't even need to explicitly create a proctime column.
>>
>> However, adding an intermediate tumbling window on proctime using the
>> table api has proved more difficult.
>>
>> The docs seem to possibly imply that I can only add a proctime column on
>> table creation [1] however this isn't what I want because it adds
>> complexity. I want to only render and use proctime at one intermediate
>> tumbling windowed aggregate in the entire query plan, Therefore, I don't
>> want proctime carried from the beginning of all my tables to where I
>> finally need it, I just want it where I need it. Every combination of
>> things I've tried though has seemed to have failed. Is there any way to do
>> this?
>>
>> Additionally, I don't want to switch to data streams because my tables
>> have retractions and the table api is simpler to use in that sense.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: blob server: file not found

2021-02-17 Thread Alex_gao
I've run into the same problem; marking this thread.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Adding proctime column to table api

2021-02-17 Thread Rex Fenley
Also, as an example, I've tried
table.window(Tumble over 1.seconds on proctime() as $"w")...
and it failed.

On Wed, Feb 17, 2021 at 9:30 PM Rex Fenley  wrote:

> Hi,
>
> When using streaming api, if I want a tumbling window on proctime all I
> have to do is the following:
> table.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))...
> I don't even need to explicitly create a proctime column.
>
> However, adding an intermediate tumbling window on proctime using the
> table api has proved more difficult.
>
> The docs seem to possibly imply that I can only add a proctime column on
> table creation [1] however this isn't what I want because it adds
> complexity. I want to only render and use proctime at one intermediate
> tumbling windowed aggregate in the entire query plan, Therefore, I don't
> want proctime carried from the beginning of all my tables to where I
> finally need it, I just want it where I need it. Every combination of
> things I've tried though has seemed to have failed. Is there any way to do
> this?
>
> Additionally, I don't want to switch to data streams because my tables
> have retractions and the table api is simpler to use in that sense.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Adding proctime column to table api

2021-02-17 Thread Rex Fenley
Hi,

When using streaming api, if I want a tumbling window on proctime all I
have to do is the following:
table.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))...
I don't even need to explicitly create a proctime column.

However, adding an intermediate tumbling window on proctime using the table
api has proved more difficult.

The docs seem to imply that I can only add a proctime column at
table creation [1]; however, this isn't what I want because it adds
complexity. I want to render and use proctime at only one intermediate
tumbling windowed aggregate in the entire query plan. Therefore, I don't
want proctime carried from the beginning of all my tables to where I
finally need it; I just want it where I need it. Every combination of
things I've tried, though, seems to have failed. Is there any way to do
this?

Additionally, I don't want to switch to data streams because my tables have
retractions and the table api is simpler to use in that sense.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: Using INTERVAL parameters for UDTF

2021-02-17 Thread Patrick Angeles
NVM. Found the actual source on Calcite trunk. Looks like interval type
(and a few others) are not yet supported.

https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeUtil.java
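One possible workaround, purely as an idea and not something verified here: pass
the step as a plain BIGINT of milliseconds and convert it inside the UDTF, along
these lines (MyRangeFunction is a made-up name):

import java.time.Duration;
import java.time.LocalDateTime;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class MyRangeFunction extends TableFunction<Row> {
    // hypothetical signature: take the step in milliseconds instead of an INTERVAL
    public void eval(LocalDateTime startTime, LocalDateTime endTime, Long stepMillis) {
        Duration step = Duration.ofMillis(stepMillis);
        // ... same body as before, using `step`
    }
}

-- SQL side, 5 minutes: ... LATERAL TABLE(func(t1, t2, CAST(300000 AS BIGINT)))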



On Wed, Feb 17, 2021 at 8:11 PM Patrick Angeles 
wrote:

> For some reason I can't get view the source so I don't have exact line
> numbers, but IntelliJ was kind enough to decompile this part
> (SqlTypeUtil.class) for me. This appears to be the exception I'm hitting.
>
> if (!isAtomic(type) && !isNull(type)) {
>> if (isCollection(type)) {
>> typeNameSpec = new
>> SqlCollectionTypeNameSpec(convertTypeToSpec(type.getComponentType()).getTypeNameSpec(),
>> typeName, SqlParserPos.ZERO);
>> } else {
>> if (!isRow(type)) {
>> throw new UnsupportedOperationException("Unsupported type
>> when convertTypeToSpec: " + typeName);
>> }
>
>
> It got there because isAtomic returns false for interval types:
>
> public static boolean isAtomic(RelDataType type) {
>> SqlTypeName typeName = type.getSqlTypeName();
>> if (typeName == null) {
>> return false;
>> } else {
>> return isDatetime(type) || isNumeric(type) || isString(type) ||
>> isBoolean(type);
>> }
>> }
>
>
> Seems like a bug?
>
> On Wed, Feb 17, 2021 at 5:55 PM Patrick Angeles 
> wrote:
>
>> Wondering if anyone has seen this before, and has any suggestions. I have
>> a UDTF with the following signature:
>>
>> public void eval(LocalDateTime startTime, LocalDateTime endTime, Duration
>>> step) {
>>
>>
>> According to the docs, this should be mapped from the following SQL
>> snippet:
>>
>> ... LATERAL TABLE func(t1, t2, INTERVAL '5' MINUTES)
>>
>>
>> However, when I run a query in sql-client I get:
>>
>> Caused by: java.lang.UnsupportedOperationException: Unsupported type when
>>> convertTypeToSpec: INTERVAL_DAY_SECOND
>>> at
>>> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1059)
>>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>> at
>>> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1081)
>>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>> at
>>> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.castTo(TypeInferenceOperandChecker.java:165)
>>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>>
>>
>> I tried all sorts of DataTypeHints (including bridgedTo specification) to
>> no avail. Any pointers?
>>
>> Thanks in advance.
>>
>> - P
>>
>


FlinkKafka Consumer can't dynamically discover partition updates

2021-02-17 Thread 张云云
When start the job, occurs WARN log like below:

WARN  org.apache.kafka.clients.consumer.ConsumerConfig  - The
configuration 'flink.partition-discovery.interval-millis' was supplied
but isn't a known config.




And I tried to change the Kafka partition count with the following command,
increasing the number of partitions from 3 to 4:

./kafka-topics.sh --alter --zookeeper 10.0.10.21:15311 --topic
STRUCTED_LOG --partitions 4

it doesn't work.



How can I do with this problem. Thanks a lot


Re: Native K8S HA Session Cluster Issue 1.12.1

2021-02-17 Thread Yang Wang
I second Till's suggestion.

You could also build your own flink-kubernetes jar from the source code of
branch 1.12. After that, bundle the
flink-kubernetes jar into the image under the /opt/flink/lib directory, and push
it to your docker repository.

Some users have run into the same issue as you and have verified that the "too
old resource version" fix works well for them.


Best,
Yang

Till Rohrmann  wrote on Fri, Feb 12, 2021 at 1:20 AM:

> Hi Kevin,
>
> Unfortunately, the root cause for the error is missing. I can only guess
> but it could indeed be FLINK-20417 [1]. If this is the case, then the
> problem should be fixed with the upcoming Flink 1.12.2 version. It should
> be released next week hopefully. If it should be a different problem, then
> we will know better because Flink 1.12.2 will fix the problem with
> swallowing the root cause. So I would highly recommend upgrading once the
> next bug fix release has been released.
>
> [1] https://issues.apache.org/jira/browse/FLINK-20417
>
> Cheers,
> Till
>
> On Thu, Feb 11, 2021 at 9:21 AM Bohinski, Kevin <
> kevin_bohin...@comcast.com> wrote:
>
>> Hi All,
>>
>> On long lived session clusters we are seeing a k8s error `Error while
>> watching the ConfigMap`.
>> Good news is it looks like `too old resource version` issue is fixed :).
>>
>> Logs are attached below. Any tips?
>>
>> best
>> Kevin
>>
>>
>> 2021-02-11 07:55:15,249 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
>> checkpoint 4 for job 58ec7a029cd31ad057e25479a9979cb4 (202852094 bytes in
>> 49274 ms).
>> 2021-02-11 08:00:15,732 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
>> Triggering checkpoint 5 (type=CHECKPOINT) @ 1613030415249 for job
>> 58ec7a029cd31ad057e25479a9979cb4.
>> 2021-02-11 08:00:25,446 ERROR
>> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
>> Fatal error occurred in ResourceManager.
>> org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
>> while watching the ConfigMap
>> JOB_NAME-6a3361c3fdeb4dd9ba80d8e667a8093e-jobmanager-leader
>> at
>> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.kubernetes.kubeclient.resources.AbstractKubernetesWatcher.onClose(AbstractKubernetesWatcher.java:48)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> io.fabric8.kubernetes.client.utils.WatcherToggle.onClose(WatcherToggle.java:56)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.closeEvent(WatchConnectionManager.java:367)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$700(WatchConnectionManager.java:50)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at 
>> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at 
>> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at 
>> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at 
>> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at 
>> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> [?:1.8.0_282]
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> [?:1.8.0_282]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
>> 2021-02-11 08:00:25,456 ERROR
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
>> error occurred in the cluster entrypoint.
>> org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Error
>> while watching the ConfigMap
>> JOB_NAME-6a3361c3fdeb4dd9ba80d8e667a8093e-jobmanager-leader
>> at
>> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver$ConfigMapCallbackHandlerImpl.handleFatalError(KubernetesLeaderRetrievalDriver.java:120)
>> [flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> 

Re: Flink docker in session cluster mode - is a local distribution needed?

2021-02-17 Thread Yang Wang
I am not aware of a simple way to use the Flink runtime jars within the docker
images other than "docker run/exec".
So if we want to provide some easy commands to submit Flink jobs, I think they
would also just be wrappers around "docker run/exec".
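For the record, the kind of wrapper this boils down to is something like the
following (the container name and the jar path are assumptions on my side; the
flink binary ships under /opt/flink/bin in the official images):

docker exec -it <jobmanager-container> /opt/flink/bin/flink run /opt/flink/usrlib/my-job.jar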

Best,
Yang

Till Rohrmann  wrote on Wed, Feb 17, 2021 at 9:04 PM:

> Yes, agreed. This could be better streamlined. If you wanna help with
> this, then feel free to open a JIRA issue for it.
>
> Cheers,
> Till
>
> On Wed, Feb 17, 2021 at 11:37 AM Manas Kale  wrote:
>
>> Hi Till,
>> Oh I see... I managed to do what you said using a bunch of docker exec
>> commands. However, I think this solution is quite hacky and could be
>> improved by providing some simple command to submit jobs using the Flink
>> runtime within the docker images. I believe this will achieve full
>> containerization - the host system is not at all expected to have the Flink
>> runtime, everything is within Docker images.
>>
>> Thanks a lot!
>>
>> On Tue, Feb 16, 2021 at 6:08 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Manas,
>>>
>>> I think the documentation assumes that you first start a session cluster
>>> and then submit jobs from outside the Docker images. If your jobs are
>>> included in the Docker image, then you could log into the master process
>>> and start the jobs from within the Docker image.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Feb 16, 2021 at 1:00 PM Manas Kale 
>>> wrote:
>>>
 Hi,

 I have a project that is a set of 6 jobs out of which 4 are written in
 Java and 2 are written in pyFlink. I want to dockerize these so that all 6
 can be run in a single Flink session cluster.

 I have been able to successfully set up the JobManager and TaskManager
 containers as per [1] after creating a custom Docker image that has Python.
 For the last step, the guide asks us to submit the job using a local
 distribution of Flink:

 $ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

 I am probably missing something here because I have the following
 questions:
 Why do I need to use a local distribution to submit a job?
 Why can't I use the Flink distribution that already exists within the
 images?
 How do I submit a job using the Docker image's distribution?


 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html#starting-a-session-cluster-on-docker




Re: How can Flink HA on k8s use OSS as high-availability.storageDir?

2021-02-17 Thread Yang Wang
If you use the community's official image flink:1.12.1, you need the following configuration.
The last two options enable the OSS plugin via environment variables.

high-availability.storageDir: oss://flink/flink-ha
fs.oss.endpoint: 
fs.oss.accessKeyId: 
fs.oss.accessKeySecret: 
containerized.master.env.ENABLE_BUILT_IN_PLUGINS:
flink-oss-fs-hadoop-1.12.1.jar
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS:
flink-oss-fs-hadoop-1.12.1.jar

Best,
Yang

casel.chen  wrote on Wed, Feb 17, 2021 at 5:42 PM:

>
> As the subject says: in a k8s environment I don't want to use HDFS as
> high-availability.storageDir. Is there a way to use OSS directly? Checkpoints
> and savepoints are already able to use OSS.


Re: Understanding Job Manager Web UI in HA Mode

2021-02-17 Thread Yang Wang
I think you could also configure the same Persistent Volume for all the
JobManagers and mount it at /path/of/job-jars in the Pods.
After that, set the config option "web.upload.dir: /path/of/job-jars". This
will make web submission work with multiple JobManagers.

Best,
Yang

Till Rohrmann  wrote on Tue, Feb 16, 2021 at 12:24 AM:

> No, there is no need after the job has been submitted. It's only that the
> web ui based submission is a two step process where you 1) upload the jar
> and 2) submit it. If you should access between 1) and 2) a different rest
> server, then the new rest server won't know about the uploaded jar.
>
> Cheers,
> Till
>
> On Mon, Feb 15, 2021 at 11:41 AM Chirag Dewan 
> wrote:
>
>> Thanks Till, that sounds fantastic.
>>
>> Is there any need for all Job Managers to see the jar after a job is
>> running?
>>
>> I plan to sync the leader address from the config map and might always
>> end up at the leader.
>>
>> Thanks
>> Chirag
>>
>> On Monday, 15 February, 2021, 03:16:50 pm IST, Till Rohrmann <
>> trohrm...@apache.org> wrote:
>>
>>
>> Hi Chirag,
>>
>> when starting standby JobManagers, then Flink will already start a web
>> server for each process for serving REST requests. These servers will,
>> however, not necessarily ask the JobManager they have been started with but
>> always forward requests to the current leading JobManager. That way all web
>> UIs are responsive but they all query the current leader. So for querying
>> information you don't need to know which process is currently the leader.
>>
>> One thing to add is that when uploading jars for the web submission, only
>> the web server to which you uploaded the jar will see it.
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 15, 2021 at 9:38 AM Chirag Dewan 
>> wrote:
>>
>> Hi,
>>
>> We configured Job Manager HA with Kubernetes strategy and found that the
>> Web UI for all 3 Job Managers is accessible on their configured rpc
>> addresses. There's no information on the Web UI that suggests which Job
>> Manager is the leader or task managers are registered to. However, from the
>> logs I can see that Task Manager is registered with one Job Manager and if
>> it's unavailable, Task Manager can switch to standby instance.
>>
>> Having little to no experience on HA, I wanted to know if this is the
>> expected behavior. I was assuming that only the leader Web UI would be
>> accessible?
>>
>> Thanks,
>> Chirag
>>
>>


Flink real-time statistics: results fluctuate between large and small

2021-02-17 Thread flink2021
My data source is Kafka. I aggregate order data and write the results to MySQL.
While the data is backlogged, the aggregated results jump up and down. Has anyone
run into a similar problem? Which settings need to be adjusted? (The data
pipeline is a bit complex; the state backend is RocksDB, which reports errors,
and no TTL is set.)



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink real-time statistics: results fluctuate between large and small

2021-02-17 Thread flink2021
I submit a job from Zeppelin that counts our orders and writes the results to
MySQL; the data source is Kafka. When data is backlogged in Kafka and I query the
result table, the numbers fluctuate across consecutive queries: sometimes the
count is 1,000,000 and the next second it may be 500,000. What could be the
reason? P.S. The Flink computation logic is somewhat complex.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Using INTERVAL parameters for UDTF

2021-02-17 Thread Patrick Angeles
For some reason I can't get view the source so I don't have exact line
numbers, but IntelliJ was kind enough to decompile this part
(SqlTypeUtil.class) for me. This appears to be the exception I'm hitting.

if (!isAtomic(type) && !isNull(type)) {
> if (isCollection(type)) {
> typeNameSpec = new
> SqlCollectionTypeNameSpec(convertTypeToSpec(type.getComponentType()).getTypeNameSpec(),
> typeName, SqlParserPos.ZERO);
> } else {
> if (!isRow(type)) {
> throw new UnsupportedOperationException("Unsupported type when
> convertTypeToSpec: " + typeName);
> }


It got there because isAtomic returns false for interval types:

public static boolean isAtomic(RelDataType type) {
> SqlTypeName typeName = type.getSqlTypeName();
> if (typeName == null) {
> return false;
> } else {
> return isDatetime(type) || isNumeric(type) || isString(type) ||
> isBoolean(type);
> }
> }


Seems like a bug?

On Wed, Feb 17, 2021 at 5:55 PM Patrick Angeles 
wrote:

> Wondering if anyone has seen this before, and has any suggestions. I have
> a UDTF with the following signature:
>
> public void eval(LocalDateTime startTime, LocalDateTime endTime, Duration
>> step) {
>
>
> According to the docs, this should be mapped from the following SQL
> snippet:
>
> ... LATERAL TABLE func(t1, t2, INTERVAL '5' MINUTES)
>
>
> However, when I run a query in sql-client I get:
>
> Caused by: java.lang.UnsupportedOperationException: Unsupported type when
>> convertTypeToSpec: INTERVAL_DAY_SECOND
>> at
>> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1059)
>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1081)
>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>> at
>> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.castTo(TypeInferenceOperandChecker.java:165)
>> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
>
>
> I tried all sorts of DataTypeHints (including bridgedTo specification) to
> no avail. Any pointers?
>
> Thanks in advance.
>
> - P
>


Using INTERVAL parameters for UDTF

2021-02-17 Thread Patrick Angeles
Wondering if anyone has seen this before, and has any suggestions. I have a
UDTF with the following signature:

public void eval(LocalDateTime startTime, LocalDateTime endTime, Duration
> step) {


According to the docs, this should be mapped from the following SQL snippet:

... LATERAL TABLE func(t1, t2, INTERVAL '5' MINUTES)


However, when I run a query in sql-client I get:

Caused by: java.lang.UnsupportedOperationException: Unsupported type when
> convertTypeToSpec: INTERVAL_DAY_SECOND
> at
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1059)
> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.calcite.sql.type.SqlTypeUtil.convertTypeToSpec(SqlTypeUtil.java:1081)
> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.castTo(TypeInferenceOperandChecker.java:165)
> ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]


I tried all sorts of DataTypeHints (including bridgedTo specification) to
no avail. Any pointers?

Thanks in advance.

- P


Tag flink metrics to job name

2021-02-17 Thread bat man
Hello there,

I am using the Prometheus reporter to push metrics to Prometheus and then use
Grafana for visualization. There are metrics like
flink_taskmanager_Status_JVM_CPU_Load and
flink_taskmanager_Status_JVM_CPU_Time
which do not carry a job_name; they are tied only to an instance.
When running multiple jobs in the same YARN cluster, it is possible that
different jobs have YARN containers on the same instance, in which case it is
very difficult to tell which job is causing the high CPU load, memory usage,
etc. on that instance.

Is there a way to tag the job_name onto these metrics so that they can be
visualized per job?

Thanks,
Hemant


Kafka SQL Connector: dropping events if more partitions than source tasks

2021-02-17 Thread Jan Oelschlegel
Hi,

I have a question regarding the Flink SQL connector for Kafka. I have 3 Kafka
partitions and 1 Kafka SQL source connector (parallelism 1). The data within
the Kafka partitions is sorted by an event-time field, which is also my
event-time in Flink. My watermark is generated with a delay of 12 hours:

WATERMARK FOR eventtime as eventtime - INTERVAL '12' HOUR


But the problem is that in Prometheus I see events being dropped for arriving late.
With a parallelism of 3, however, there are no drops.

Do I always have to have as many source tasks as I have Kafka partitions?



Best,
Jan


Which type serializer is being used?

2021-02-17 Thread Sudharsan R
Hi,
I would like to find out which types are being serialized with which
serializer. Is there an easy way to get this information?
We have the following situation:
We have two types, T1 and T2. The input to a window process function is an
Either<T1, T2>. Both T1 and T2 are themselves POJOs. We added a field F of
type Boolean (the boxed type) to T2. We then restored the job from a
savepoint. It was my understanding that the POJO serializer would
initialize T2.F to Boolean.FALSE. However, T2.F is null.

Thanks
Sudharsan
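A quick way to check what Flink picks for a given class is to ask the type
extraction directly; a small sketch, with MyPojo standing in for T2:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class SerializerCheck {

    public static class MyPojo {     // stand-in for T2
        public String name;
        public Boolean flag;         // the newly added boxed field
        public MyPojo() {}
    }

    public static void main(String[] args) {
        TypeInformation<MyPojo> typeInfo = TypeInformation.of(MyPojo.class);
        // PojoTypeInfo means the POJO serializer is used; GenericTypeInfo means Kryo
        System.out.println(typeInfo.getClass().getSimpleName());
        System.out.println(typeInfo.createSerializer(new ExecutionConfig()).getClass().getSimpleName());
    }
}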


Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Salva Alcántara
Good to know Kezhu, many thanks again!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to report metric based on keyed state piece

2021-02-17 Thread Salva Alcántara
Awesome Piotr!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to report metric based on keyed state piece

2021-02-17 Thread Salva Alcántara
Many thanks Kezhu for pointing me on that direction!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Piotr Nowojski
> Combination of `MultipleInputStreamOperator`, `BoundedMultiInput`,
`InputSeletable`, FLIP-27 source and `ChainingStrategy.HEAD_WITH_SOURCES`
> should achieve the goal and not interfering with checkpoint, but the
control side must not be bounded before FLIP-147 delivered.

Hmm, but I think in principle you are right Kezhu. This would work right
now, if we just removed the check
inside `StreamingJobGraphGenerator#preValidate`. Or, more precisely, modified
the check to support `InputSelectable` in source tasks. But that's probably
a very, very narrow use case.

Piotrek
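For the archives, a rough sketch of the "buffer until the control side is ready"
approach discussed in this thread (assuming the connected streams are keyed by
the same field; the String types, the readiness condition and the enrich step are
placeholders):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferUntilControlReady extends KeyedCoProcessFunction<String, String, String, String> {

    private transient ValueState<Boolean> controlReady;
    private transient ListState<String> buffered;

    @Override
    public void open(Configuration parameters) {
        controlReady = getRuntimeContext().getState(
                new ValueStateDescriptor<>("controlReady", Boolean.class));
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered", String.class));
    }

    @Override
    public void processElement1(String dataRecord, Context ctx, Collector<String> out) throws Exception {
        if (Boolean.TRUE.equals(controlReady.value())) {
            out.collect(enrich(dataRecord));
        } else {
            buffered.add(dataRecord);   // control side not ready yet: park the record in state
        }
    }

    @Override
    public void processElement2(String controlRecord, Context ctx, Collector<String> out) throws Exception {
        // update the enrichment information from controlRecord here ...
        controlReady.update(true);      // placeholder for "the control topic has been fully read"
        for (String pending : buffered.get()) {
            out.collect(enrich(pending));
        }
        buffered.clear();
    }

    private String enrich(String record) {
        return record;                  // placeholder for the actual enrichment
    }
}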

On Wed, 17 Feb 2021 at 16:58, Kezhu Wang  wrote:

> Piotr is right. So just ignore my words. It is the price of going deep
> down the rabbit hole:-).
>
>
> Best,
> Kezhu Wang
>
>
> On February 17, 2021 at 23:48:30, Piotr Nowojski (pnowoj...@apache.org)
> wrote:
>
> Note^2: InputSelectable is `@PublicEvolving` API, so it can be used.
> However as Timo pointed out, it would block the checkpointing. If I
> remember correctly there is a checkState that will not allow to use
> `InputSelectable` with enabled checkpointing.
>
> Piotrek
>
> On Wed, 17 Feb 2021 at 16:46, Kezhu Wang  wrote:
>
>> Hi all,
>>
>> Thanks Arvid and Timo for more candidates.
>>
>> I also think “buffering until control side ready” should be more
>> canonical in current stage of Flink.
>>
>> Timo has created FLINK-21392 for exposing user friendly data stream api
>> to block one input temporarily.
>>
>> If one really want go deep down the rabbit hole as Arvid said, I have one
>> approach from the top of my head.
>>
>> Combination of `MultipleInputStreamOperator`, `BoundedMultiInput`,
>> `InputSeletable`, FLIP-27 source and `ChainingStrategy.HEAD_WITH_SOURCES`
>> should achieve the goal and not interfering with checkpoint, but the
>> control side must not be bounded before FLIP-147 delivered.
>>
>> [1] FLINK-21392: https://issues.apache.org/jira/browse/FLINK-21392
>>
>> Best,
>> Kezhu Wang
>>
>> On February 17, 2021 at 22:58:23, Arvid Heise (ar...@apache.org) wrote:
>>
>> Note that the question is also posted on SO [1].
>>
>> [1]
>> https://stackoverflow.com/questions/66236004/connectedstreams-paused-until-control-stream-ready/
>>
>> On Wed, Feb 17, 2021 at 3:31 PM Timo Walther  wrote:
>>
>>> Hi Kezhu,
>>>
>>> `InputSelectable` is currently not exposed in the DataStream API because
>>> it might have side effects that need to be considered (e.g. are
>>> checkpoints still go through?). In any case, we don't have a good story
>>> for blocking a control stream yet. The best option is to buffer the
>>> other stream in state until the control stream is ready. You can also
>>> artifically slow down the other stream until then (e.g. by sleeping) to
>>> not buffer too much state.
>>>
>>> I hope this helps.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 17.02.21 14:35, Kezhu Wang wrote:
>>> > A combination of `BoundedMultiInput` and `InputSelectable` could help.
>>> > You could see
>>> > `org.apache.flink.table.runtime.operators.join.HashJoinOperator`
>>> > for an usage example. The control topic have not to be bounded.
>>> >
>>> > There are maybe other approaches from later responses. I could not
>>> tell
>>> > whether it is canonical or not.
>>> >
>>> > Best,
>>> > Kezhu Wang
>>> >
>>> > On February 17, 2021 at 13:03:42, Salva Alcántara
>>> > (salcantara...@gmail.com ) wrote:
>>> >
>>> >> What is the canonical way to accomplish this:
>>> >>
>>> >> >Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any
>>> >> processing of the data stream until >the control stream is "ready",
>>> so to
>>> >> speak
>>> >>
>>> >> My particular use case is as follows: I have a CoFlatMap function.
>>> The
>>> >> data
>>> >> stream contains elements that need to be enriched with additional
>>> >> information (they come with some fields empty). The missing
>>> >> information is
>>> >> taken from the control stream, whose elements come through a kafka
>>> >> source.
>>> >> Essentially, what I want is to pause any processing until having read
>>> the
>>> >> full (control) topic, otherwise (at least initially) the output
>>> elements
>>> >> will not be enriched as expected.
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> Sent from:
>>> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>> >> >> >
>>>
>>>


Sharding of Operators

2021-02-17 Thread Tripathi,Vikash
Hi there,

I wanted to know how re-partitioning of keys across operator instances happens
when the current operator instances are scaled up or down and we restart our job
from a previous savepoint that had a different number of parallel instances of
the same operator.

My main concern is whether the re-distribution maps the same keys to the same
operator instances as before. If it does, then there is no added advantage in
adding new task slots for the same operator, because they would remain little
used, or not used at all, if all possible key values have been seen earlier. If,
on the other hand, keys are distributed evenly (based on the hash function)
across the new parallel slots as well, won't this cause issues in terms of
producing consistent results based on the operator state provided by the
previous savepoint of the application?

Is there a guarantee given by the hash function, as in the attached snippet, that
the same keys which landed earlier on an operator instance will land back on the
same operator instance once the job is restarted with a new parallelism
configuration?

Thanks,
Vikash





Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Kezhu Wang
Piotr is right. So just ignore my words. It is the price of going deep down
the rabbit hole:-).


Best,
Kezhu Wang


On February 17, 2021 at 23:48:30, Piotr Nowojski (pnowoj...@apache.org)
wrote:

Note^2: InputSelectable is `@PublicEvolving` API, so it can be used.
However as Timo pointed out, it would block the checkpointing. If I
remember correctly there is a checkState that will not allow to use
`InputSelectable` with enabled checkpointing.

Piotrek

On Wed, 17 Feb 2021 at 16:46, Kezhu Wang  wrote:

> Hi all,
>
> Thanks Arvid and Timo for more candidates.
>
> I also think “buffering until control side ready” should be more canonical
> in current stage of Flink.
>
> Timo has created FLINK-21392 for exposing user friendly data stream api to
> block one input temporarily.
>
> If one really want go deep down the rabbit hole as Arvid said, I have one
> approach from the top of my head.
>
> Combination of `MultipleInputStreamOperator`, `BoundedMultiInput`,
> `InputSeletable`, FLIP-27 source and `ChainingStrategy.HEAD_WITH_SOURCES`
> should achieve the goal and not interfering with checkpoint, but the
> control side must not be bounded before FLIP-147 delivered.
>
> [1] FLINK-21392: https://issues.apache.org/jira/browse/FLINK-21392
>
> Best,
> Kezhu Wang
>
> On February 17, 2021 at 22:58:23, Arvid Heise (ar...@apache.org) wrote:
>
> Note that the question is also posted on SO [1].
>
> [1]
> https://stackoverflow.com/questions/66236004/connectedstreams-paused-until-control-stream-ready/
>
> On Wed, Feb 17, 2021 at 3:31 PM Timo Walther  wrote:
>
>> Hi Kezhu,
>>
>> `InputSelectable` is currently not exposed in the DataStream API because
>> it might have side effects that need to be considered (e.g. are
>> checkpoints still go through?). In any case, we don't have a good story
>> for blocking a control stream yet. The best option is to buffer the
>> other stream in state until the control stream is ready. You can also
>> artifically slow down the other stream until then (e.g. by sleeping) to
>> not buffer too much state.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> On 17.02.21 14:35, Kezhu Wang wrote:
>> > A combination of `BoundedMultiInput` and `InputSelectable` could help.
>> > You could see
>> > `org.apache.flink.table.runtime.operators.join.HashJoinOperator`
>> > for an usage example. The control topic have not to be bounded.
>> >
>> > There are maybe other approaches from later responses. I could not tell
>> > whether it is canonical or not.
>> >
>> > Best,
>> > Kezhu Wang
>> >
>> > On February 17, 2021 at 13:03:42, Salva Alcántara
>> > (salcantara...@gmail.com ) wrote:
>> >
>> >> What is the canonical way to accomplish this:
>> >>
>> >> >Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any
>> >> processing of the data stream until >the control stream is "ready", so
>> to
>> >> speak
>> >>
>> >> My particular use case is as follows: I have a CoFlatMap function. The
>> >> data
>> >> stream contains elements that need to be enriched with additional
>> >> information (they come with some fields empty). The missing
>> >> information is
>> >> taken from the control stream, whose elements come through a kafka
>> >> source.
>> >> Essentially, what I want is to pause any processing until having read
>> the
>> >> full (control) topic, otherwise (at least initially) the output
>> elements
>> >> will not be enriched as expected.
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from:
>> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> >> 
>>
>>


Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Piotr Nowojski
Note^2: InputSelectable is `@PublicEvolving` API, so it can be used.
However as Timo pointed out, it would block the checkpointing. If I
remember correctly there is a checkState that will not allow to use
`InputSelectable` with enabled checkpointing.

Piotrek

On Wed, 17 Feb 2021 at 16:46, Kezhu Wang  wrote:

> Hi all,
>
> Thanks Arvid and Timo for more candidates.
>
> I also think “buffering until control side ready” should be more canonical
> in current stage of Flink.
>
> Timo has created FLINK-21392 for exposing user friendly data stream api to
> block one input temporarily.
>
> If one really want go deep down the rabbit hole as Arvid said, I have one
> approach from the top of my head.
>
> Combination of `MultipleInputStreamOperator`, `BoundedMultiInput`,
> `InputSeletable`, FLIP-27 source and `ChainingStrategy.HEAD_WITH_SOURCES`
> should achieve the goal and not interfering with checkpoint, but the
> control side must not be bounded before FLIP-147 delivered.
>
> [1] FLINK-21392: https://issues.apache.org/jira/browse/FLINK-21392
>
> Best,
> Kezhu Wang
>
> On February 17, 2021 at 22:58:23, Arvid Heise (ar...@apache.org) wrote:
>
> Note that the question is also posted on SO [1].
>
> [1]
> https://stackoverflow.com/questions/66236004/connectedstreams-paused-until-control-stream-ready/
>
> On Wed, Feb 17, 2021 at 3:31 PM Timo Walther  wrote:
>
>> Hi Kezhu,
>>
>> `InputSelectable` is currently not exposed in the DataStream API because
>> it might have side effects that need to be considered (e.g. are
>> checkpoints still go through?). In any case, we don't have a good story
>> for blocking a control stream yet. The best option is to buffer the
>> other stream in state until the control stream is ready. You can also
>> artifically slow down the other stream until then (e.g. by sleeping) to
>> not buffer too much state.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> On 17.02.21 14:35, Kezhu Wang wrote:
>> > A combination of `BoundedMultiInput` and `InputSelectable` could help.
>> > You could see
>> > `org.apache.flink.table.runtime.operators.join.HashJoinOperator`
>> > for an usage example. The control topic have not to be bounded.
>> >
>> > There are maybe other approaches from later responses. I could not tell
>> > whether it is canonical or not.
>> >
>> > Best,
>> > Kezhu Wang
>> >
>> > On February 17, 2021 at 13:03:42, Salva Alcántara
>> > (salcantara...@gmail.com ) wrote:
>> >
>> >> What is the canonical way to accomplish this:
>> >>
>> >> >Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any
>> >> processing of the data stream until >the control stream is "ready", so
>> to
>> >> speak
>> >>
>> >> My particular use case is as follows: I have a CoFlatMap function. The
>> >> data
>> >> stream contains elements that need to be enriched with additional
>> >> information (they come with some fields empty). The missing
>> >> information is
>> >> taken from the control stream, whose elements come through a kafka
>> >> source.
>> >> Essentially, what I want is to pause any processing until having read
>> the
>> >> full (control) topic, otherwise (at least initially) the output
>> elements
>> >> will not be enriched as expected.
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from:
>> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> >> 
>>
>>


Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Kezhu Wang
Hi all,

Thanks Arvid and Timo for more candidates.

I also think “buffering until the control side is ready” is the more canonical
approach at the current stage of Flink.

Timo has created FLINK-21392 for exposing a user-friendly DataStream API to
block one input temporarily.

If one really wants to go deep down the rabbit hole, as Arvid said, I have one
approach off the top of my head.

Combination of `MultipleInputStreamOperator`, `BoundedMultiInput`,
`InputSelectable`, FLIP-27 source and `ChainingStrategy.HEAD_WITH_SOURCES`
should achieve the goal and not interfering with checkpoint, but the
control side must not be bounded before FLIP-147 delivered.

[1] FLINK-21392: https://issues.apache.org/jira/browse/FLINK-21392

Best,
Kezhu Wang

On February 17, 2021 at 22:58:23, Arvid Heise (ar...@apache.org) wrote:

Note that the question is also posted on SO [1].

[1]
https://stackoverflow.com/questions/66236004/connectedstreams-paused-until-control-stream-ready/

On Wed, Feb 17, 2021 at 3:31 PM Timo Walther  wrote:

> Hi Kezhu,
>
> `InputSelectable` is currently not exposed in the DataStream API because
> it might have side effects that need to be considered (e.g. are
> checkpoints still go through?). In any case, we don't have a good story
> for blocking a control stream yet. The best option is to buffer the
> other stream in state until the control stream is ready. You can also
> artifically slow down the other stream until then (e.g. by sleeping) to
> not buffer too much state.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 17.02.21 14:35, Kezhu Wang wrote:
> > A combination of `BoundedMultiInput` and `InputSelectable` could help.
> > You could see
> > `org.apache.flink.table.runtime.operators.join.HashJoinOperator`
> > for an usage example. The control topic have not to be bounded.
> >
> > There are maybe other approaches from later responses. I could not tell
> > whether it is canonical or not.
> >
> > Best,
> > Kezhu Wang
> >
> > On February 17, 2021 at 13:03:42, Salva Alcántara
> > (salcantara...@gmail.com ) wrote:
> >
> >> What is the canonical way to accomplish this:
> >>
> >> >Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any
> >> processing of the data stream until >the control stream is "ready", so
> to
> >> speak
> >>
> >> My particular use case is as follows: I have a CoFlatMap function. The
> >> data
> >> stream contains elements that need to be enriched with additional
> >> information (they come with some fields empty). The missing
> >> information is
> >> taken from the control stream, whose elements come through a kafka
> >> source.
> >> Essentially, what I want is to pause any processing until having read
> the
> >> full (control) topic, otherwise (at least initially) the output elements
> >> will not be enriched as expected.
> >>
> >>
> >>
> >> --
> >> Sent from:
> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >> 
>
>


Re: How to report metric based on keyed state piece

2021-02-17 Thread Piotr Nowojski
Hi Salva,

I'm not sure, but I think you cannot access the state (especially the
keyed state) from within the metric, as metrics are evaluated outside
of the keyed context, and also from another thread. Also, things like
`ValueState`/`MapState` do not expose any size.

So you would probably have to follow Kezhu's suggestion. Whenever you
update your state value, you can also update a shared variable that tracks
the combined size (an `AtomicLong`?). Upon recovery you would need to
reinitialize it (maybe indeed via `KeyedStateBackend.applyToAllKeys`).
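A rough sketch of that idea (the state layout and the bookkeeping rule are
assumptions, just to show where the gauge and the shared AtomicLong would sit):

import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class SizeTrackingCoFlatMap extends RichCoFlatMapFunction<String, String, String> {

    private transient MapState<String, String> mapState;
    private transient AtomicLong totalEntries;   // shared between task thread and metric thread

    @Override
    public void open(Configuration parameters) {
        totalEntries = new AtomicLong();
        getRuntimeContext().getMetricGroup().gauge("mapStateEntries", (Gauge<Long>) totalEntries::get);
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("enrichment", String.class, String.class));
    }

    @Override
    public void flatMap1(String value, Collector<String> out) throws Exception {
        out.collect(value);
    }

    @Override
    public void flatMap2(String control, Collector<String> out) throws Exception {
        if (!mapState.contains(control)) {
            totalEntries.incrementAndGet();      // keep the counter in sync with every state update
        }
        mapState.put(control, control);
    }
}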

Piotrek



On Wed, 17 Feb 2021 at 14:13, Kezhu Wang  wrote:

> With an initial `y`, I think you could compute new `y` on new stream
> value. Upon recovering from checkpoint, may be
> `KeyedStateBackend.applyToAllKeys` could help you to rebuild an initial `y`.
>
> Best,
> Kezhu Wang
>
> On February 17, 2021 at 13:09:39, Salva Alcántara (salcantara...@gmail.com)
> wrote:
>
> I wonder what is the canonical way to accomplish the following:
>
> Given a Flink UDF, how to report a metric `y` which is a function of some
> (keyed) state `X`? That is, `y=f(X)`. Most commonly, we are interested in
> the size of the state X.
>
> For instance, consider a `CoFlatMap` function with:
>
> - `X` being a `MapState`
> - `y` (the metric) consisting of the aggregated size (i.e., the total size
> of the `MapState`, for all keys)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: How do i register a streaming table sink in 1.12?

2021-02-17 Thread Till Rohrmann
Great to hear that it is now working and thanks for letting the community
know :-)

On Wed, Feb 17, 2021 at 2:48 PM Clay Teeter  wrote:

> Yep, that was it!  thanks! And to complete the thread, this is the working
> revision.
>
> package com.maalka.flink.sinks
>
> import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
> import com.typesafe.scalalogging.LazyLogging
> import org.apache.flink.api.common.functions.RuntimeContext
> import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, 
> JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
> import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, 
> JdbcBatchingOutputFormat}
> import 
> org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
> import 
> org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
> import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import org.apache.flink.streaming.api.scala._
> import com.maalka.flink.typeInformation.Implicits._
>
> import java.sql.PreparedStatement
> import java.util.function.Function
>
> object TestSink extends LazyLogging {
>   // START HERE
>   def process(messageStream: DataStream[MaalkaDataRecord],
>   signableUpdateStream: Option[DataStream[SignableUpdate]],
>   streamExecutionEnvironment: StreamExecutionEnvironment): Unit = 
> {
>
> insertAnalyticData("raw",
>   "insert into analytic_table ... ",
>   messageStream.map(_ => "A"))
>   }
>
>   // it is required that you explicitly create a new JDBCStatementBuilder
>   val statementBuilder: JdbcStatementBuilder[String] =
> new JdbcStatementBuilder[String] {
>   override def accept(ps: PreparedStatement, t: String): Unit = {
> ps.setString(1, t)
>   }
> }
>
>
>   private def insertAnalyticData(
>   interval: String,
>   insertStatement: String,
>   messageStream: DataStream[String]): Unit = {
> val connectionString = s"jdbc:postgresql://localhost/db"
> val sink: SinkFunction[String] = JdbcSink.sink(
>   insertStatement,
>   statementBuilder,
>   JdbcExecutionOptions.builder()
> .withBatchIntervalMs(1000)
> .withBatchSize(1000)
> .withMaxRetries(10)
> .build,
>   JdbcOptions.builder()
> .setDBUrl(connectionString)
> .setTableName("analytic_table")
> .build
> )
>
> messageStream
>   .addSink(sink)
>   }
> }
>
>
>
>
> On Wed, Feb 17, 2021 at 2:24 PM Till Rohrmann 
> wrote:
>
>> I am not 100% sure but maybe (_, _) => {} captures a reference to object
>> TestSink which is not serializable. Maybe try to simply define a no
>> op JdbcStatementBuilder and pass such an instance to JdbcSink.sink().
>>
>> Cheers,
>> Till
>>
>> On Wed, Feb 17, 2021 at 2:06 PM Clay Teeter 
>> wrote:
>>
>>> Ok, this is about as simple as I can get.
>>>
>>> package com.maalka.flink.sinks
>>>
>>> import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
>>> import com.typesafe.scalalogging.LazyLogging
>>> import org.apache.flink.api.common.functions.RuntimeContext
>>> import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, 
>>> JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
>>> import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, 
>>> JdbcBatchingOutputFormat}
>>> import 
>>> org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
>>> import 
>>> org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
>>> import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
>>> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>>> import org.apache.flink.streaming.api.scala._
>>> import com.maalka.flink.typeInformation.Implicits._
>>>
>>> import java.util.function.Function
>>>
>>> object TestSink extends LazyLogging {
>>>   // START HERE
>>>   def process(messageStream: DataStream[MaalkaDataRecord],
>>>   signableUpdateStream: Option[DataStream[SignableUpdate]],
>>>   streamExecutionEnvironment: StreamExecutionEnvironment): Unit 
>>> = {
>>>
>>> insertAnalyticData("raw",
>>>   "insert into analytic_table ... ",
>>>   messageStream.map(_ => "A"))
>>>   }
>>>
>>>   private def insertAnalyticData(
>>>   interval: String,
>>>   insertStatement: String,
>>>   messageStream: DataStream[String]): Unit = {
>>> val connectionString = s"jdbc:postgresql://localhost/db"
>>> val sink: SinkFunction[String] = JdbcSink.sink(
>>>   insertStatement,
>>>   (_, _) => {},// I have a feeling that this is the lambda that 
>>> can't serialize
>>>   JdbcExecutionOptions.builder()
>>> .withBatchIntervalMs(1000)
>>> .withBatchSize(1000)
>>> .withMaxRetries(10)
>>> 

Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Arvid Heise
Note that the question is also posted on SO [1].

[1]
https://stackoverflow.com/questions/66236004/connectedstreams-paused-until-control-stream-ready/

On Wed, Feb 17, 2021 at 3:31 PM Timo Walther  wrote:

> Hi Kezhu,
>
> `InputSelectable` is currently not exposed in the DataStream API because
> it might have side effects that need to be considered (e.g. do
> checkpoints still go through?). In any case, we don't have a good story
> for blocking a control stream yet. The best option is to buffer the
> other stream in state until the control stream is ready. You can also
> artificially slow down the other stream until then (e.g. by sleeping) to
> not buffer too much state.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 17.02.21 14:35, Kezhu Wang wrote:
> > A combination of `BoundedMultiInput` and `InputSelectable` could help.
> > You could see
> > `org.apache.flink.table.runtime.operators.join.HashJoinOperator`
> > for a usage example. The control topic does not have to be bounded.
> >
> > There are maybe other approaches from later responses. I could not tell
> > whether it is canonical or not.
> >
> > Best,
> > Kezhu Wang
> >
> > On February 17, 2021 at 13:03:42, Salva Alcántara
> > (salcantara...@gmail.com ) wrote:
> >
> >> What is the canonical way to accomplish this:
> >>
> >> >Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any
> >> processing of the data stream until >the control stream is "ready", so
> to
> >> speak
> >>
> >> My particular use case is as follows: I have a CoFlatMap function. The
> >> data
> >> stream contains elements that need to be enriched with additional
> >> information (they come with some fields empty). The missing
> >> information is
> >> taken from the control stream, whose elements come through a kafka
> >> source.
> >> Essentially, what I want is to pause any processing until having read
> the
> >> full (control) topic, otherwise (at least initially) the output elements
> >> will not be enriched as expected.
> >>
> >>
> >>
> >> --
> >> Sent from:
> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >> 
>
>


Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Timo Walther

Hi Kezhu,

`InputSelectable` is currently not exposed in the DataStream API because 
it might have side effects that need to be considered (e.g. do 
checkpoints still go through?). In any case, we don't have a good story 
for blocking a control stream yet. The best option is to buffer the 
other stream in state until the control stream is ready. You can also 
artificially slow down the other stream until then (e.g. by sleeping) to 
not buffer too much state.
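
For illustration, a minimal sketch of that buffering pattern on keyed
ConnectedStreams (the Event/Control types and the enrichment logic are
placeholders; "ready" is simplified here to "a control record for the current
key has arrived"):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferUntilControlReady
        extends KeyedCoProcessFunction<String, BufferUntilControlReady.Event,
                BufferUntilControlReady.Control, BufferUntilControlReady.Event> {

    public static class Event { public String key; public String payload; }
    public static class Control { public String key; public String info; }

    private transient ValueState<Control> controlState;
    private transient ListState<Event> buffer;

    @Override
    public void open(Configuration parameters) {
        controlState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("control", Control.class));
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", Event.class));
    }

    @Override
    public void processElement1(Event event, Context ctx, Collector<Event> out) throws Exception {
        Control control = controlState.value();
        if (control != null) {
            out.collect(enrich(event, control));
        } else {
            // control side not ready yet for this key: park the event in state
            buffer.add(event);
        }
    }

    @Override
    public void processElement2(Control control, Context ctx, Collector<Event> out) throws Exception {
        controlState.update(control);
        // flush everything buffered while the control side was not ready
        for (Event buffered : buffer.get()) {
            out.collect(enrich(buffered, control));
        }
        buffer.clear();
    }

    private Event enrich(Event event, Control control) {
        event.payload = control.info; // placeholder enrichment
        return event;
    }
}

Since the buffered elements live in keyed state, they survive checkpoints and
restarts, at the cost of the state growth mentioned above.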


I hope this helps.

Regards,
Timo


On 17.02.21 14:35, Kezhu Wang wrote:
A combination of `BoundedMultiInput` and `InputSelectable` could help. 
You could see 
`org.apache.flink.table.runtime.operators.join.HashJoinOperator`

for a usage example. The control topic does not have to be bounded.

There are maybe other approaches from later responses. I could not tell 
whether it is canonical or not.


Best,
Kezhu Wang

On February 17, 2021 at 13:03:42, Salva Alcántara 
(salcantara...@gmail.com ) wrote:



What is the canonical way to accomplish this:

>Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any
processing of the data stream until >the control stream is "ready", so to
speak

My particular use case is as follows: I have a CoFlatMap function. The 
data

stream contains elements that need to be enriched with additional
information (they come with some fields empty). The missing 
information is
taken from the control stream, whose elements come through a kafka 
source.

Essentially, what I want is to pause any processing until having read the
full (control) topic, otherwise (at least initially) the output elements
will not be enriched as expected.



--
Sent from: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 





Re: How do i register a streaming table sink in 1.12?

2021-02-17 Thread Clay Teeter
Yep, that was it!  thanks! And to complete the thread, this is the working
revision.

package com.maalka.flink.sinks

import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions,
JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction,
JdbcBatchingOutputFormat}
import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import com.maalka.flink.typeInformation.Implicits._

import java.sql.PreparedStatement
import java.util.function.Function

object TestSink extends LazyLogging {
  // START HERE
  def process(messageStream: DataStream[MaalkaDataRecord],
  signableUpdateStream: Option[DataStream[SignableUpdate]],
  streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {

insertAnalyticData("raw",
  "insert into analytic_table ... ",
  messageStream.map(_ => "A"))
  }

  // it is required that you explicitly create a new JDBCStatementBuilder
  val statementBuilder: JdbcStatementBuilder[String] =
new JdbcStatementBuilder[String] {
  override def accept(ps: PreparedStatement, t: String): Unit = {
ps.setString(1, t)
  }
}


  private def insertAnalyticData(
  interval: String,
  insertStatement: String,
  messageStream: DataStream[String]): Unit = {
val connectionString = s"jdbc:postgresql://localhost/db"
val sink: SinkFunction[String] = JdbcSink.sink(
  insertStatement,
  statementBuilder,
  JdbcExecutionOptions.builder()
.withBatchIntervalMs(1000)
.withBatchSize(1000)
.withMaxRetries(10)
.build,
  JdbcOptions.builder()
.setDBUrl(connectionString)
.setTableName("analytic_table")
.build
)

messageStream
  .addSink(sink)
  }
}




On Wed, Feb 17, 2021 at 2:24 PM Till Rohrmann  wrote:

> I am not 100% sure but maybe (_, _) => {} captures a reference to object
> TestSink which is not serializable. Maybe try to simply define a no
> op JdbcStatementBuilder and pass such an instance to JdbcSink.sink().
>
> Cheers,
> Till
>
> On Wed, Feb 17, 2021 at 2:06 PM Clay Teeter 
> wrote:
>
>> Ok, this is about as simple as I can get.
>>
>> package com.maalka.flink.sinks
>>
>> import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
>> import com.typesafe.scalalogging.LazyLogging
>> import org.apache.flink.api.common.functions.RuntimeContext
>> import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, 
>> JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
>> import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, 
>> JdbcBatchingOutputFormat}
>> import 
>> org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
>> import 
>> org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
>> import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
>> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>> import org.apache.flink.streaming.api.scala._
>> import com.maalka.flink.typeInformation.Implicits._
>>
>> import java.util.function.Function
>>
>> object TestSink extends LazyLogging {
>>   // START HERE
>>   def process(messageStream: DataStream[MaalkaDataRecord],
>>   signableUpdateStream: Option[DataStream[SignableUpdate]],
>>   streamExecutionEnvironment: StreamExecutionEnvironment): Unit 
>> = {
>>
>> insertAnalyticData("raw",
>>   "insert into analytic_table ... ",
>>   messageStream.map(_ => "A"))
>>   }
>>
>>   private def insertAnalyticData(
>>   interval: String,
>>   insertStatement: String,
>>   messageStream: DataStream[String]): Unit = {
>> val connectionString = s"jdbc:postgresql://localhost/db"
>> val sink: SinkFunction[String] = JdbcSink.sink(
>>   insertStatement,
>>   (_, _) => {},// I have a feeling that this is the lambda that 
>> can't serialize
>>   JdbcExecutionOptions.builder()
>> .withBatchIntervalMs(1000)
>> .withBatchSize(1000)
>> .withMaxRetries(10)
>> .build,
>>   JdbcOptions.builder()
>> .setDBUrl(connectionString)
>> .setTableName("analytic_table")
>> .build
>> )
>>
>> messageStream
>>   .addSink(sink)
>>   }
>> }
>>
>>
>> On Wed, Feb 17, 2021 at 9:34 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Clay,
>>>
>>> could you maybe share the source code of

Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Kezhu Wang
A combination of `BoundedMultiInput` and `InputSelectable` could help. You
could see `org.apache.flink.table.runtime.operators.join.HashJoinOperator`
for a usage example. The control topic does not have to be bounded.

There are maybe other approaches from later responses. I could not tell
whether it is canonical or not.

Best,
Kezhu Wang

On February 17, 2021 at 13:03:42, Salva Alcántara (salcantara...@gmail.com)
wrote:

What is the canonical way to accomplish this:

>Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any
processing of the data stream until >the control stream is "ready", so to
speak

My particular use case is as follows: I have a CoFlatMap function. The data
stream contains elements that need to be enriched with additional
information (they come with some fields empty). The missing information is
taken from the control stream, whose elements come through a kafka source.
Essentially, what I want is to pause any processing until having read the
full (control) topic, otherwise (at least initially) the output elements
will not be enriched as expected.



-- 
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How do i register a streaming table sink in 1.12?

2021-02-17 Thread Till Rohrmann
I am not 100% sure but maybe (_, _) => {} captures a reference to object
TestSink which is not serializable. Maybe try to simply define a no
op JdbcStatementBuilder and pass such an instance to JdbcSink.sink().

Cheers,
Till

On Wed, Feb 17, 2021 at 2:06 PM Clay Teeter  wrote:

> Ok, this is about as simple as I can get.
>
> package com.maalka.flink.sinks
>
> import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
> import com.typesafe.scalalogging.LazyLogging
> import org.apache.flink.api.common.functions.RuntimeContext
> import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, 
> JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
> import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, 
> JdbcBatchingOutputFormat}
> import 
> org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
> import 
> org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
> import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import org.apache.flink.streaming.api.scala._
> import com.maalka.flink.typeInformation.Implicits._
>
> import java.util.function.Function
>
> object TestSink extends LazyLogging {
>   // START HERE
>   def process(messageStream: DataStream[MaalkaDataRecord],
>   signableUpdateStream: Option[DataStream[SignableUpdate]],
>   streamExecutionEnvironment: StreamExecutionEnvironment): Unit = 
> {
>
> insertAnalyticData("raw",
>   "insert into analytic_table ... ",
>   messageStream.map(_ => "A"))
>   }
>
>   private def insertAnalyticData(
>   interval: String,
>   insertStatement: String,
>   messageStream: DataStream[String]): Unit = {
> val connectionString = s"jdbc:postgresql://localhost/db"
> val sink: SinkFunction[String] = JdbcSink.sink(
>   insertStatement,
>   (_, _) => {},// I have a feeling that this is the lambda that can't 
> serialize
>   JdbcExecutionOptions.builder()
> .withBatchIntervalMs(1000)
> .withBatchSize(1000)
> .withMaxRetries(10)
> .build,
>   JdbcOptions.builder()
> .setDBUrl(connectionString)
> .setTableName("analytic_table")
> .build
> )
>
> messageStream
>   .addSink(sink)
>   }
> }
>
>
> On Wed, Feb 17, 2021 at 9:34 AM Till Rohrmann 
> wrote:
>
>> Hi Clay,
>>
>> could you maybe share the source code of
>> com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this
>> sink uses a lambda which is not serializable. Maybe it holds a reference to
>> some non Serializable class as part of its closure.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 16, 2021 at 8:58 PM Clay Teeter 
>> wrote:
>>
>>> Thanks Till, the tickets and links were immensely useful.  With that I
>>> was able to make progress and even get things to compile.  However, when I
>>> run things a serializable exception is thrown. (see below)
>>>
>>> .addSink(JdbcSink.sink[SignableTableSchema](
   addIntervalToInsertStatement(insertStatement, interval),
   (ps: PreparedStatement, rd: SignableTableSchema) => {
 ps.setString(1, rd.data_processing_id)
 ps.setTimestamp(2, rd.crc)
 ps.setString(3, rd.command)
 ps.setString(4, rd.result)
 ps.setOptionalString(5, rd.message)
 ps.setString(6, rd.arguments)
 ps.setOptionalString(7, rd.validatorUUID)
   },
   getJdbcExecutionOptions,
   getJdbcOptions(interval, insertStatement) // <-- This is line 376
 ))

  Where I set the executionOptions to behave in a batched way.
>>>
>>> def getJdbcExecutionOptions: JdbcExecutionOptions = {
>>>   JdbcExecutionOptions.builder()
>>> .withBatchIntervalMs(1000)
>>> .withBatchSize(1000)
>>> .withMaxRetries(10)
>>> .build
>>> }
>>>
>>>
>>> Any suggestions?
>>>
>>> [info]   org.apache.flink.api.common.InvalidProgramException: The
 implementation of the AbstractJdbcOutputFormat is not serializable. The
 object probably contains or references non serializable fields.
 [info]   at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
 [info]   at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
 [info]   at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
 [info]   at
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
 [info]   at
 org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
 [info]   at
 org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
 [info]   at
 org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
 [info]   at
 

Re: How to report metric based on keyed state piece

2021-02-17 Thread Kezhu Wang
With an initial `y`, I think you could compute new `y` on new stream value.
Upon recovering from checkpoint, maybe `KeyedStateBackend.applyToAllKeys`
could help you to rebuild an initial `y`.

Best,
Kezhu Wang

On February 17, 2021 at 13:09:39, Salva Alcántara (salcantara...@gmail.com)
wrote:

I wonder what is the canonical way to accomplish the following:

Given a Flink UDF, how to report a metric `y` which is a function of some
(keyed) state `X`? That is, `y=f(X)`. Most commonly, we are interested in
the size of the state X.

For instance, consider a `CoFlatMap` function with:

- `X` being a `MapState`
- `y` (the metric) consisting of the aggregated size (i.e., the total size
of the `MapState`, for all keys)



-- 
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How do i register a streaming table sink in 1.12?

2021-02-17 Thread Clay Teeter
Ok, this is about as simple as I can get.

package com.maalka.flink.sinks

import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions,
JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction,
JdbcBatchingOutputFormat}
import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import 
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import com.maalka.flink.typeInformation.Implicits._

import java.util.function.Function

object TestSink extends LazyLogging {
  // START HERE
  def process(messageStream: DataStream[MaalkaDataRecord],
  signableUpdateStream: Option[DataStream[SignableUpdate]],
  streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {

insertAnalyticData("raw",
  "insert into analytic_table ... ",
  messageStream.map(_ => "A"))
  }

  private def insertAnalyticData(
  interval: String,
  insertStatement: String,
  messageStream: DataStream[String]): Unit = {
val connectionString = s"jdbc:postgresql://localhost/db"
val sink: SinkFunction[String] = JdbcSink.sink(
  insertStatement,
  (_, _) => {},// I have a feeling that this is the lambda
that can't serialize
  JdbcExecutionOptions.builder()
.withBatchIntervalMs(1000)
.withBatchSize(1000)
.withMaxRetries(10)
.build,
  JdbcOptions.builder()
.setDBUrl(connectionString)
.setTableName("analytic_table")
.build
)

messageStream
  .addSink(sink)
  }
}


On Wed, Feb 17, 2021 at 9:34 AM Till Rohrmann  wrote:

> Hi Clay,
>
> could you maybe share the source code of
> com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this
> sink uses a lambda which is not serializable. Maybe it holds a reference to
> some non Serializable class as part of its closure.
>
> Cheers,
> Till
>
> On Tue, Feb 16, 2021 at 8:58 PM Clay Teeter 
> wrote:
>
>> Thanks Till, the tickets and links were immensely useful.  With that I
>> was able to make progress and even get things to compile.  However, when I
>> run things a serializable exception is thrown. (see below)
>>
>> .addSink(JdbcSink.sink[SignableTableSchema](
>>>   addIntervalToInsertStatement(insertStatement, interval),
>>>   (ps: PreparedStatement, rd: SignableTableSchema) => {
>>> ps.setString(1, rd.data_processing_id)
>>> ps.setTimestamp(2, rd.crc)
>>> ps.setString(3, rd.command)
>>> ps.setString(4, rd.result)
>>> ps.setOptionalString(5, rd.message)
>>> ps.setString(6, rd.arguments)
>>> ps.setOptionalString(7, rd.validatorUUID)
>>>   },
>>>   getJdbcExecutionOptions,
>>>   getJdbcOptions(interval, insertStatement) // <-- This is line 376
>>> ))
>>>
>>>  Where I set the executionOptions to behave in a batched way.
>>
>> def getJdbcExecutionOptions: JdbcExecutionOptions = {
>>   JdbcExecutionOptions.builder()
>> .withBatchIntervalMs(1000)
>> .withBatchSize(1000)
>> .withMaxRetries(10)
>> .build
>> }
>>
>>
>> Any suggestions?
>>
>> [info]   org.apache.flink.api.common.InvalidProgramException: The
>>> implementation of the AbstractJdbcOutputFormat is not serializable. The
>>> object probably contains or references non serializable fields.
>>> [info]   at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>>> [info]   at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>>> [info]   at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>>> [info]   at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>>> [info]   at
>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>>> [info]   at
>>> org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
>>> [info]   at
>>> org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
>>> [info]   at
>>> com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
>>> [info]   at
>>> com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
>>> [info]   at
>>> com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
>>> [info]   ...
>>> [info]   Cause: java.io.NotSerializableException: Non-serializable lambda
>>> [info]   at
>>> com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x000809678c40.writeObject(Unknown
>>> Source)
>>> [info]   at
>>> 

Re: Flink docker in session cluster mode - is a local distribution needed?

2021-02-17 Thread Till Rohrmann
Yes, agreed. This could be better streamlined. If you wanna help with this,
then feel free to open a JIRA issue for it.

Cheers,
Till

On Wed, Feb 17, 2021 at 11:37 AM Manas Kale  wrote:

> Hi Till,
> Oh I see... I managed to do what you said using a bunch of docker exec
> commands. However, I think this solution is quite hacky and could be
> improved by providing some simple command to submit jobs using the Flink
> runtime within the docker images. I believe this will achieve full
> containerization - the host system is not at all expected to have the Flink
> runtime, everything is within Docker images.
>
> Thanks a lot!
>
> On Tue, Feb 16, 2021 at 6:08 PM Till Rohrmann 
> wrote:
>
>> Hi Manas,
>>
>> I think the documentation assumes that you first start a session cluster
>> and then submit jobs from outside the Docker images. If your jobs are
>> included in the Docker image, then you could log into the master process
>> and start the jobs from within the Docker image.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 16, 2021 at 1:00 PM Manas Kale  wrote:
>>
>>> Hi,
>>>
>>> I have a project that is a set of 6 jobs out of which 4 are written in
>>> Java and 2 are written in pyFlink. I want to dockerize these so that all 6
>>> can be run in a single Flink session cluster.
>>>
>>> I have been able to successfully set up the JobManager and TaskManager
>>> containers as per [1] after creating a custom Docker image that has Python.
>>> For the last step, the guide asks us to submit the job using a local
>>> distribution of Flink:
>>>
>>> $ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
>>>
>>> I am probably missing something here because I have the following
>>> questions:
>>> Why do I need to use a local distribution to submit a job?
>>> Why can't I use the Flink distribution that already exists within the
>>> images?
>>> How do I submit a job using the Docker image's distribution?
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html#starting-a-session-cluster-on-docker
>>>
>>>


Re: Flink docker in session cluster mode - is a local distribution needed?

2021-02-17 Thread Manas Kale
Hi Till,
Oh I see... I managed to do what you said using a bunch of docker exec
commands. However, I think this solution is quite hacky and could be
improved by providing some simple command to submit jobs using the Flink
runtime within the docker images. I believe this will achieve full
containerization - the host system is not at all expected to have the Flink
runtime, everything is within Docker images.

Thanks a lot!

On Tue, Feb 16, 2021 at 6:08 PM Till Rohrmann  wrote:

> Hi Manas,
>
> I think the documentation assumes that you first start a session cluster
> and then submit jobs from outside the Docker images. If your jobs are
> included in the Docker image, then you could log into the master process
> and start the jobs from within the Docker image.
>
> Cheers,
> Till
>
> On Tue, Feb 16, 2021 at 1:00 PM Manas Kale  wrote:
>
>> Hi,
>>
>> I have a project that is a set of 6 jobs out of which 4 are written in
>> Java and 2 are written in pyFlink. I want to dockerize these so that all 6
>> can be run in a single Flink session cluster.
>>
>> I have been able to successfully set up the JobManager and TaskManager
>> containers as per [1] after creating a custom Docker image that has Python.
>> For the last step, the guide asks us to submit the job using a local
>> distribution of Flink:
>>
>> $ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
>>
>> I am probably missing something here because I have the following
>> questions:
>> Why do I need to use a local distribution to submit a job?
>> Why can't I use the Flink distribution that already exists within the
>> images?
>> How do I submit a job using the Docker image's distribution?
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html#starting-a-session-cluster-on-docker
>>
>>


How can Flink HA on Kubernetes use OSS as high-availability.storageDir?

2021-02-17 Thread casel.chen
As the subject says: in a Kubernetes (k8s) environment we do not want to use HDFS as the high-availability.storageDir. Is there a way to use OSS directly? Checkpoints and savepoints are already able to use OSS.

Re: Joining and windowing multiple streams using DataStream API or Table API & SQL

2021-02-17 Thread Till Rohrmann
Hi Pieter,

Off the top of my head, I think the easiest way to solve this problem is
to implement your own "window join" operation by first unioning all three
streams and then applying a ProcessWindowFunction similar to

allEvents
    .keyBy((KeySelector) value -> value)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .process(
        new ProcessWindowFunction() {
            @Override
            public void process(
                    Tuple tuple,
                    Context context,
                    Iterable elements,
                    Collector out) throws Exception {
                // compute join result from elements
            }
        });

@Timo is there an easier way using Flink's SQL or Table API?

Cheers,
Till

On Tue, Feb 16, 2021 at 3:36 PM Pieter Bonte  wrote:

> Hi all,
>
> I’m trying to apply a window operator over multiple streams (more than 2)
> and join these streams within the validity of the window. However, I have
> some questions about the time semantics using both the DataStream API and
> the Table API/SQL.
>
> Lets say we have 3 streams, an A, B and C stream. And currently we have an
> A@0 (an A at timestamp 0), a B@5 and two C’s: C@6 and C@13.
> We would like to join these streams when they fall within a sliding window
> of size 10 and slide 5.
> Thus the first window W1[0,10) should contain A@0, B@5 and C@6.
> The second window W2[5,15) should contain B@5, C@6 and C@13.
> So only in the first window we could successfully join all 3 streams.
>
> However, I’m not able to mimic this behaviour using the DataStream or
> Table API.
>
>
> Using the DataStream API, joining multiple streams can be achieved by
> applying a first window and join stream A and stream B and then apply a
> second window to join the result of the previous window with stream C, e.g.:
>
> streamA
>   .join(streamB)
> .where().equalTo()
> .window(SlidingEventTimeWindows(Time.seconds(10),Time.seconds(5)))
> //<-Window Wab
> .apply (new JoinFunction () {...})
>   .join(streamC)
> .where().equalTo()
> .window(SlidingEventTimeWindows(Time.seconds(10),Time.seconds(5)))
> //<-Window Wabc
> .apply (new JoinFunction () {…})
>
> However, according to the documentation on Window Joins [1] (and
> debugging), the joined events from the first window (Wab) will be assigned
> a new timestamp that is the largest timestamp that still lies in the
> respective window, i.e. the time the window closes.
> Thus the result of joining A@0 and B@5 over the first window (Wab) will
> be AB@9. When joining with the C-stream, AB@9 can be joined with both C@6
> and C@13, which is not the behaviour I would like to obtain, since A
> happened at timestamp 0 and C@13 is more than 10 timestamps away.
>
> Using the Table API or SQL, I think this can be solved using Interval
> Joins [2]. However, it seems like the windowing semantics are different as
> you need to define one table(or stream) around which you want to apply a
> interval. Depending on the choice of table on which the interval is
> applied, different results can be obtained. For example, lets say we have 3
> table versions of our previous streams, i.e. A, B and C, each with a time
> attribute ’ts’.
> Applying an interval around table A would result in something like:
>
> SELECT A.a, B.b, C.c
>   FROM A, B, C
>   WHERE A.x = B.x AND A.x = C.x AND
> A.ts BETWEEN B.ts - INTERVAL ‘5' MINUTE AND B.ts + INTERVAL ‘5'
> MINUTE AND
> A.ts BETWEEN C.ts - INTERVAL ‘5' MINUTE AND C.ts + INTERVAL ‘5'
> MINUTE
>
> So if we want a window of 10, I think we split the interval into 5 minutes
> before and after? However, now A@0 is not in the interval of C@6.
> Applying an interval of 10 would solve this problem. However, if we
> apply an interval of 10 both before and after, but choose to fix the
> interval around B instead, we run into a different problem:
>
> SELECT A.a, B.b, C.c
>   FROM A, B, C
>   WHERE A.x = B.x AND A.x = C.x AND
> B.ts BETWEEN A.ts - INTERVAL ‘10' MINUTE AND A.ts + INTERVAL ‘10'
> MINUTE AND
> B.ts BETWEEN C.ts - INTERVAL ‘10' MINUTE AND C.ts + INTERVAL ‘10’
> MINUTE
> In this case B@5 is in the interval of A@0 but also of C@13.
>
> So my question is: how can I join multiple streams within a window so that
> the behaviour is as if all the streams were joined in the same
> window? Should I write my own WindowOperator that assigns the smallest
> timestamp when two events can be joined instead of the time that the window
> closes?
>
> Thanks in advance!
>
> Kind regards,
> Pieter
>
> // code examples taken from [3].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/joining.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html
> [3] https://stackoverflow.com/a/50879029
> -
> Dr. Ir. Pieter Bonte
> Ghent University - imec
> IDLab
> iGent Tower - Department of Information Technology
> 

Re: 回复: DataStream problem

2021-02-17 Thread Dawid Wysakowicz
I am sure you can achieve that with a ProcessFunction[1]

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html#process-function
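
For illustration, a rough sketch of such a ProcessFunction, keyed by user id
(the Order type, the output shape and the use of processing time are
assumptions based on the example above):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits (order, true) if the same user already placed an order within the last 10 minutes.
public class RepeatOrderWithin10Min extends KeyedProcessFunction<String,
        RepeatOrderWithin10Min.Order, Tuple2<RepeatOrderWithin10Min.Order, Boolean>> {

    public static class Order { public String userId; }

    private static final long TEN_MINUTES = 10 * 60 * 1000L;

    private transient ValueState<Long> lastOrderTime;

    @Override
    public void open(Configuration parameters) {
        lastOrderTime = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastOrderTime", Long.class));
    }

    @Override
    public void processElement(Order order, Context ctx, Collector<Tuple2<Order, Boolean>> out)
            throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        Long previous = lastOrderTime.value();
        boolean repeat = previous != null && now - previous <= TEN_MINUTES;
        out.collect(Tuple2.of(order, repeat));
        lastOrderTime.update(now);
        // timer to clean up the state once the user has been quiet for 10 minutes
        ctx.timerService().registerProcessingTimeTimer(now + TEN_MINUTES);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, Boolean>> out)
            throws Exception {
        Long previous = lastOrderTime.value();
        if (previous != null && timestamp >= previous + TEN_MINUTES) {
            lastOrderTime.clear();
        }
    }
}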

On 16/02/2021 07:28, ?g???U?[ wrote:
> Hi Dawid
>
> ?0?2 ?0?2 For example, if user 001 takes an order and generates an order
> message 1, I need to monitor if user 001 takes another order and
> generates a new order message 2 within 10 minutes. If user 001
> produces an order message 2 again within 10 minutes, I need to mark
> true in the 2 message and output it
>
>
> ------ Original message ------
> *From:* "Dawid Wysakowicz" ;
> *Date:* Feb 15, 2021 (Mon) 6:59
> *To:* "?g???U?["; "user";
> *Subject:* Re: DataStream problem
>
> Hi Jiazhi,
>
> Could you elaborate on what exactly you want to achieve? What have you
> tried so far?
>
> Best,
>
> Dawid
>
> On 15/02/2021 11:11, ?g???U?[ wrote:
> > Hi all
> > Using DataStream, how can I detect that the same
> > message appears again 10 minutes later?
> > Thanks,
> > Jiazhi
>
>




Re: Configure classes

2021-02-17 Thread Till Rohrmann
Hi Abhinav,

Out of the box, Flink does not support what you are asking for. If you want
to minimize the amount of Flink code to write, then I would recommend
looking at Flink's SQL API [1]. For more advanced injection logic I think
you have to write a bit of tooling on your own.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql/
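
As a rough illustration of such tooling (the property name, the element type
and the configured class are all assumptions, not part of any Flink API), one
could load the configured ProcessWindowFunction by class name:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public final class ConfiguredPipeline {

    @SuppressWarnings("unchecked")
    static ProcessWindowFunction<String, String, String, TimeWindow> loadProcessFunction(
            String className) throws ReflectiveOperationException {
        // the configured class needs a public no-arg constructor and must be serializable
        return (ProcessWindowFunction<String, String, String, TimeWindow>)
                Class.forName(className).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        // in a real setup this would be parsed from the XML configuration
        String configuredClass = System.getProperty("pipeline.process.class");

        stream.keyBy(value -> value)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .process(loadProcessFunction(configuredClass));

        env.execute("configured-pipeline");
    }
}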

Cheers,
Till

On Wed, Feb 17, 2021 at 3:58 AM Abhinav Sharma 
wrote:

> Hi
> I am evaluating flink with use case where we need to create a basic flink
> pipeline, and inject the classes for map, reduce, process, etc via some xml
> configuration (or something equivalent).
> Eg:
>
> stream.keyBy(value -> value.getKey())
> .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
> .process(new MyInjectedClass());
>
> Is something like this possible, where a developer can just write
> MyInjectedClass and configure it without writing code in the Flink app? The
> developer needs to write just the process class and specify which step in
> the pipeline to inject the class into.
>


Re: How do i register a streaming table sink in 1.12?

2021-02-17 Thread Till Rohrmann
Hi Clay,

could you maybe share the source code of
com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this
sink uses a lambda which is not serializable. Maybe it holds a reference to
some non Serializable class as part of its closure.

Cheers,
Till

On Tue, Feb 16, 2021 at 8:58 PM Clay Teeter  wrote:

> Thanks Till, the tickets and links were immensely useful.  With that I was
> able to make progress and even get things to compile.  However, when I run
> things a serializable exception is thrown. (see below)
>
> .addSink(JdbcSink.sink[SignableTableSchema](
>>   addIntervalToInsertStatement(insertStatement, interval),
>>   (ps: PreparedStatement, rd: SignableTableSchema) => {
>> ps.setString(1, rd.data_processing_id)
>> ps.setTimestamp(2, rd.crc)
>> ps.setString(3, rd.command)
>> ps.setString(4, rd.result)
>> ps.setOptionalString(5, rd.message)
>> ps.setString(6, rd.arguments)
>> ps.setOptionalString(7, rd.validatorUUID)
>>   },
>>   getJdbcExecutionOptions,
>>   getJdbcOptions(interval, insertStatement) // <-- This is line 376
>> ))
>>
>>  Where I set the executionOptions to behave in a batched way.
>
> def getJdbcExecutionOptions: JdbcExecutionOptions = {
>   JdbcExecutionOptions.builder()
> .withBatchIntervalMs(1000)
> .withBatchSize(1000)
> .withMaxRetries(10)
> .build
> }
>
>
> Any suggestions?
>
> [info]   org.apache.flink.api.common.InvalidProgramException: The
>> implementation of the AbstractJdbcOutputFormat is not serializable. The
>> object probably contains or references non serializable fields.
>> [info]   at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>> [info]   at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>> [info]   at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>> [info]   at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>> [info]   at
>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>> [info]   at
>> org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
>> [info]   at
>> org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
>> [info]   at
>> com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
>> [info]   at
>> com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
>> [info]   at
>> com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
>> [info]   ...
>> [info]   Cause: java.io.NotSerializableException: Non-serializable lambda
>> [info]   at
>> com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x000809678c40.writeObject(Unknown
>> Source)
>> [info]   at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> [info]   at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> [info]   at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> [info]   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> [info]   at
>> java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
>> [info]   at
>> java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
>> [info]   at
>> java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
>> [info]   at
>> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
>> [info]   at
>> java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
>>
>
>
> On Tue, Feb 16, 2021 at 6:11 PM Till Rohrmann 
> wrote:
>
>> Hi Clay,
>>
>> I am not a Table API expert but let me try to answer your question:
>>
>> With FLINK-17748 [1] the community removed the registerTableSink in
>> favour of the connect API. The connect API has been deprecated [2] because
>> it was not well maintained. Now the recommended way for specifying sinks is
>> to use Flink's DDL [3]. Unfortunately, I couldn't find an easy example on
>> how to use the DDL. Maybe Timo or Jark can point you towards a good guide
>> on how to register your jdbc table sink.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17748
>> [2] https://issues.apache.org/jira/browse/FLINK-18416
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html
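>>
>> For completeness, a rough sketch of the DDL route (the columns, URL and the
>> source table "events" are placeholders; it needs flink-connector-jdbc and the
>> Postgres driver on the classpath):
>>
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> public final class JdbcDdlSketch {
>>     public static void main(String[] args) {
>>         TableEnvironment tEnv = TableEnvironment.create(
>>                 EnvironmentSettings.newInstance().inStreamingMode().build());
>>
>>         // registers a JDBC sink table in the catalog; options mirror the
>>         // batching behaviour of the JdbcExecutionOptions used above
>>         tEnv.executeSql(
>>                 "CREATE TABLE analytic_table (" +
>>                 "  data_processing_id STRING," +
>>                 "  result STRING" +
>>                 ") WITH (" +
>>                 "  'connector' = 'jdbc'," +
>>                 "  'url' = 'jdbc:postgresql://localhost/db'," +
>>                 "  'table-name' = 'analytic_table'," +
>>                 "  'sink.buffer-flush.max-rows' = '1000'," +
>>                 "  'sink.buffer-flush.interval' = '1s'," +
>>                 "  'sink.max-retries' = '10'" +
>>                 ")");
>>
>>         // then e.g.: tEnv.executeSql("INSERT INTO analytic_table SELECT ... FROM events");
>>     }
>> }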
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 16, 2021 at 4:42 PM Clay Teeter 
>> wrote:
>>
>>> Hey all.  Hopefully this is an easy question.  I'm porting my JDBC
>>> postgres sink from 1.10 to 1.12
>>>
>>> I'm using:
>>> * StreamTableEnvironment
>>> * JdbcUpsertTableSink
>>>
>>> What I'm having difficulty with is how to register the sink with the
>>> streaming table environment.
>>>
>>> In 1.10:
>>>
>>> tableEnv.registerTableSink(