Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
Ok, that sounds like it confirms my expectations.

So I tried running my above code and had to edit it slightly to use Java
Tuple2, because our execution environment code is all in Java.

class CompactionAggregate
    extends AggregateFunction[
      Tuple2[java.lang.Boolean, Row],
      Tuple2[java.lang.Boolean, Row],
      Tuple2[java.lang.Boolean, Row]
    ] {

  override def createAccumulator() = new Tuple2(false, null)

  // Just take the latest value to compact.
  override def add(
      value: Tuple2[java.lang.Boolean, Row],
      accumulator: Tuple2[java.lang.Boolean, Row]
  ) =
    value

  override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
    accumulator

  // This is a required function that we don't use.
  override def merge(
      a: Tuple2[java.lang.Boolean, Row],
      b: Tuple2[java.lang.Boolean, Row]
  ) =
    throw new NotImplementedException()
}

But when running I get the following error:
>Caused by: java.lang.RuntimeException: Could not extract key from
[redacted row]
>...
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported
kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
supported when converting to an expression.

I'm googling around and haven't found anything informative about what might
be causing this issue. Any ideas?

I'll also take a look at the SQL functions you suggested and see if I can
use those.
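For what it's worth, a minimal sketch of what the LAST_VALUE-based compaction from those docs might look like; the table name user_docs and the columns doc_id / payload are hypothetical stand-ins, not taken from our actual job:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class LastValueCompactionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Assumes a table 'user_docs' with columns doc_id and payload has
        // already been registered (hypothetical names).
        Table compacted = tEnv.sqlQuery(
                "SELECT doc_id, LAST_VALUE(payload) AS payload "
                        + "FROM user_docs "
                        + "GROUP BY doc_id");

        // The result is an updating table that keeps only the most recent
        // payload per doc_id, which could then feed the ES sink.
        compacted.execute().print();
    }
}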

Thanks!



On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise  wrote:

> Hi Rex,
>
> whether your keyby (and, with it, the join/grouping/windowing) is random or
> not depends on the relationship of the join/grouping key with your Kafka
> partitioning key.
>
> Say your partitioning key is document_id. Then, any join/grouping key that
> is composed of (or is exactly) document_id, will retain the order. You
> should always ask yourself the question: can two records coming from the
> ordered Kafka partition X be processed by two different operator instances.
> For a join/grouping operator, there is only the strict guarantee that all
> records with the same key will be shuffled into the same operator instance.
>
> Your compaction in general looks good but I'm not deep into Table API. I'm
> quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table API
> should already do what you want. [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>
> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley  wrote:
>
>> In addition to those questions, assuming that keyed streams are in order,
>> I've come up with the following solution to compact our records and only
>> pick the most recent one per id before sending to the ES sink.
>>
>> The first item in the Row is the document ID / primary key which we want
>> to compact records on.
>>
>> val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>> userDocsStream
>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>   .aggregate(new CompactionAggregate())
>>
>> class CompactionAggregate
>>     extends AggregateFunction[
>>       (Boolean, Row),
>>       (Boolean, Row),
>>       (Boolean, Row)
>>     ] {
>>
>>   override def createAccumulator() = (false, null)
>>
>>   // Just take the latest value to compact.
>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>     value
>>
>>   override def getResult(accumulator: (Boolean, Row)) = accumulator
>>
>>   // This is a required function that we don't use.
>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>     throw new NotImplementedException()
>> }
>>
>> I'm hoping that if the last record in the window is an insert it picks
>> that, and if it's a retract then it picks that, and then when we send this to
>> the ES sink we will simply check true or false in the first element of the
>> tuple to issue an insert or delete request to ES. Does this seem like it will
>> work?
>>
>> Thanks!
>>
>>
>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley  wrote:
>>
>>> This is great info, thanks!
>>>
>>> My question then becomes, what constitutes a random shuffle? Currently
>>> we're using the Table API with minibatch on flink v1.11.3. Do our joins
>>> output a keyed stream of records by join key or is this random? I imagine
>>> that they'd have to have a key for retracts and accumulates to arrive in
>>> order on the next downstream operator. Same with aggs but on the groupBy
>>> key.
>>>
>>> Does this sound correct to you?
>>>
>>> Thanks!
>>>
>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise  wrote:
>>>
 Hi Rex,

 indeed these two statements look like they contradict each other, but
 they are really two sides of the same coin.
 Flink simply puts records into windows in FIFO order. That is, there is
 no ordering on event time if there are late events. So if your elements
 arrive ordered, the ordering is retained. If your elements arrive
 unordered, that same unordered order is retained.

 However, note that Flink can only guarantee FIFO according to your
 topology. Consider a source with 

Re: flink-1.12: YARN parameters cannot be specified after selecting the deployment mode with -t

2021-01-27 Thread silence
Since Flink 1.12, all YARN-related parameters are specified with -D.
Example: -D yarn.application.name=xxx replaces the former -ynm xxx.
For more options see the configuration docs: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Deduplicating record amplification

2021-01-27 Thread Arvid Heise
Hi Rex,

whether your keyby (and, with it, the join/grouping/windowing) is random or
not depends on the relationship of the join/grouping key with your Kafka
partitioning key.

Say your partitioning key is document_id. Then, any join/grouping key that
is composed of (or is exactly) document_id, will retain the order. You
should always ask yourself the question: can two records coming from the
ordered Kafka partition X be processed by two different operator instances.
For a join/grouping operator, there is only the strict guarantee that all
records with the same key will be shuffled into the same operator instance.

Your compaction in general looks good but I'm not deep into Table API. I'm
quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table API
should already do what you want. [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions

On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley  wrote:

> In addition to those questions, assuming that keyed streams are in order,
> I've come up with the following solution to compact our records and only
> pick the most recent one per id before sending to the ES sink.
>
> The first item in the Row is the document ID / primary key which we want
> to compact records on.
>
> val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
> userDocsStream
>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>   .aggregate(new CompactionAggregate())
>
> class CompactionAggregate
>     extends AggregateFunction[
>       (Boolean, Row),
>       (Boolean, Row),
>       (Boolean, Row)
>     ] {
>
>   override def createAccumulator() = (false, null)
>
>   // Just take the latest value to compact.
>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>     value
>
>   override def getResult(accumulator: (Boolean, Row)) = accumulator
>
>   // This is a required function that we don't use.
>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>     throw new NotImplementedException()
> }
>
> I'm hoping that if the last record in the window is an insert it picks
> that, and if it's a retract then it picks that, and then when we send this to
> the ES sink we will simply check true or false in the first element of the
> tuple to issue an insert or delete request to ES. Does this seem like it will
> work?
>
> Thanks!
>
>
> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley  wrote:
>
>> This is great info, thanks!
>>
>> My question then becomes, what constitutes a random shuffle? Currently
>> we're using the Table API with minibatch on flink v1.11.3. Do our joins
>> output a keyed stream of records by join key or is this random? I imagine
>> that they'd have to have a key for retracts and accumulates to arrive in
>> order on the next downstream operator. Same with aggs but on the groupBy
>> key.
>>
>> Does this sound correct to you?
>>
>> Thanks!
>>
>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise  wrote:
>>
>>> Hi Rex,
>>>
>>> indeed these two statements look like they contradict each other, but
>>> they are really two sides of the same coin.
>>> Flink simply puts records into windows in FIFO order. That is, there is no
>>> ordering on event time if there are late events. So if your elements arrive
>>> ordered, the ordering is retained. If your elements arrive unordered, that
>>> same unordered order is retained.
>>>
>>> However, note that Flink can only guarantee FIFO according to your
>>> topology. Consider a source with parallelism 2, each reading data from an
>>> ordered kafka partition (k1, k2) respectively. Each partition has records
>>> with keys, such that no key appears in both partitions (default behavior if
>>> you set keys but no partition while writing to Kafka).
>>> 1) Let's assume you do a simple transformation and write them back into
>>> kafka with the same key. Then you can be sure that the order of the records
>>> is retained.
>>>
>>> 2) Now you add a random shuffle and have the transformation. Now two
>>> successive records may be processed in parallel and there is a race
>>> condition who is written first into Kafka. So order is not retained.
>>>
>>> 3) You shuffle both partitions by the Kafka key (keyby) and do some
>>> aggregation. Two successive records with the same key will always be
>>> processed by the same aggregation operator. So the order is retained for
>>> each key (note that this is what you usually want and what Kafka gives you
>>> if you don't set the partition explicitly and just provide a key)
>>>
>>> 4) You shuffle both partitions by a different key. Then two successive
>>> Kafka records could be again calculated in parallel such that there is a
>>> race condition.
>>>
>>> Note that windows are a kind of aggregation.
>>>
>>> So Flink is never going to restore an ordering that is not there
>>> (because it's too costly and there are too many unknowns). But you can
>>> infer the guarantees by analyzing your topology.
>>>
>>> ---
>>>
>>> Please note that there is 

Re: memory tuning

2021-01-27 Thread Matthias Pohl
Thanks for sharing the logs. The configuration looks fine. Have you
analyzed the memory usage?

On Tue, Jan 26, 2021 at 5:02 PM Marco Villalobos 
wrote:

> Yes, I will do that.
>
> PRODUCTION
>
> 2021-01-26 04:03:50,804 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -
> 
> 2021-01-26 04:03:50,807 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -  Starting YARN TaskExecutor runner (Version: 1.11.0,
> Scala: 2.12, Rev:d04872d, Date:2020-06-29T16:13:14+02:00)
> 2021-01-26 04:03:50,807 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -  OS current user: yarn
> 2021-01-26 04:03:50,937 WARN  org.apache.hadoop.util.NativeCodeLoader
>  [] - Unable to load native-hadoop library for your
> platform... using builtin-java classes where applicable
> 2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -  Current Hadoop/Kerberos user: hadoop
> 2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -  JVM: OpenJDK 64-Bit Server VM - Amazon.com Inc. -
> 1.8/25.252-b09
> 2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -  Maximum heap size: 3289 MiBytes
> 2021-01-26 04:03:50,988 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -  JAVA_HOME: /etc/alternatives/jre
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -  Hadoop version: 3.2.1
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -  JVM Options:
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -Xmx3597035049
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -Xms3597035049
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -XX:MaxDirectMemorySize=880468305
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -XX:MaxMetaspaceSize=268435456
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1611280261341_0015/container_1611280261341_0015_01_04/taskmanager.log
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -Dlog4j.configuration=file:./log4j.properties
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -Dlog4j.configurationFile=file:./log4j.properties
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -  Program Arguments:
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -D
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] -
> taskmanager.memory.framework.off-heap.size=134217728b
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - taskmanager.memory.network.max=746250577b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - taskmanager.memory.network.min=746250577b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - taskmanager.memory.framework.heap.size=134217728b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - taskmanager.memory.managed.size=2985002310b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - taskmanager.cpu.cores=1.0
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - taskmanager.memory.task.heap.size=3462817321b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> [] - taskmanager.memory.task.off-heap.size=0b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
> 

A Flink program with both DataStream and SQL generates no watermarks after the job is submitted to YARN

2021-01-27 Thread 花乞丐
Right now, when I run the Flink program locally, it consumes the data from Kafka and writes it to
Hive successfully. But after I submit the job to YARN, the Flink Web UI shows every sink as "no
watermark". Checking the files on HDFS, the data is written successfully, but no partitions are
committed to the metastore and no success file is committed, i.e. the watermark is not taking
effect. It works locally, so why does it not work on YARN?

The code is shown below. This is my first time using Flink - am I doing something wrong?
package com.x.flink.app.incr;

import com.alibaba.otter.canal.protocol.FlatMessage;
import com.x.flink.contranst.TopicPattern;
import com.x.flink.executions.TradeOrderExecutions;
import com.x.flink.function.RowsFlatMapFunction;
import com.x.flink.schema.FlatMessageSchema;
import com.x.flink.utils.ConfigUtils;
import com.x.flink.utils.TableResolveUtils;
import com.x.flink.watermark.RowWatermarkAssigner;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

import java.time.Duration;

/**

flink run \
-m yarn-cluster \
-ys 2 \
-yjm 2g \
-ytm 4g \
-c com.x.flink.app.incr.TradeOrderBinlogResolveApp \
-d \
/opt/tools/flink-1.12.0/x-realtime-etl-1.0-SNAPSHOT.jar

 */
public class TradeOrderBinlogResolveApp {
    public static void main(String[] args) {
        // get the execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(8);
        // enable checkpointing
        env.enableCheckpointing(6);
        // set the interval at which watermarks are generated
        env.getConfig().setAutoWatermarkInterval(200);
        // set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
                tableEnvSettings);
        // set the checkpointing mode
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        // set the checkpoint interval
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));
        // specify the catalog name
        String catalogName = "devHive";
        // create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
                "default",
                ConfigUtils.HIVE_CONF_DIR,
                ConfigUtils.HADOOP_CONF_DIR,
                ConfigUtils.HIVE_VERSION
        );
        // register the Hive catalog
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        // use the Hive catalog
        tableEnv.useCatalog(catalogName);

        // get the schema information for the tables
        RowTypeInfo tradeOrderTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_order_incr", tableEnv);
        RowTypeInfo tradeOrderItemTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_order_item_incr", tableEnv);
        RowTypeInfo tradeRealDeliveryTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_real_delivery_incr", tableEnv);
        RowTypeInfo tradeSteelItemTypes =
                TableResolveUtils.getRowTypeinformations("ods.trade_steel_item_incr", tableEnv);

        // build the Kafka consumer that consumes the non-finance business topics
        FlinkKafkaConsumerBase<FlatMessage> messages = new
                FlinkKafkaConsumer<>(TopicPattern.TRADE_PATTERN,
                        new FlatMessageSchema(),
                        ConfigUtils.getKafkaConfig())
                .setStartFromEarliest();
        // assign a timestamp and watermark to every record
        FlinkKafkaConsumerBase<FlatMessage> messagesWaters =
                messages.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<FlatMessage>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<FlatMessage>() {
                                            @Override
                                            public long extractTimestamp(FlatMessage
                                                    element, long recordTimestamp) {
                                                return element.getEs();
                                            }
                                        }
                                )
                );


        // add the data source
        DataStreamSource<FlatMessage> messageSources =

Re: Watermark cannot advance when all Kafka partitions have no data

2021-01-27 Thread 林影
In our actual production environment, the business requires event time.

wpp <1215303...@qq.com> wrote on Thu, Jan 28, 2021 at 2:54 PM:

> Could we handle it based on processing time instead?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Difference between table.exec.source.idle-timeout and setIdleStateRetentionTime ?

2021-01-27 Thread Dcosta, Agnelo (HBO)
Hi Dawid,
Thanks for the tip on the time constraint. We are using WITHIN in our 
MATCH_RECOGNIZE clause. It is set to 3 minutes.
Increase in checkpoint size problem still persists.

Thanks for adding comments to FLINK-15160. I will take a look at changes you 
suggested.

P.S. :
I initially meant to ask what is the difference between
table.exec.state.ttl 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html#table-exec-state-ttl

And
setIdleStateRetentionTime: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time
And if table.exec.state.ttl makes any difference to match_recognize state ?

From: Dawid Wysakowicz 
Date: Wednesday, January 27, 2021 at 12:41 AM
To: Dcosta, Agnelo (HBO) , user@flink.apache.org 

Subject: Re: Difference between table.exec.source.idle-timeout and 
setIdleStateRetentionTime ?
**External Email received from: dwysakow...@apache.org **


Hey,

As for the MATCH_RECOGNIZE clause, I highly recommend applying a time 
constraint[1]. The idle state retention time does not apply to the 
MATCH_RECOGNIZE, but you can think of the time constraint as something similar, 
but it is closer to the actual query logic.

If you are hitting FLINK-15160 unfortunately I don't have a good solution for 
it. The only thing that comes to my mind is adding a heartbeat event to the 
event stream to prune the partial matches, but I understand it is quite 
invasive.

If you would be willing to help fixing the problem in FLINK, I could also help 
review it and give pointers how it could be done.

Best,

Dawid

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/match_recognize.html#time-constraint
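For illustration, a minimal sketch of such a WITHIN time constraint; the table name events, its columns, and the pattern itself are hypothetical, not taken from this thread:

public class MatchTimeConstraintSketch {
    // Hypothetical pattern query: the WITHIN clause bounds how long partial
    // matches are kept, similar to the 3-minute constraint discussed above.
    static final String QUERY =
            "SELECT * FROM events "
          + "MATCH_RECOGNIZE ( "
          + "  PARTITION BY user_id "
          + "  ORDER BY event_time "
          + "  MEASURES B.event_time AS end_time "
          + "  ONE ROW PER MATCH "
          + "  AFTER MATCH SKIP PAST LAST ROW "
          + "  PATTERN (A B) WITHIN INTERVAL '3' MINUTE "
          + "  DEFINE "
          + "    A AS A.status = 'start', "
          + "    B AS B.status = 'end' "
          + ")";

    public static void main(String[] args) {
        // Would be run against a TableEnvironment that has an 'events' table
        // registered, e.g. tableEnv.sqlQuery(QUERY).
        System.out.println(QUERY);
    }
}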
On 26/01/2021 17:39, Dcosta, Agnelo (HBO) wrote:
Hi Dawid, thanks for the clarification and it helps a lot.
Reply to couple of points :

what is causing the state to grow?
We are using flink SQL and have 5 pattern match queries , 3 group by tumble 
windows. State growth over time is primarily coming from pattern match queries.

Is it ever growing keyspace?
Yes. By design our keyspace is ever growing. The expectation is that messages 
for one key will come in for couple of hours, then stop coming in. We would 
never see messages from that key again. New keys are constantly coming in.

Is it that a watermark does not progress?
Watermark on the subtask level is constantly updating and is in sync with other 
subtasks. We have not seen any issues with watermark updating as such.

Looking through mailing list archive, our problem seems similar to
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-in-CEP-queries-keep-increasing-td31045.html
https://issues.apache.org/jira/browse/FLINK-15160 : Clean up is not applied if 
there are no incoming events for a key.

By design we can have partial matched states/matches in pattern match queries. 
And key space is such that no new event comes in for those partial matches.

thanks.

From: Dawid Wysakowicz 
Date: Tuesday, January 26, 2021 at 3:14 AM
To: Dcosta, Agnelo (HBO) , 
user@flink.apache.org 

Subject: Re: Difference between table.exec.source.idle-timeout and 
setIdleStateRetentionTime ?
**External Email received from: 
dwysakow...@apache.org **


Hi,

The difference is that the table.exec.source.idle-timeout is used for dealing 
with source idleness[1]. It is a problem that a watermark cannot advance if 
some of the partitions become idle (do not produce any records). The watermark is 
always the minimum of watermarks of all input partitions. The setting makes 
flink ignore certain partitions in the calculation after the time threshold is 
reached.

The IdleStateRetention is Table API specific. As described in the link you 
provided it removes entries from a state for keys that were not seen for a 
given time threshold.
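A small sketch showing where each of the two settings is configured (the values are placeholders, not recommendations):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IdleSettingsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source idleness: partitions that produced no records for 30 s are
        // temporarily ignored when the watermark is computed.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "30 s");

        // Idle state retention: state for keys not seen for 1 h (up to 2 h)
        // is dropped.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(1), Time.hours(2));
    }
}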

As for your issue, I'd recommend first investigating what is causing the state 
to grow. Is it ever growing keyspace? Is it that a watermark does not progress 
(this should manifest in results as well). Or is it something else.

Best,

Dawid



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
On 25/01/2021 20:12, Dcosta, Agnelo (HBO) wrote:

Hi,

What is the difference between table.exec.source.idle-timeout and 
setIdleStateRetentionTime ?

table.exec.source.idle-timeout: 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html#table-exec-source-idle-timeout



setIdleStateRetentionTime: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time



Some context:
Hi we are using flink 1.12.
Our checkpoint size is constantly increasing once app is deployed.
After performing a restart, checkpoint size 

Re: over window losing data

2021-01-27 Thread Appleyuchi
This issue has been resolved; please ignore it. Thanks.














On 2021-01-28 11:42:08, "Appleyuchi" wrote:
>Hi all,
>
>
>I am studying the following over window example:
>https://help.aliyun.com/document_detail/62514.html
>
>
>My complete Flink SQL client session is here:
>https://yuchi.blog.csdn.net/article/details/113128072
>
>
>Question:
>I fed in 8 rows of data,
>but only 7 rows came back.
>Why is data being lost?
>
>
>Any help is appreciated, thanks!


Reply: Watermark cannot advance when all Kafka partitions have no data

2021-01-27 Thread 刘小红
You can call WatermarkStrategy.withIdleness(Duration idleTimeout) to specify an idle timeout,
so that idle sources do not stall the watermark and, in turn, the downstream operators.
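A minimal Java sketch of that suggestion; the Long element type (whose value doubles as the event timestamp) and the durations are purely illustrative:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdlenessSketch {
    public static void main(String[] args) {
        // Mark a source/partition as idle after 1 minute without records so
        // it no longer holds back the watermark.
        WatermarkStrategy<Long> strategy =
                WatermarkStrategy.<Long>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, recordTs) -> element)
                        .withIdleness(Duration.ofMinutes(1));
        // 'strategy' would then be passed to assignTimestampsAndWatermarks(...)
        // on the Kafka source.
    }
}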


刘小红
18500348...@163.com
On Jan 28, 2021 at 14:42, wpp <1215303...@qq.com> wrote:
Could we handle it based on processing time instead?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Watermark cannot advance when all Kafka partitions have no data

2021-01-27 Thread wpp
Could we handle it based on processing time instead?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: About end-to-end latency monitoring

2021-01-27 Thread wpp
That latency figure is really only meant as a rough reference.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: JobManager seems to be leaking temporary jar files

2021-01-27 Thread Maciek Próchniak

Hi Chesnay,

thanks for the reply. I wonder whether FLINK-21164 will help without FLINK-9844 - 
if the jar file is not closed, can it even be deleted successfully?


As for FLINK-9844 - I understand that having code like

if (userClassLoader instanceof Closeable) { ((Closeable) 
userClassLoader).close() }


is too "dirty trick" to be considered?


thanks,

maciek


On 27.01.2021 13:00, Chesnay Schepler wrote:
The problem of submitted jar files not being closed is a known one: 
https://issues.apache.org/jira/browse/FLINK-9844

IIRC it's not exactly trivial to fix since class-loading is involved.
It's not strictly related to the REST API; it also occurs in the CLI 
but is less noticeable since jars are usually not deleted.


As for the issue with deleteExtractedLibraries, Maciek is generally on 
a good track.
The explicit delete call is indeed missing. The best place to put is 
probably JarRunHandler#handleRequest, within handle after the job was run.

A similar issue also exists in the JarPlanHandler.

I've opened https://issues.apache.org/jira/browse/FLINK-21164 to fix 
this issue.


On 1/26/2021 12:21 PM, Maciek Próchniak wrote:


Hi Matthias,

I think the problem lies somewhere in JarRunHandler, as this is the 
place where the files are created.


I think these are not the files that are managed via BlobService, as 
they are not stored in BlobService folders (I made experiment 
changing default BlobServer folders).


It seems to me that CliFrontend deletes those files explicitly:

https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L250

whereas I couldn't find such invocation in JarRunHandler (not 
deleting those files does not fully explain leak on heap though...)



thanks,

maciek

On 26.01.2021 11:16, Matthias Pohl wrote:

Hi Maciek,
my understanding is that the jars in the JobManager should be 
cleaned up after the job is terminated (I assume that your jobs 
successfully finished). The jars are managed by the BlobService. The 
dispatcher will trigger the jobCleanup in [1] after job termination. 
Are there any suspicious log messages that might indicate an issue?

I'm adding Chesnay to this thread as he might have more insights here.

[1] 
https://github.com/apache/flink/blob/2c4e0ab921ccfaf003073ee50faeae4d4e4f4c93/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L797 



On Mon, Jan 25, 2021 at 8:37 PM Maciek Próchniak > wrote:


Hello,

in our setup we have:

- Flink 1.11.2

- job submission via REST API (first we upload jar, then we submit
multiple jobs with it)

- additional jars embedded in lib directory of main jar (this is
crucial
part)

When we submit jobs this way, Flink creates new temp jar files via
PackagedProgram.extractContainedLibraries method.

We observe that they are not removed after job finishes - it
seems that
PackagedProgram.deleteExtractedLibraries is not invoked when
using REST
API.

What's more, it seems that those jars remain open in JobManager
process.
We observe that when we delete them manually via scripts, the
disk space
is not reclaimed until process is restarted, we also see via
heap dump
inspection that java.util.zip.ZipFile$Source  objects remain,
pointing
to those files. This is quite a problem for us, as we submit
quite a few
jobs, and after a while we ran out of either heap or disk space on
JobManager process/host. Unfortunately, I cannot so far find
where this
leak would happen...

Does anybody have some pointers where we can search? Or how to
fix this
behaviour?


thanks,

maciek





About end-to-end latency monitoring

2021-01-27 Thread 13051111332


Hi Everyone:
  
As far as I understand, the LatencyMarker mechanism that Flink provides is not recommended for production use, does not pass through the operators' internal logic, and can only give a rough latency estimate. So for end-to-end latency monitoring, does anyone have a better approach?





Re: When deploying an application in application mode, must classloader.resolve-order be changed to parent-first?

2021-01-27 Thread lp
Thanks for the answer. Checking my pom.xml and the packaged archive, they did indeed contain the
kafka (org.apache.kafka.common) dependencies, so I set the related dependencies in the pom to
provided, repackaged, and confirmed that my jar no longer contains any kafka dependencies. After
deploying and running again, this time the jobmanager fails directly with: Caused
by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
---
I suspect something is wrong with my cluster deployment. This is what I did: I set up a 3-node
hadoop (and yarn) cluster [master (NameNode, SecondaryNameNode, ResourceManager), slave01
(DataNode, NodeManager), slave02 (DataNode, NodeManager)]. On the master node I unpacked
flink-1.12.1.tar.gz and put the hadoop dependency jar flink-shaded-hadoop-2-uber-2.8.3-8.0.jar in
its lib directory, then uploaded my jar to /opt on that node and submitted it to the yarn cluster
in application mode from the flink directory with: bin/flink run-application -t yarn-application
/opt/quickstart-0.1.jar. The jobmanager container was allocated on slave02, and its jobmanager.log
reports the error above.
--
I came to Flink from Spark. Spark on YARN does not need to be deployed on every node; isn't Flink
on YARN the same? If not, could you explain how it should be deployed?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
In addition to those questions, assuming that keyed streams are in order,
I've come up with the following solution to compact our records and only
pick the most recent one per id before sending to the ES sink.

The first item in the Row is the document ID / primary key which we want to
compact records on.

val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
userDocsStream
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .aggregate(new CompactionAggregate())

class CompactionAggregate
    extends AggregateFunction[
      (Boolean, Row),
      (Boolean, Row),
      (Boolean, Row)
    ] {

  override def createAccumulator() = (false, null)

  // Just take the latest value to compact.
  override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
    value

  override def getResult(accumulator: (Boolean, Row)) = accumulator

  // This is a required function that we don't use.
  override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
    throw new NotImplementedException()
}

I'm hoping that if the last record in the window is an insert it picks that,
and if it's a retract then it picks that, and then when we send this to the ES
sink we will simply check true or false in the first element of the tuple to
issue an insert or delete request to ES. Does this seem like it will work?

Thanks!


On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley  wrote:

> This is great info, thanks!
>
> My question then becomes, what constitutes a random shuffle? Currently
> we're using the Table API with minibatch on flink v1.11.3. Do our joins
> output a keyed stream of records by join key or is this random? I imagine
> that they'd have to have a key for retracts and accumulates to arrive in
> order on the next downstream operator. Same with aggs but on the groupBy
> key.
>
> Does this sound correct to you?
>
> Thanks!
>
> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise  wrote:
>
>> Hi Rex,
>>
>> indeed these two statements look like they contradict each other, but
>> they are really two sides of the same coin.
>> Flink simply puts records into windows in FIFO order. That is, there is no
>> ordering on event time if there are late events. So if your elements arrive
>> ordered, the ordering is retained. If your elements arrive unordered, that
>> same unordered order is retained.
>>
>> However, note that Flink can only guarantee FIFO according to your
>> topology. Consider a source with parallelism 2, each reading data from an
>> ordered kafka partition (k1, k2) respectively. Each partition has records
>> with keys, such that no key appears in both partitions (default behavior if
>> you set keys but no partition while writing to Kafka).
>> 1) Let's assume you do a simple transformation and write them back into
>> kafka with the same key. Then you can be sure that the order of the records
>> is retained.
>>
>> 2) Now you add a random shuffle and have the transformation. Now two
>> successive records may be processed in parallel and there is a race
>> condition who is written first into Kafka. So order is not retained.
>>
>> 3) You shuffle both partitions by the Kafka key (keyby) and do some
>> aggregation. Two successive records with the same key will always be
>> processed by the same aggregation operator. So the order is retained for
>> each key (note that this is what you usually want and what Kafka gives you
>> if you don't set the partition explicitly and just provide a key)
>>
>> 4) You shuffle both partitions by a different key. Then two successive
>> Kafka records could be again calculated in parallel such that there is a
>> race condition.
>>
>> Note that windows are a kind of aggregation.
>>
>> So Flink is never going to restore an ordering that is not there (because
>> it's too costly and there are too many unknowns). But you can infer the
>> guarantees by analyzing your topology.
>>
>> ---
>>
>> Please note that there is a common pitfall when you work with Kafka:
>> - Ordering of records in Kafka is only guaranteed if you set
>> *max.in.flight.requests.per.connection* to 1. [1]
>> - Often you also want to set *enable.idempotence* and *acks=all*
>>
>> That is true for the upstream application and if you plan back to write
>> to Kafka you also need to set that in Flink.
>>
>> [1]
>> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
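For reference, a sketch of those producer settings as plain Kafka producer properties; the values are illustrative, and they would go into the properties handed to the upstream producer (and to the Flink Kafka producer if writing back to Kafka):

import java.util.Properties;

public class OrderedProducerPropsSketch {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        // Keep per-partition ordering even when retries happen.
        producerProps.setProperty("max.in.flight.requests.per.connection", "1");
        // Avoid duplicates on retry and require acks from all replicas.
        producerProps.setProperty("enable.idempotence", "true");
        producerProps.setProperty("acks", "all");
        // These properties would then be passed to the Kafka producer /
        // FlinkKafkaProducer constructor.
    }
}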
>>
>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> I began reading
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>
>>>-
>>>
>>>*> Redistributing* streams (between *map()* and *keyBy/window*, as
>>>well as between *keyBy/window* and *sink*) change the partitioning
>>>of streams. Each *operator subtask* sends data to different target
>>>subtasks, depending on the selected transformation. Examples are
>>>*keyBy()* (re-partitions by hash code), *broadcast()*, or
>>>*rebalance()* (random 

Is Flink able to parse strings into dynamic JSON?

2021-01-27 Thread Devin Bost
I'd like to know whether it's possible in Flink to parse strings into a
dynamic JSON object that doesn't require me to know the primitive type
details at compile time.
We have over 300 event types to process, and I need a way to load the types
at runtime. I only need to know if certain fields exist on the incoming
objects, and the object schemas are all different except for certain
fields.
Every example I can find shows Flink users specifying the full type
information at compile time, but there's no way this will scale.

It's possible for us to lookup primitive type details at runtime from JSON,
but I'll still need a way to process that JSON in Flink to extract the
metadata if it's required. So, that brings me back to the original issue.

How can I do this in Flink?
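One possible direction, sketched under the assumption that Jackson is on the classpath: keep the payload as a String and parse it into a generic JsonNode, probing only the fields of interest at runtime; the field name event_type below is hypothetical:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class DynamicJsonMap extends RichMapFunction<String, String> {
    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper();
    }

    @Override
    public String map(String raw) throws Exception {
        // Parse into an untyped tree; no compile-time schema needed.
        JsonNode node = mapper.readTree(raw);
        // Probe only the fields of interest ("event_type" is hypothetical).
        return node.has("event_type") ? node.get("event_type").asText() : "unknown";
    }
}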

--
Devin G. Bost


Watermark cannot advance when all Kafka partitions have no data

2021-01-27 Thread 林影
Hi everyone, I have a question about watermarks:
the watermark advances based on the events' event time, so if the upstream produces no data the watermark does not advance. How should this situation be handled?


Flink reading a Hive table: how to set the TTL

2021-01-27 Thread macia kk
The documentation sets lookup.join.cache.ttl at CREATE TABLE time.

But now I want to join streaming Kafka data against a Hive table that already exists - how do I set the TTL in that case?

CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...) TBLPROPERTIES (
  'streaming-source.enable' = 'false',   -- option with
default value, can be ignored.
  'streaming-source.partition.include' = 'all',  -- option with
default value, can be ignored.
  'lookup.join.cache.ttl' = '12 h');


A question about DELETE messages produced by LEFT JOIN

2021-01-27 Thread DONG, Weike
Hi everyone,

Recently, while working with a LEFT JOIN statement, I noticed a strange behavior. Assume the following SQL statements:

CREATE TABLE A (
key INT
) WITH (
'connector' = 'kafka',
);

CREATE TABLE B (
  key INT
) WITH (
'connector' = 'kafka',
);

CREATE TABLE Sink (
id INTEGER,
upsert_value BIGINT,
primary key (`id`) NOT ENFORCED
) WITH (
'connector.type' = 'elasticsearch',
'update-mode' = 'upsert',
-- optional: 'append' mode without a primary key, or 'upsert' mode with a primary key
);

INSERT INTO Sink
SELECT A.key, SUM(B.key)
FROM A LEFT JOIN B ON A.key = B.key
GROUP BY A.key;


The left table of the LEFT JOIN is called A and the right table is called B. Then:

*Scenario 1.* If the left table A receives a record with key=100 that does not initially join
against the right table B (B has no key=100 data yet), an upsert message (true, 100, null) is
emitted to the downstream ES sink. If B later receives key=100 data, Flink emits a DELETE message
(false, 100, null) and then another UPSERT message (e.g. true, 100, 100) to update the downstream
result. After that, no DELETE message is ever emitted again.

*Scenario 2.* If the left table A receives a record with key=100 that joins against B right away
(B already has key=100 data), no DELETE message is emitted; an upsert message (true, 100, 100) is
emitted directly, and no DELETE message is ever emitted afterwards.


*Question:*

Is the DELETE message emitted by the LEFT JOIN in scenario 1 really necessary? My understanding is
that for scenario 1 an upsert message alone would also work; the DELETE does not seem very useful.
Moreover, the DELETE message clears some fields of the corresponding doc id (if that doc also holds
fields not defined in the Flink table), causing unexpected field loss.

I read the code of GroupAggFunction and saw the logic below. Could someone explain what situation
this design is meant to avoid? Many thanks :)

[image: image.png]


Re: Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-27 Thread Rex Fenley
Thanks for the clarification.

On Wed, Jan 27, 2021 at 7:24 PM Jark Wu  wrote:

> Hi Rex,
>
> Currently, it is not state compatible, because we will add a new node
> called MiniBatchAssigner after the source, which changes the JobGraph and
> thus the operator uids are different.
>
> Best,
> Jark
>
> On Tue, 26 Jan 2021 at 18:33, Dawid Wysakowicz 
> wrote:
>
>> I am pulling in Jark and Godfrey who are more familiar with the internals
>> of the planner.
>> On 21/01/2021 01:43, Rex Fenley wrote:
>>
>> Just tested this and I couldn't restore from a savepoint. If I do a new
>> job from scratch, can I tune the minibatch parameters and restore from a
>> savepoint without having to make yet another brand new job?
>>
>> Thanks
>>
>>
>> On Wed, Jan 20, 2021 at 12:43 PM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> Is it safe to convert a non-mini-batch job to a mini-batch job when
>>> restoring from a checkpoint or a savepoint?
>>>
>>> Thanks
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com  |  BLOG 
>>>  |  FOLLOW US   |  LIKE US
>>> 
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



over window losing data

2021-01-27 Thread Appleyuchi
Hi all,


I am studying the following over window example:
https://help.aliyun.com/document_detail/62514.html


My complete Flink SQL client session is here:
https://yuchi.blog.csdn.net/article/details/113128072


Question:
I fed in 8 rows of data,
but only 7 rows came back.
Why is data being lost?


Any help is appreciated, thanks!

Re: A few questions about minibatch

2021-01-27 Thread Rex Fenley
Thanks, that all makes sense!

On Wed, Jan 27, 2021 at 7:00 PM Jark Wu  wrote:

> Hi Rex,
>
> Could you share your query here? It would be helpful to identify the root
> cause if we have the query.
>
> 1) watermark
> The framework automatically adds a node (the MiniBatchAssigner) to
> generate watermark events as the mini-batch id to broadcast and trigger
> mini-batch in the pipeline.
>
> 2) MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]
> It generates a new mini-batch id in an interval of 1000ms in system time.
> The mini-batch id is represented by the watermark event.
>
> 3) TWO_PHASE optimization
> If users want to have TWO_PHASE optimization, it requires the aggregate
> functions all support the merge() method and the mini-batch is enabled.
>
> Best,
> Jark
>
>
>
>
> On Tue, 26 Jan 2021 at 19:01, Dawid Wysakowicz 
> wrote:
>
>> I am pulling Jark and Godfrey who are more familiar with the planner
>> internals.
>>
>> Best,
>>
>> Dawid
>> On 22/01/2021 20:11, Rex Fenley wrote:
>>
>> Hello,
>>
>> Does anyone have any more information here?
>>
>> Thanks!
>>
>> On Wed, Jan 20, 2021 at 9:13 PM Rex Fenley  wrote:
>>
>>> Hi,
>>>
>>> Our job was experiencing high write amplification on aggregates so we
>>> decided to give mini-batch a go. There's a few things I've noticed that are
>>> different from our previous job and I would like some clarification.
>>>
>>> 1) Our operators now say they have Watermarks. We never explicitly added
>>> watermarks, and our state is essentially unbounded across all time since it
>>> consumes from Debezium and reshapes our database data into another store.
>>> Why does it say we have Watermarks then?
>>>
>>> 2) In our sources I see MiniBatchAssigner(interval=[1000ms],
>>> mode=[ProcTime], what does that do?
>>>
>>> 3) I don't really see anything else different yet in the shape of our
>>> plan even though we've turned on
>>> configuration.setString(
>>> "table.optimizer.agg-phase-strategy",
>>> "TWO_PHASE"
>>> )
>>> is there a way to check that this optimization is on? We use user
>>> defined aggregate functions, does it work for UDAF?
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com  |  BLOG 
>>>  |  FOLLOW US   |  LIKE US
>>> 
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: SQL code generation exceeding 64KB causes compilation failure

2021-01-27 Thread stgztsw
This limit is not only on the janino side: we already tried removing janino's 64KB limit on our side, but the same limit then appears on the JDK side. Spark currently avoids similar problems with spark.conf.set("spark.sql.codegen.wholeStage", false), so I think the best approach would be a configuration option that restricts some of the SQL logical-plan optimizations so that the logic of different views is not merged together; the SQL could then be split so that the logic is spread across different generated functions, avoiding a single class method that is too large.

Caused by: java.lang.ClassFormatError: Invalid method Code length 123732 in
class file ExpressionReducer$3674
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
 at
org.codehaus.janino.ByteArrayClassLoader.findClass(ByteArrayClassLoader.java:77)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86)
 at
org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
 at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
 at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
 at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
 at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
 at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
 ... 38 more



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-27 Thread Jark Wu
Hi Rex,

Currently, it is not state compatible, because we will add a new node
called MiniBatchAssigner after the source, which changes the JobGraph and
thus the operator uids are different.

Best,
Jark

On Tue, 26 Jan 2021 at 18:33, Dawid Wysakowicz 
wrote:

> I am pulling in Jark and Godfrey who are more familiar with the internals
> of the planner.
> On 21/01/2021 01:43, Rex Fenley wrote:
>
> Just tested this and I couldn't restore from a savepoint. If I do a new
> job from scratch, can I tune the minibatch parameters and restore from a
> savepoint without having to make yet another brand new job?
>
> Thanks
>
>
> On Wed, Jan 20, 2021 at 12:43 PM Rex Fenley  wrote:
>
>> Hello,
>>
>> Is it safe to convert a non-mini-batch job to a mini-batch job when
>> restoring from a checkpoint or a savepoint?
>>
>> Thanks
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>
>


Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-01-27 Thread Jark Wu
Hi Sebastián,

I think Dawid is right.

Could you share the pom file? I also tried to
package flink-connector-postgres-cdc with ServicesResourceTransformer, and
the Factory file contains

com.alibaba.ververica.cdc.connectors.postgres.table.PostgreSQLTableFactory


Best,
Jark


On Tue, 26 Jan 2021 at 21:17, Sebastián Magrí  wrote:

> Thanks a lot for looking into it Dawid,
>
> In the
> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> file I only see
>
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
>
> Even after applying the ServicesResourceTransformer.
>
>
> On Tue, 26 Jan 2021 at 11:58, Dawid Wysakowicz 
> wrote:
>
>> Hi,
>>
>> Unfortunately I am not familiar with the packaging of
>> flink-connector-postgres-cdc. Maybe @Jark could help here?
>>
>> However, I think the problem that you cannot find the connector is caused
>> because of lack of entry in the resulting Manifest file. If there are
>> overlapping classes maven does not exclude whole dependencies, but rather
>> picks the overlapping class from one of the two. Could you check if you see
>> entries for all tables in
>> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.
>>
>> If not, you could try applying the ServicesResourceTransformer[1]
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
>> On 26/01/2021 12:29, Sebastián Magrí wrote:
>>
>> Hi!
>>
>> I've reported an issue with the postgresql-cdc connector apparently
>> caused by the maven shade plugin excluding either the JDBC connector or the
>> cdc connector due to overlapping classes. The issue for reference is here:
>>
>> https://github.com/ververica/flink-cdc-connectors/issues/90
>>
>> In the meantime, however, I've been trying to figure out if I can set up
>> an exclusion rule to fix this in my pom.xml file, without success.
>>
>> The `org.postgresql:postgresql` dependency is being added manually by me
>> to have a sink on a postgresql table and injected by the cdc connector
>> seemingly via its debezium connector dependency.
>>
>> Any guidance or hints I could follow would be really appreciated.
>>
>> --
>> Sebastián Ramírez Magrí
>>
>>
>
> --
> Sebastián Ramírez Magrí
>


Persisting catalog DDL (Kafka connector) to the Hive metastore: the problem of identical group ids

2021-01-27 Thread 孙啸龙
Hi:
Version: 1.12.0
The DDL statement is persisted to the Hive metastore.
The create statement is as follows:
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)
When multiple applications use this table, they all end up with the same group id,
'properties.group.id' = 'testGroup'. Won't this be a problem? How do you all handle it?


Re: Stateful Functions - accessing the state aside of normal processing

2021-01-27 Thread Tzu-Li (Gordon) Tai
Hi Stephan,

Great to hear about your experience with StateFun so far!

I think what you are looking for is a way to read StateFun checkpoints,
which are basically an immutable consistent point-in-time snapshot of all
the states across all your functions, and run some computation or simply to
explore the state values.
StateFun checkpoints are essentially adopted from Flink, so you can find
more detail about that here [1].

Currently, StateFun does provide a means for state "bootstrapping": running
a batch offline job to write and compose a StateFun checkpoint [2].
What is still missing is the "reading / analysis" side of things, to do
exactly what you described: running a separate batch offline job for
reading and processing an existing StateFun checkpoint.

Before we dive into details on how that may look like, do you think that is
what you would need?

Although I don't think we would be able to support such a feature yet since
we're currently focused on reworking the SDKs and request-reply protocol,
in any case it would be interesting to discuss if this feature would be
important for multiple users already.

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/stateful-stream-processing.html#checkpointing
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/deployment-and-operations/state-bootstrap.html

On Wed, Jan 27, 2021 at 11:41 PM Stephan Pelikan 
wrote:

> Hi,
>
>
>
> We are trying to use Statefuns for our tool and it seems to be a good fit.
> I already adopted it and it works quite well. However, we have millions of
> different states (all the same FunctionType but different ids) and each
> state consists of several @Persisted values (values and tables). We want to
> build an administration tool for examining the crowd of states (count,
> histogram, etc.) and each state in detail (the persisted-tables and
> -values).
>
>
>
> Additionally we need some kind of dig-down functionality for finding those
> individual states. For example some of those persisted values can be used
> to categorize the crowd of states.
>
>
>
> My question now is how to achieve this. Is there a way to browse and
> examine statefuns in a read-only fashion (their ids, their persisted
> values)? How can one achieve this without duplicating status in e.g. a
> relational database?
>
>
>
> Thanks,
>
> Stephan
>
>
>
> PS: I have another questions but I will send them in separate mails to
> avoid mixing up topics.
>


importing types doesn't fix “could not find implicit value for evidence parameter of type …TypeInformation”

2021-01-27 Thread Devin Bost
I posted this problem on Stack Overflow here:
https://stackoverflow.com/questions/65930023/flink-importing-types-doesnt-fix-could-not-find-implicit-value-for-evidence

Basically, I can't even get a basic map to work like this:

object AmplitudeExample {
  def main(args: Array[String]) {
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = 
env.readTextFile("/Users/dbost/src/amplitude-flink/example-data.json")

val partitionedEvents = text
  .map(t => t)

partitionedEvents.print()
  }
}

I get:

could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[String] .map(t =>
t)

I'm using Flink 1.12.1 and Scala 2.12. What am I missing here?

Devin G. Bost


Re: A few questions about minibatch

2021-01-27 Thread Jark Wu
Hi Rex,

Could you share your query here? It would be helpful to identify the root
cause if we have the query.

1) watermark
The framework automatically adds a node (the MiniBatchAssigner) to generate
watermark events as the mini-batch id to broadcast and trigger mini-batch
in the pipeline.

2) MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]
It generates a new mini-batch id in an interval of 1000ms in system time.
The mini-batch id is represented by the watermark event.

3) TWO_PHASE optimization
If users want to have TWO_PHASE optimization, it requires the aggregate
functions all support the merge() method and the mini-batch is enabled.
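For reference, a sketch of the settings being discussed (the values are illustrative, not recommendations):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        Configuration conf = tEnv.getConfig().getConfiguration();
        // Mini-batch must be enabled for TWO_PHASE aggregation to kick in.
        conf.setString("table.exec.mini-batch.enabled", "true");
        // Becomes the interval of the MiniBatchAssigner (e.g. 1000 ms).
        conf.setString("table.exec.mini-batch.allow-latency", "1 s");
        conf.setString("table.exec.mini-batch.size", "5000");
        // Only applied when the aggregate functions support merge().
        conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
    }
}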

Best,
Jark




On Tue, 26 Jan 2021 at 19:01, Dawid Wysakowicz 
wrote:

> I am pulling Jark and Godfrey who are more familiar with the planner
> internals.
>
> Best,
>
> Dawid
> On 22/01/2021 20:11, Rex Fenley wrote:
>
> Hello,
>
> Does anyone have any more information here?
>
> Thanks!
>
> On Wed, Jan 20, 2021 at 9:13 PM Rex Fenley  wrote:
>
>> Hi,
>>
>> Our job was experiencing high write amplification on aggregates so we
>> decided to give mini-batch a go. There's a few things I've noticed that are
>> different from our previous job and I would like some clarification.
>>
>> 1) Our operators now say they have Watermarks. We never explicitly added
>> watermarks, and our state is essentially unbounded across all time since it
>> consumes from Debezium and reshapes our database data into another store.
>> Why does it say we have Watermarks then?
>>
>> 2) In our sources I see MiniBatchAssigner(interval=[1000ms],
>> mode=[ProcTime], what does that do?
>>
>> 3) I don't really see anything else different yet in the shape of our
>> plan even though we've turned on
>> configuration.setString(
>> "table.optimizer.agg-phase-strategy",
>> "TWO_PHASE"
>> )
>> is there a way to check that this optimization is on? We use user defined
>> aggregate functions, does it work for UDAF?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>
>


Flink and Amazon EMR

2021-01-27 Thread Marco Villalobos
Just curious, has anybody had success with Amazon EMR with RocksDB and
checkpointing in S3?

That's the configuration I am trying to setup, but my system is running
more slowly than expected.


stopping with save points

2021-01-27 Thread Marco Villalobos
When I try to stop with a savepoint, I usually get the error below. I have
not been able to create a single savepoint. Please advise.

I am using Flink 1.11.0

Draining job "ed51084378323a7d9fb1c4c97c2657df" with a savepoint.


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job
"ed51084378323a7d9fb1c4c97c2657df".
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.TimeoutException
at
java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
at
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
... 6 more


My Flink SQL job crashed after running for a day and five hours with this error - what could be the cause?

2021-01-27 Thread nick
I am running with a single slot.





java.util.concurrent.TimeoutException: Invocation of public abstract
java.util.concurrent.CompletableFuture
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.requestTaskBackPressure(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,int,org.apache.flink.api.common.time.Time)
timed out.
at
org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.requestTaskBackPressure(RpcTaskManagerGateway.java:67)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.executiongraph.Execution.requestBackPressure(Execution.java:976)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinator.requestBackPressure(BackPressureRequestCoordinator.java:156)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureRequestCoordinator.triggerBackPressureRequest(BackPressureRequestCoordinator.java:141)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl.triggerBackPressureRequestInternal(BackPressureStatsTrackerImpl.java:154)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl.getOperatorBackPressureStats(BackPressureStatsTrackerImpl.java:121)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.scheduler.SchedulerBase.requestOperatorBackPressureStats(SchedulerBase.java:837)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.jobmaster.JobMaster.requestOperatorBackPressureStats(JobMaster.java:730)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source) ~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_271]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_271]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@bd-01:35498/user/rpc/taskmanager_0#-406132916]]
after [15000 ms]. Message of type
[org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
reason for `AskTimeoutException` is that the recipient actor didn't send a
reply.
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at 

Flink 1.11.2 test cases fail with Scala 2.12.12

2021-01-27 Thread Sourabh Mokhasi
Hi,

We have several flink applications written with Flink 1.9.1 and
Scala 2.11.12 and we are in the process of upgrading to Flink 1.11.2 and
Scala 2.12.12. We are using maven to manage our application dependencies.

After updating the pom.xml file to use the upgraded versions of Scala and
Flink as mentioned above, all the unit tests written with ScalaTest
3.0.5 (we are using the FlatSpec style) fail with the following exception.

 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9
  at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
  at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
  at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
  at
org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
  at
org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
  at
org.apache.flink.api.scala.typeutils.TraversableSerializer.<init>(TraversableSerializer.scala:41)

  Cause: java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9
  at
scala.tools.nsc.transform.LambdaLift$LambdaLifter.<init>(LambdaLift.scala:67)
  at
scala.tools.nsc.transform.LambdaLift.newTransformer(LambdaLift.scala:49)
  at scala.tools.nsc.transform.Transform$Phase.apply(Transform.scala:30)
  at scala.tools.nsc.Global$GlobalPhase.applyPhase(Global.scala:441)
  at scala.tools.nsc.Global$GlobalPhase.run(Global.scala:392)
  at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1467)
  at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1451)
  at
scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.wrapInPackageAndCompile(ToolBoxFactory.scala:201)
  at
scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:256)
  at
scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433)
  ...
  Cause: java.lang.ClassNotFoundException: scala.math.Ordering$$anon$9
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at
scala.tools.nsc.transform.LambdaLift$LambdaLifter.<init>(LambdaLift.scala:67)
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Stop job
leader service.
  at
scala.tools.nsc.transform.LambdaLift.newTransformer(LambdaLift.scala:49)
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager -
Shutting down TaskExecutorLocalStateStoresManager.
  at scala.tools.nsc.transform.Transform$Phase.apply(Transform.scala:30)
  at scala.tools.nsc.Global$GlobalPhase.applyPhase(Global.scala:441)
  at scala.tools.nsc.Global$GlobalPhase.run(Global.scala:392)
  at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1467)


The app itself compiles without issues, which can be verified by running
`mvn clean package -DskipTests`.

In order to troubleshoot/narrow down the issue, I upgraded the Flink
package from 1.9.1 to 1.11.2 while keeping the Scala version the same,
i.e. 2.11.12 instead of 2.12.12, and this seems to have resolved the issue.
The app compiles and the test cases pass as well.

Is this a known compatibility issue between Flink 1.11.2 and Scala 2.12.12?

Thanks,
Sourabh


Integration with Apache AirFlow

2021-01-27 Thread Flavio Pompermaier
Hello everybody,
is there any suggested way/pointer to schedule Flink jobs using Apache
AirFlow?
What I'd like to achieve is the submission (using the REST API of AirFlow)
of 2 jobs, where the second one can be executed only if the first one
succeed.

Thanks in advance
Flavio


Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
This is great info, thanks!

My question then becomes, what constitutes a random shuffle? Currently
we're using the Table API with minibatch on flink v1.11.3. Do our joins
output a keyed stream of records by join key or is this random? I imagine
that they'd have to have a key for retracts and accumulates to arrive in
order on the next downstream operator. Same with aggs but on the groupBy
key.

Does this sound correct to you?

Thanks!

On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise  wrote:

> Hi Rex,
>
> indeed these two statements look like they contradict each other, but they
> are looking at two sides of the same coin.
> Flink is simply putting records in FIFO in windows. That is, there is no
> ordering on event time if there are late events. So if your elements arrive
> ordered, the ordering is retained. If your elements arrive unordered, the
> same unordered order is retained.
>
> However, note that Flink can only guarantee FIFO according to your
> topology. Consider a source with parallelism 2, each reading data from an
> ordered kafka partition (k1, k2) respectively. Each partition has records
> with keys, such that no key appears in both partitions (default behavior if
> you set keys but no partition while writing to Kafka).
> 1) Let's assume you do a simple transformation and write them back into
> kafka with the same key. Then you can be sure that the order of the records
> is retained.
>
> 2) Now you add a random shuffle and have the transformation. Now two
> successive records may be processed in parallel and there is a race
> condition who is written first into Kafka. So order is not retained.
>
> 3) You shuffle both partitions by the Kafka key (keyby) and do some
> aggregation. Two successive records with the same key will always be
> processed by the same aggregation operator. So the order is retained for
> each key (note that this is what you usually want and want Kafka gives you
> if you don't set the partition explicitly and just provide a key)
>
> 4) You shuffle both partitions by a different key. Then two successive
> Kafka records could be again calculated in parallel such that there is a
> race condition.
>
> Note that windows are a kind of aggregation.
>
> So Flink is never going to restore an ordering that is not there (because
> it's too costly and there are too many unknowns). But you can infer the
> guarantees by analyzing your topology.
>
> ---
>
> Please note that there is a common pitfall when you work with Kafka:
> - Ordering of records in Kafka is only guaranteed if you set
> *max.in.flight.requests.per.connection* to 1. [1]
> - Often you also want to set *enable.idempotence* and *acks=all*
>
> That is true for the upstream application and if you plan back to write to
> Kafka you also need to set that in Flink.
>
> [1]
> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>
> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> I began reading
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>
>>-
>>
>>*> Redistributing* streams (between *map()* and *keyBy/window*, as
>>well as between *keyBy/window* and *sink*) change the partitioning of
>>streams. Each *operator subtask* sends data to different target
>>subtasks, depending on the selected transformation. Examples are
>>*keyBy()* (re-partitions by hash code), *broadcast()*, or
>>*rebalance()* (random redistribution). In a *redistributing*
>>exchange, order among elements is only preserved for each pair of sending-
>>and receiving task (for example subtask[1] of *map()* and subtask[2]
>>of *keyBy/window*).
>>
>> This makes it sound like ordering on the same partition/key is always
>> maintained. Which is exactly the ordering guarantee that I need. This seems
>> to slightly contradict the statement "Flink provides no guarantees about
>> the order of the elements within a window" for keyed state. So is it true
>> that ordering _is_ guaranteed for identical keys?
>>
>> If I'm not mistaken, the state in the TableAPI is always considered keyed
>> state for a join or aggregate. Or am I missing something?
>>
>> Thanks!
>>
>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley  wrote:
>>
>>> Our data arrives in order from Kafka, so we are hoping to use that same
>>> order for our processing.
>>>
>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley  wrote:
>>>
 Going further, if "Flink provides no guarantees about the order of the
 elements within a window" then with minibatch, which I assume uses a window
 under the hood, any aggregates that expect rows to arrive in order will
 fail to keep their consistency. Is this correct?

 On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley  wrote:

> Hello,
>
> We have a job from CDC to a large unbounded Flink plan to
> Elasticsearch.
>
> Currently, we have been relentlessly trying to reduce our record

Re: Deduplicating record amplification

2021-01-27 Thread Arvid Heise
Hi Rex,

indeed these two statements look like they contradict each other, but they
are looking at two sides of the same coin.
Flink is simply putting records in FIFO in windows. That is, there is no
ordering on event time if there are late events. So if your elements arrive
ordered, the ordering is retained. If your elements arrive unordered, the
same unordered order is retained.

However, note that Flink can only guarantee FIFO according to your
topology. Consider a source with parallelism 2, each reading data from an
ordered kafka partition (k1, k2) respectively. Each partition has records
with keys, such that no key appears in both partitions (default behavior if
you set keys but no partition while writing to Kafka).
1) Let's assume you do a simple transformation and write them back into
kafka with the same key. Then you can be sure that the order of the records
is retained.

2) Now you add a random shuffle and have the transformation. Now two
successive records may be processed in parallel and there is a race
condition who is written first into Kafka. So order is not retained.

3) You shuffle both partitions by the Kafka key (keyby) and do some
aggregation. Two successive records with the same key will always be
processed by the same aggregation operator. So the order is retained for
each key (note that this is what you usually want and what Kafka gives you
if you don't set the partition explicitly and just provide a key)

4) You shuffle both partitions by a different key. Then two successive
Kafka records could be again calculated in parallel such that there is a
race condition.

Note that windows are a kind of aggregation.

So Flink is never going to restore an ordering that is not there (because
it's too costly and there are too many unknowns). But you can infer the
guarantees by analyzing your topology.
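To make case 3 concrete, here is a small self-contained sketch; the Rec type and key field are made up for illustration and are not from this thread:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedOrderSketch {

    // Made-up record type standing in for a Kafka message.
    public static class Rec {
        public String key;   // the key the upstream producer partitioned on
        public long value;
        public Rec() {}
        public Rec(String key, long value) { this.key = key; this.value = value; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Case 3 above: keying by the same key the producer used routes all records of
        // one key to one subtask, so their relative order survives the shuffle. Keying
        // by anything unrelated (case 4) gives no such guarantee.
        env.fromElements(new Rec("doc-1", 1), new Rec("doc-1", 2), new Rec("doc-2", 1))
           .keyBy(new KeySelector<Rec, String>() {
               @Override
               public String getKey(Rec r) {
                   return r.key;
               }
           })
           .print();

        env.execute("keyed-order-sketch");
    }
}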

---

Please note that there is a common pitfall when you work with Kafka:
- Ordering of records in Kafka is only guaranteed if you set
*max.in.flight.requests.per.connection* to 1. [1]
- Often you also want to set *enable.idempotence* and *acks=all*

That is true for the upstream application and if you plan back to write to
Kafka you also need to set that in Flink.

[1]
https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
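As an illustration of those settings: the property keys below are the standard Kafka producer configs from [1]; the topic name and bootstrap servers are placeholders, and the FlinkKafkaProducer line assumes the universal Kafka connector is on the classpath. The same Properties object can be handed to Flink's Kafka sink when writing back to Kafka.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class OrderedProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Keep per-partition ordering even when retries happen.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");

        // Same properties handed to Flink's Kafka sink when writing back to Kafka.
        FlinkKafkaProducer<String> sink =
                new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props);
    }
}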

On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley  wrote:

> Hello,
>
> I began reading
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>
>-
>
>*> Redistributing* streams (between *map()* and *keyBy/window*, as
>well as between *keyBy/window* and *sink*) change the partitioning of
>streams. Each *operator subtask* sends data to different target
>subtasks, depending on the selected transformation. Examples are
>*keyBy()* (re-partitions by hash code), *broadcast()*, or *rebalance()*
>(random redistribution). In a *redistributing* exchange, order among
>elements is only preserved for each pair of sending- and receiving task
>(for example subtask[1] of *map()* and subtask[2] of *keyBy/window*).
>
> This makes it sound like ordering on the same partition/key is always
> maintained. Which is exactly the ordering guarantee that I need. This seems
> to slightly contradict the statement "Flink provides no guarantees about
> the order of the elements within a window" for keyed state. So is it true
> that ordering _is_ guaranteed for identical keys?
>
> If I'm not mistaken, the state in the TableAPI is always considered keyed
> state for a join or aggregate. Or am I missing something?
>
> Thanks!
>
> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley  wrote:
>
>> Our data arrives in order from Kafka, so we are hoping to use that same
>> order for our processing.
>>
>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley  wrote:
>>
>>> Going further, if "Flink provides no guarantees about the order of the
>>> elements within a window" then with minibatch, which I assume uses a window
>>> under the hood, any aggregates that expect rows to arrive in order will
>>> fail to keep their consistency. Is this correct?
>>>
>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley  wrote:
>>>
 Hello,

 We have a job from CDC to a large unbounded Flink plan to Elasticsearch.

 Currently, we have been relentlessly trying to reduce our record
 amplification which, when our Elasticsearch index is near fully populated,
 completely bottlenecks our write performance. We decided recently to try a
 new job using mini-batch. At first this seemed promising but at some point
 we began getting huge record amplification in a join operator. It appears
 that minibatch may only batch on aggregate operators?

 So we're now thinking that we should have a window before our ES sink
 which only takes the last record for any unique document id in the window,
 since that's all we really want to send anyway. However, when investigating
 turning a table, to a 

Flink running in k8s pod - pod is able to access S3 bucket, flink does not

2021-01-27 Thread Oran Shuster
So I've been really stumped on this for a couple of days now.
Some general info:
Flink version 1.12.1, using the k8s HA service. The k8s cluster is self-managed on AWS.
Our checkpoints and savepoints are on S3; I created a new bucket just for it
and set the proper permissions on the k8s node.

The job manager is working; I can access the UI and upload a job. Looking at
the startup logs I can see the bucket I set, with no errors:

2021-01-27 14:46:38,740 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore 
   [] - Creating highly available BLOB storage directory at 
s3://ha-storage/default/blob

(while there is no error, I can't find that directory in the bucket)


However, once I submit the job I get an exception. Looking at the job manager
logs I'm getting S3 Access Denied:

2021-01-27 14:28:08,628 ERROR 
org.apache.flink.runtime.blob.BlobServerConnection   [] - PUT operation 
failed
java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Access 
Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request 
ID: 8W0N0T2R4P8P7YBT; S3 Extended Request ID: 
B6zBzIoBmzNoJ4bWQE9Ydt65+IN8pyHeJQuTc28AscyG0dSEM3G7WZHutOT2scJ/6WCoOuRi27A=; 
Proxy: null), S3 Extended Request ID: 
B6zBzIoBmzNoJ4bWQE9Ydt65+IN8pyHeJQuTc28AscyG0dSEM3G7WZHutOT2scJ/6WCoOuRi27A=

So I created a new image based on the flink image with the aws cli installed
and tried doing some S3 actions as the flink user through the shell:

flink@flink-jobmanager-1-12-f6cf4b5b6-xmkvb:~$ aws s3 ls s3://
flink@flink-jobmanager-1-12-f6cf4b5b6-xmkvb:~$ touch oran.txt
flink@flink-jobmanager-1-12-f6cf4b5b6-xmkvb:~$ aws s3 cp oran.txt 
s3:///oran.txt
upload: ./oran.txt to s3://houzz-flink-1-12-session-cluster/oran.txt

Some more information: we already have an older version of Flink running on
the same cluster/namespace (version 1.9.1); it also uses S3 (a different
bucket) and it's working. We used a homebrewed image for that version, but it is
closely based on how the original flink image is created (no funny business).

Also, the S3 plugin I'm using is flink-s3-fs-presto-1.12.1.jar, enabled via the
ENABLE_BUILT_IN_PLUGINS env variable. I tried using the hadoop one but got an
error message that it's missing; not sure what's up with that.

totally working... and here I'm stuck. This makes zero sense to me, so I thought I
should ask on the mailing list.
Thanks for all the help




How to maintain output order of events by execution initiation time.

2021-01-27 Thread narasimha
Hi,

Below is my dataflow

DataStream stream = ...

stream.process(new ProcessFunction())
    .sink(...)


class ProcessFunction ... {
    MapState time ...;

    processElement(...) {
        // add the element to the MapState, keyed by event time
        // register a timer for eventTime + 60 seconds
    }

    // onTimer is kept in the process function so that the timer period
    // can be updated.
    onTimer(timestamp, ...) {
        // Iterate over the last n elements added to business objects at a
        // given timestamp.
        // Emit to output if the condition passes.
    }
}


BusinessObject {
    List entities;

    addEntity(BusinessEntities b);
}


This all looks good in code, but when processing elements at a rate of 2k,
the order in which results are emitted changes, i.e.,
results for time T+t get emitted before results for time T.

Can someone give suggestions on how this can be handled, so that the
order of emitted results is guaranteed?

Would popping them out and emitting them using a sliding window of 1 second
solve this?


Thanks,
Narasimha

-- 
A.Narasimha Swamy


TaskManager crash. Zookeeper timeout

2021-01-27 Thread Colletta, Edward
Using Flink 1.11.2 on Java 11, session cluster with 16 jobs running on AWS ECS
instances. The cluster has 3 JMs and 3 TMs; a separate ZooKeeper cluster has 3 nodes.

One of our taskmanagers crashed today, and the failure seems to be rooted in a
ZooKeeper timeout. We are wondering if there is any tuning that might be causing
this timeout. Any help will be greatly appreciated.

The first sign of trouble in the log is the following:

2021-01-27 11:16:39,795 WARN  
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Client 
session timed out, have not heard from server in 34951ms for sessionid 
0x140c01570036
2021-01-27 11:16:39,795 INFO  
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Client 
session timed out, have not heard from server in 34951ms for sessionid 
0x140c01570036, closing socket connection and attempting reconnect
2021-01-27 11:16:39,897 INFO  
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
 [] - State change: SUSPENDED
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 7613291aea3f4892a0deed0e7036e229 with leader id 
8959b1fb00fdd4e3d28daade48204e1f lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 3230dacf7fa0b8b8f9fe1c77ebdde2bb with leader id 
bccda87aa8ab14f23e98a4b6d2bf4081 lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 8f2ee940006ebb6d8f6d12e3db917da3 with leader id 
b72d64c2ec112d96cc3b93697d85478d lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job aaec26e3924e81c12bd5a6d71f6c0d77 with leader id 
8d91fefd14539d11d60a16e0e5cd45b1 lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 2d5f912867ff70a58638aff51c7f6f33 with leader id 
b24724d3e03bee3486fdc5dc616b4a9c lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 29eb631a7a07aa6b2c0224972b9937bb with leader id 
8479de79b7eda73fca6593da93c04027 lost leadership.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job bc7688332e73f330f08c95428630b99e with leader id 
a541d5eb3b60d29afc3a16cab2f742e7 lost leadership.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job a70b0023b705c39fa66f47f1a666b65d with leader id 
a0bfc94c9ff40689a7143396cafe4ac7 lost leadership.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 4c929f573971b8520a76ee1dfe5c3e35 with leader id 
922675f382f87225300696bae21841cc lost leadership.
2021-01-27 11:16:39,970 WARN  

Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler

Actually, if the parallelism is 1 then it works as it should. sigh

On 1/27/2021 6:52 PM, Chesnay Schepler wrote:
Note that while this does fix the issue of timers not firing while the 
job is running, it seems to be firing too many timers...


On 1/27/2021 6:49 PM, Chesnay Schepler wrote:
My bad, I was still using the custom WatermarkStrategy that emits a 
watermark for each event.
.assignTimestampsAndWatermarks(
    new WatermarkStrategy<T>() {
        @Override
        public WatermarkGenerator<T> createWatermarkGenerator(
                WatermarkGeneratorSupplier.Context context) {
            return new BoundedOutOfOrdernessWatermarks<T>(Duration.ofSeconds(1)) {
                @Override
                public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                    super.onEvent(event, eventTimestamp, output);
                    super.onPeriodicEmit(output);
                }
            };
        }
    }
    .withTimestampAssigner(...))


@Aljoscha This is about Flink 1.11. Since the periodic watermarks depend
on processing time, am I correct in assuming that, if the job finishes
quickly, watermarks may never be emitted (except for those at the end of
the job)? Is there any way to emit periodic watermarks based
on event time?
Is there any way to enable punctuated watermarks for the existing 
watermark strategies without having to implement a custom one?


On 1/27/2021 5:57 PM, Pilgrim Beart wrote:

Chesnay,
Thanks for this - I've made the change you suggested 
(setAutoWatermarkInterval) but it hasn't changed the behaviour - 
timers still get processed only on stream end.
I have pushed a new version, with this change, and also emitting 
some information in a .log field.
If you search for "!!!" in Ingest.java and DPTimeoutFunction.java 
you'll see the relevant changes.


In DPTimeoutFunction you'll see that if I add code to say "cancel 
the timer only if it wouldn't have gone off" then the output is now 
correct - individual devices do timeout. However, this output only 
appears at the end of the stream (i.e. time jumps backwards as all 
the timers are processed) so I still appear not to be seeing timer 
processing at the correct event time. If there was no end of stream, 
I would never get any timeouts.


Below is the output I get when I run. This output is correct but:
a) only because I am manually cancelling timers in DPTimeoutFunction 
(search for "!!!")
b) the timer events are timestamped correctly, but are not emitted 
into the stream at the right time - and if the stream didn't end 
then no timeouts would ever occur (which in particular means that 
devices that never come back online will never get marked as offline).


Perhaps I do need to implement an onPeriodicEmit function? Does that 
require a custom watermark strategy? I can see how to define a 
custom watermark at link below, but unclear how to install that?
https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy 



{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 
0 msg_in.ts 1000 Cancelling previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 
0 msg_in.ts 1000 Cancelling previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 
1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 
1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 
2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 
2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 
3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 
3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 
4000 msg_in.ts 5000 Cancelling previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 
0 msg_in.ts 5000 "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 
5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 
5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 
6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 
4000 msg_in.ts 7000 "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 
6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 
7000 msg_in.ts 8000 Cancelling 

Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler
Note that while this does fix the issue of timers not firing while the 
job is running, it seems to be firing too many timers...


On 1/27/2021 6:49 PM, Chesnay Schepler wrote:
My bad, I was still using the custom WatermarkStrategy that emits a 
watermark for each event.
.assignTimestampsAndWatermarks(
    new WatermarkStrategy<T>() {
        @Override
        public WatermarkGenerator<T> createWatermarkGenerator(
                WatermarkGeneratorSupplier.Context context) {
            return new BoundedOutOfOrdernessWatermarks<T>(Duration.ofSeconds(1)) {
                @Override
                public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                    super.onEvent(event, eventTimestamp, output);
                    super.onPeriodicEmit(output);
                }
            };
        }
    }
    .withTimestampAssigner(...))


@Aljoscha This is about Flink 1.11. Since the periodic watermarks depend
on processing time, am I correct in assuming that, if the job finishes
quickly, watermarks may never be emitted (except for those at the end of
the job)? Is there any way to emit periodic watermarks based
on event time?
Is there any way to enable punctuated watermarks for the existing 
watermark strategies without having to implement a custom one?


On 1/27/2021 5:57 PM, Pilgrim Beart wrote:

Chesnay,
Thanks for this - I've made the change you suggested 
(setAutoWatermarkInterval) but it hasn't changed the behaviour - 
timers still get processed only on stream end.
I have pushed a new version, with this change, and also emitting some 
information in a .log field.
If you search for "!!!" in Ingest.java and DPTimeoutFunction.java 
you'll see the relevant changes.


In DPTimeoutFunction you'll see that if I add code to say "cancel the 
timer only if it wouldn't have gone off" then the output is now 
correct - individual devices do timeout. However, this output only 
appears at the end of the stream (i.e. time jumps backwards as all 
the timers are processed) so I still appear not to be seeing timer 
processing at the correct event time. If there was no end of stream, 
I would never get any timeouts.


Below is the output I get when I run. This output is correct but:
a) only because I am manually cancelling timers in DPTimeoutFunction 
(search for "!!!")
b) the timer events are timestamped correctly, but are not emitted 
into the stream at the right time - and if the stream didn't end then 
no timeouts would ever occur (which in particular means that devices 
that never come back online will never get marked as offline).


Perhaps I do need to implement an onPeriodicEmit function? Does that 
require a custom watermark strategy? I can see how to define a 
custom watermark at link below, but unclear how to install that?
https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy 



{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 
msg_in.ts 1000 Cancelling previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 
msg_in.ts 1000 Cancelling previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 
1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 
1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 
2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 
2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 
3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 
3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 
4000 msg_in.ts 5000 Cancelling previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 
msg_in.ts 5000 "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 
5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 
5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 
6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 
4000 msg_in.ts 7000 "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 
6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 
7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 
7000 msg_in.ts 8000 Cancelling 

Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler
My bad, I was still using the custom WatermarkStrategy that emits a 
watermark for each event.


.assignTimestampsAndWatermarks(
    new WatermarkStrategy<T>() {
        @Override
        public WatermarkGenerator<T> createWatermarkGenerator(
                WatermarkGeneratorSupplier.Context context) {
            return new BoundedOutOfOrdernessWatermarks<T>(Duration.ofSeconds(1)) {
                @Override
                public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                    super.onEvent(event, eventTimestamp, output);
                    super.onPeriodicEmit(output);
                }
            };
        }
    }
    .withTimestampAssigner(...))



@Aljoscha This is about Flink 1.11. Since the periodic watermarks depend on
processing time, am I correct in assuming that, if the job finishes quickly,
watermarks may never be emitted (except for those at the end of the job)?
Is there any way to emit periodic watermarks based on event
time?
Is there any way to enable punctuated watermarks for the existing 
watermark strategies without having to implement a custom one?


On 1/27/2021 5:57 PM, Pilgrim Beart wrote:

Chesnay,
Thanks for this - I've made the change you suggested 
(setAutoWatermarkInterval) but it hasn't changed the behaviour - 
timers still get processed only on stream end.
I have pushed a new version, with this change, and also emitting some 
information in a .log field.
If you search for "!!!" in Ingest.java and DPTimeoutFunction.java 
you'll see the relevant changes.


In DPTimeoutFunction you'll see that if I add code to say "cancel the 
timer only if it wouldn't have gone off" then the output is now 
correct - individual devices do timeout. However, this output only 
appears at the end of the stream (i.e. time jumps backwards as all the 
timers are processed) so I still appear not to be seeing timer 
processing at the correct event time. If there was no end of stream, I 
would never get any timeouts.


Below is the output I get when I run. This output is correct but:
a) only because I am manually cancelling timers in DPTimeoutFunction 
(search for "!!!")
b) the timer events are timestamped correctly, but are not emitted 
into the stream at the right time - and if the stream didn't end then 
no timeouts would ever occur (which in particular means that devices 
that never come back online will never get marked as offline).


Perhaps I do need to implement an onPeriodicEmit function? Does that 
require a custom watermark strategy? I can see how to define a 
custom watermark at link below, but unclear how to install that?
https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy 



{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0 
msg_in.ts 1000 Cancelling previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0 
msg_in.ts 1000 Cancelling previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 
1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 
1000 msg_in.ts 2000 Cancelling previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 
2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 
2000 msg_in.ts 3000 Cancelling previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 
3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 
3000 msg_in.ts 4000 Cancelling previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 
4000 msg_in.ts 5000 Cancelling previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0 
msg_in.ts 5000 "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 
5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 
5000 msg_in.ts 6000 Cancelling previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 
6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 
4000 msg_in.ts 7000 "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 
6000 msg_in.ts 7000 Cancelling previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 
7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 
7000 msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 
7000 msg_in.ts 8000 Cancelling previous timer. "}

Re: Timers not firing until stream end

2021-01-27 Thread Pilgrim Beart
I am calling
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
I am using Flink 1.11.1 (because I need to run on AWS Kinesis Data Analytics).

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot




On Wed, 27 Jan 2021 at 17:07, Aljoscha Krettek  wrote:

> On 2021/01/27 15:09, Chesnay Schepler wrote:
> >Put another way, if you use any of the built-in WatermarkGenerators and
> >use event-time, then it appears that you *must* set this interval.
> >
> >This behavior is...less than ideal I must admit, and it does not
> >appear to be properly documented.
>
> Setting the watermark interval is done when calling
> `env.setStreamTimeCharacteristic()`. It's the first thing we documented
> for working with event time [1].
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_time.html
>
> To me it was always a usability problem that we didn't have event time
> enabled by default. We didn't have this because of "performance
> considerations". This changed in Flink 1.12 [2].
>
> [2] https://issues.apache.org/jira/browse/FLINK-19317
>
> @Pilgrim: Which version of Flink are you using?
>


Proctime consistency

2021-01-27 Thread Rex Fenley
Hello,

I'm looking at ways to deduplicate data and found [1], but does proctime
get committed with operators? How does this work against clock skew on
different machines?

Thanks

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend





Re: Timers not firing until stream end

2021-01-27 Thread Aljoscha Krettek

On 2021/01/27 15:09, Chesnay Schepler wrote:
Put another way, if you use any of the built-in WatermarkGenerators and 
use event-time, then it appears that you *must* set this interval.


This behavior is...less than ideal I must admit, and it does not 
appear to be properly documented.


Setting the watermark interval is done when calling 
`env.setStreamTimeCharacteristic()`. It's the first thing we documented

for working with event time [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_time.html


To me it was always a usability problem that we didn't have event time 
enabled by default. We didn't have this because of "performance 
considerations". This changed in Flink 1.12 [2].


[2] https://issues.apache.org/jira/browse/FLINK-19317

@Pilgrim: Which version of Flink are you using?


Re: Timers not firing until stream end

2021-01-27 Thread Pilgrim Beart
Chesnay,
Thanks for this - I've made the change you suggested
(setAutoWatermarkInterval) but it hasn't changed the behaviour - timers
still get processed only on stream end.
I have pushed a new version, with this change, and also emitting some
information in a .log field.
If you search for "!!!" in Ingest.java and DPTimeoutFunction.java you'll
see the relevant changes.

In DPTimeoutFunction you'll see that if I add code to say "cancel the timer
only if it wouldn't have gone off" then the output is now correct -
individual devices do timeout. However, this output only appears at the end
of the stream (i.e. time jumps backwards as all the timers are processed)
so I still appear not to be seeing timer processing at the correct event
time. If there was no end of stream, I would never get any timeouts.
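For readers following the thread, here is a generic sketch of the keyed event-time timeout pattern being described; it is not the actual DPTimeoutFunction, and the Msg type, field names, and timeout value are made up:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimeoutSketch {

    // Hypothetical message type; ts is assumed to be epoch millis.
    public static class Msg {
        public String id;
        public long ts;
    }

    public static class TimeoutFunction extends KeyedProcessFunction<String, Msg, String> {

        private static final long TIMEOUT_MS = 1000L;
        private transient ValueState<Long> pendingTimer;

        @Override
        public void open(Configuration parameters) {
            pendingTimer = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("pendingTimer", Long.class));
        }

        @Override
        public void processElement(Msg msg, Context ctx, Collector<String> out) throws Exception {
            Long previous = pendingTimer.value();
            if (previous != null) {
                // Cancel the timeout armed by the previous message for this device.
                ctx.timerService().deleteEventTimeTimer(previous);
            }
            long fireAt = msg.ts + TIMEOUT_MS;
            ctx.timerService().registerEventTimeTimer(fireAt);
            pendingTimer.update(fireAt);
            out.collect("online: " + msg.id + " @ " + msg.ts);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Only reached once the watermark passes `timestamp`, i.e. the device fell silent.
            out.collect("offline: " + ctx.getCurrentKey() + " @ " + timestamp);
            pendingTimer.clear();
        }
    }
}

Note that, as discussed in this thread, the onTimer output only becomes visible once the watermark actually advances past the registered timestamp.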

Below is the output I get when I run. This output is correct but:
a) only because I am manually cancelling timers in DPTimeoutFunction
(search for "!!!")
b) the timer events are timestamped correctly, but are not emitted into the
stream at the right time - and if the stream didn't end then no timeouts
would ever occur (which in particular means that devices that never come
back online will never get marked as offline).

Perhaps I do need to implement an onPeriodicEmit function? Does that
require a custom watermark strategy? I can see how to define a custom
watermark at link below, but unclear how to install that?
https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy

{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 1000 Cancelling previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 1000 Cancelling previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000
msg_in.ts 2000 Cancelling previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000
msg_in.ts 2000 Cancelling previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000
msg_in.ts 3000 Cancelling previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000
msg_in.ts 3000 Cancelling previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000
msg_in.ts 4000 Cancelling previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000
msg_in.ts 4000 Cancelling previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000
msg_in.ts 5000 Cancelling previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 5000 "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000
msg_in.ts 6000 Cancelling previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000
msg_in.ts 6000 Cancelling previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000
msg_in.ts 7000 Cancelling previous timer. "}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000
msg_in.ts 7000 "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000
msg_in.ts 7000 Cancelling previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelling previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelling previous timer. "}
{"ts":1,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 1 Cancelling previous timer. "}
{"ts":1,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 1 Cancelling previous timer. "}
{"ts":1,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 1 Cancelling previous timer. "}
{"ts":1001,"id":"2","is_online":false} // These are the "going offline"
events that we want to see. But they are emitted only once the stream has
ended.
{"ts":5001,"id":"1","is_online":false}
{"ts":11001,"id":"1","is_online":false}
{"ts":11001,"id":"0","is_online":false}
{"ts":11001,"id":"2","is_online":false}

Thanks,

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot

Stateful Functions - accessing the state aside of normal processing

2021-01-27 Thread Stephan Pelikan
Hi,

We are trying to use Statefuns for our tool and it seems to be a good fit. I 
already adopted it and it works quite well. However, we have millions of 
different states (all the same FunctionType but different ids) and each state 
consists of several @Persisted values (values and tables). We want to build an 
administration tool for examining the crowd of states (count, histogram, etc.) 
and each state in detail (the persisted-tables and -values).

Additionally we need some kind of dig-down functionality for finding those 
individual states. For example some of those persisted values can be used to 
categorize the crowd of states.

My question now is how to achieve this. Is there a way to browse and examine 
statefuns in a read-only fashion (their ids, their persisted values)? How can 
one achieve this without duplicating status in e.g. a relational database?

Thanks,
Stephan

PS: I have a few more questions, but I will send them in separate mails to avoid
mixing up topics.


Re: How to use custom Flink UDFs with flink-sql-gateway

2021-01-27 Thread Appleyuchi
https://blog.csdn.net/appleyuchi/article/details/112837327








On 2021-01-27 15:25:41, "阿华田"  wrote:
>Hi all, when submitting Flink SQL jobs via Flink-sql-gateway, how can we use custom Flink UDFs?
>
>阿华田
>a15733178...@163.com


Re: rocksdb block cache usage

2021-01-27 Thread Yun Tang
Hi,

If you have enabled managed memory, and since all rocksDB instances share the 
same block cache within one slot, all 
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
 in the same slot would report the same value.


Best
Yun Tang

From: Chesnay Schepler 
Sent: Wednesday, January 27, 2021 20:59
To: 曾祥才 ; User-Flink 
Subject: Re: rocksdb block cache usage

I don't quite understand the question; all 3 metrics you listed are the same 
one?

On 1/27/2021 9:23 AM, 曾祥才 wrote:
hi, all
I've enabled the state.backend.rocksdb.metrics.block-pinned-usage metric,
and the
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
metric is exposed.
I'm confused whether the total memory used for pinned block cache is the sum of all reported
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
values, or just a single
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
value (for block cache usage the metric seems to be reported per slot)?






Re: Flink SQL CSV format delimiter setting fails

2021-01-27 Thread JasonLee
Hi,

Change it to the following:

\n => U&'\000A'   

\t => U&'\0009'



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Datadog reporter timeout & OOM issue

2021-01-27 Thread Xingcan Cui
Hi Juha and Chesnay,

I do appreciate your prompt responses! I'll also continue to investigate
this issue.

Best,
Xingcan

On Wed, Jan 27, 2021, 04:32 Chesnay Schepler  wrote:

> (setting this field is currently not possible from a Flink user
> perspective; it is something I will investigate)
>
>
> On 1/27/2021 10:30 AM, Chesnay Schepler wrote:
>
> Yes, I could see how the memory issue can occur.
>
> However, it should be limited to buffering 64 requests; this is the
> default limit that okhttp imposes on concurrent calls.
> Maybe lowering this value already does the trick.
>
> On 1/27/2021 5:52 AM, Xingcan Cui wrote:
>
> Hi all,
>
> Recently, I tried to use the Datadog reporter to collect some user-defined
> metrics. Sometimes when reaching traffic peaks (which are also peaks for
> metrics), the HTTP client will throw the following exception:
>
> ```
> [OkHttp https://app.datadoghq.com/...] WARN
>  org.apache.flink.metrics.datadog.DatadogHttpClient  - Failed sending
> request to Datadog
> java.net.SocketTimeoutException: timeout
> at
> okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:593)
> at
> okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:601)
> at
> okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:146)
> at
> okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:120)
> at
> okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
> at
> okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
> at
> okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
> at
> okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
> at
> okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
> at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
> at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
> at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> ```
>
> I guess this may be caused by the rate limit of the Datadog server since
> too many HTTP requests look like a kind of "attack". The real problem is
> that after throwing the above exceptions, the JVM heap size of the
> taskmanager starts to increase and finally causes OOM. I'm curious if this
> may be caused by metrics accumulation, i.e., for some reason, the client
> can't reconnect to the Datadog server and send the metrics so that the
> metrics data is buffered in memory and causes OOM.
>
> I'm running Flink 1.11.2 on EMR-6.2.0 with
> flink-metrics-datadog-1.11.2.jar.
>
> Thanks,
> Xingcan
>
>
>
>
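For context on the 64-request ceiling mentioned above: that is the default of okhttp's Dispatcher. A standalone okhttp sketch of lowering it would look like the following; per Chesnay's note this is not currently settable through the Flink Datadog reporter, so it is shown purely for illustration:

import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;

public class OkHttpDispatcherCapSketch {
    public static void main(String[] args) {
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.setMaxRequests(16);        // default is 64 concurrent calls
        dispatcher.setMaxRequestsPerHost(16); // default is 5 per host

        // Hypothetical client using the lowered cap; the Datadog reporter builds its
        // own client internally, so this only illustrates where the limit lives.
        OkHttpClient client = new OkHttpClient.Builder()
                .dispatcher(dispatcher)
                .build();
    }
}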


Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler

You were right that it is an issue with the watermarks; other than when the 
job was stopped, they were never emitted downstream, so no timer 
was ever triggered.


It appears that you need to set the setAutoWatermarkInterval in the 
ExecutionConfig via


env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(500).toMillis());


to have them periodically emitted. Alternatively, you could override 
BoundedOutOfOrdernessWatermarks#onEvent to also emit a watermark for each 
event (for example, by calling #onPeriodicEmit).


Put another way, if you use any of the built-in WatermarkGenerators and 
use event-time, then it appears that you *must* set this interval.


This behavior is...less than ideal I must admit, and it does not appear 
to be properly documented.


On 1/27/2021 1:56 PM, Chesnay Schepler wrote:


Based on your description you aren't doing anything obviously wrong.

Would it be possible for you to share the code with us?

On 1/27/2021 1:02 PM, Pilgrim Beart wrote:

A newbie question:

I've created a basic Flink DataStream job for an IoT use-case, with 
file source and sink for testing.
I key by device ID, then in a ProcessFunction set an EventTime 
Timer to fire if a device falls silent, i.e. a timeout, which I 
cancel if another message arrives from that device within the timeout.


My test source generates 3 devices, one of which falls silent for 
more than the timeout period during the stream, then resumes again. 
So I expect the Timer to fire for that device during the stream, and 
then for all the Timers to fire after the end of the stream.


The timers do indeed fire at the end of the stream (e.g. with a 
timeout of 1000, the timers all fire 1000 after the end of the 
stream, which is correct). But no timer fires for the device which 
falls silent during the stream (even though other devices are still 
talking, advancing event time). I've verified that I am keying 
correctly by ID.


I suspect this is something to do with Watermarks. I'm using 
forBoundedOutOfOrderness watermarking with a duration of 0.


All suggestions welcome, thanks.

-Pilgrim
--
Learn more at https://devicepilot.com  
@devicepilot 
 








Flink sql problem

2021-01-27 Thread ?g???U?[????
Hi all


After grouping by user, message A arrives. If a message B arrives later whose
time is earlier than message A's but within 10 minutes of it, mark the field in
message A with Tag = True. How can this be achieved?


Thanks
Jiazhi

Re: Flink Job Manager & Task Manager heap size

2021-01-27 Thread Chesnay Schepler

Generally I see 2 options:

a) There's a memory leak somewhere. It would be good to know how the 
baseline heap usage during idleness evolves over time. Are the same 20 
jobs running continuously or are they (or others) periodically re-submitted?


b) The JVM just doesn't feel like running garbage collection. This 
doesn't seem that unreasonable given that there's plenty of memory to go 
around.


Overall, unless you run into OutOfMemoryErrors or the usage during 
idleness keeps steadily rising I wouldn't worry about it too much at 
this time.


On 1/27/2021 8:12 AM, Daniel Peled wrote:

Hi,

We have a flink cluster with 1 JM and 7 TM running about 20 jobs.
We have noticed that both JM & TM are consuming a huge amount of 
memory (several GB) *_although the jobs are doing nothing_* meaning no 
records are passing through the pipeline.
Checkpoints are enabled and the interval between checkpoints is 10 
seconds (but again, no records are coming in).


Attached are screenshots of metrics of both JM and one of the TM

Is that normal ?
Any tips for debugging this issue ?

BR,
Danny





Re: pyflink1.11 table.to_pandas() throws 'Sort on a non-time-attribute field is not supported.'

2021-01-27 Thread Xingbo Huang
From the error, it looks like you called an order_by operation. For an unbounded
table, order_by can only be applied to a time attribute field; otherwise it must be followed by a fetch operation. See the documentation for details [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#orderby-offset--fetch

Best,
Xingbo
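
A minimal illustration of the constraint described above, sketched with the Java Table API; the column names (rowtime, price) are assumptions, and the same rule applies to the PyFlink call shown in the traceback below.

```
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;

public final class UnboundedSortExamples {

    // Allowed: sorting an unbounded table on its event-time attribute.
    static Table sortedByTime(Table source) {
        return source.orderBy($("rowtime"));
    }

    // Allowed: sorting on a non-time attribute, but bounded with fetch().
    static Table topTenByPrice(Table source) {
        return source.orderBy($("price").desc()).fetch(10);
    }
}
```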

肖越 <18242988...@163.com> wrote on Wed, Jan 27, 2021 at 5:44 PM:

> Executing sql_query returns a table object; calling table.to_pandas() throws:
> Traceback (most recent call last):
>   File
> "C:/projects/dataService-pyflink_explore/dataService-calculate-code-python/src/test/test_mysql_connector.py",
> line 161, in 
> print(table.to_pandas().head(6))
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
> line 723, in to_pandas
> .collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
> line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
> line 154, in deco
> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: 'Sort on a non-time-attribute
> field is not supported.'
> Could anyone explain why the conversion fails? print_schema on the table itself works fine.


Re: rocksdb block cache usage

2021-01-27 Thread Chesnay Schepler
I don't quite understand the question; all 3 metrics you listed are the 
same one?


On 1/27/2021 9:23 AM, ?? wrote:

hi, all
I've enabled the state.backend.rocksdb.metrics.block-pinned-usage metric,
and the 
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage 
metric is exposed.
I'm confused whether the total memory used for the pinned block cache is the 
sum of 
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage 
over all instances, or just
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage 
itself (for block cache usage the metric seems to be reported per slot)?







Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler

Based on your description you aren't doing anything obviously wrong.

Would it be possible for you to share the code with us?

On 1/27/2021 1:02 PM, Pilgrim Beart wrote:

A newbie question:

I've created a basic Flink DataStream job for an IoT use-case, with 
file source and sink for testing.
I key by device ID, then in a ProcessFunction set an EventTime 
Timer to fire if a device falls silent, i.e. a timeout, which I cancel 
if another message arrives from that device within the timeout.


My test source generates 3 devices, one of which falls silent for more 
than the timeout period during the stream, then resumes again. So I 
expect the Timer to fire for that device during the stream, and then 
for all the Timers to fire after the end of the stream.


The timers do indeed fire at the end of the stream (e.g. with a 
timeout of 1000, the timers all fire 1000 after the end of the stream, 
which is correct). But no timer fires for the device which falls 
silent during the stream (even though other devices are still talking, 
advancing event time). I've verified that I am keying correctly by ID.


I suspect this is something to do with Watermarks. I'm using 
forBoundedOutOfOrderness watermarking with a duration of 0.


All suggestions welcome, thanks.

-Pilgrim
--
Learn more at https://devicepilot.com | @devicepilot | +44 7961 125282
See our latest features and book me for a video call.






Flink 1.11 consuming Kafka and writing to Hive: consumption backlog problem

2021-01-27 Thread nashcen
*Average Kafka consumption rate of the Flink job = (8810108 - 8646583)/5 = 32705 records/minute*
That is only about 2 million records consumed per hour, which is far too slow; roughly 8 million records have already piled up. How should I handle this situation?

*21/01/27 20:05:46* INFO clients.Metadata: Cluster ID:
h-faeyjNRhS5xcAUy1JR2Q
Consumer group 'stg_dcpoints_hive' has no active members.

TOPIC PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
CONSUMER-ID HOSTCLIENT-ID
ods_dcpoints_prod 0  6599508940  6608319048  *8810108 *   
-   -   -

*21/01/27 20:10:45* INFO clients.Metadata: Cluster ID:
h-faeyjNRhS5xcAUy1JR2Q
Consumer group 'stg_dcpoints_hive' has no active members.

TOPIC PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
CONSUMER-ID HOSTCLIENT-ID
ods_dcpoints_prod 0  6599672465  6608319048  *8646583 *   
-   -   -



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Timers not firing until stream end

2021-01-27 Thread Pilgrim Beart
A newbie question:

I've created a basic Flink DataStream job for an IoT use-case, with file
source and sink for testing.
I key by device ID, then in a ProcessFunction set an EventTime Timer to
fire if a device falls silent, i.e. a timeout, which I cancel if another
message arrives from that device within the timeout.

My test source generates 3 devices, one of which falls silent for more than
the timeout period during the stream, then resumes again. So I expect the
Timer to fire for that device during the stream, and then for all the
Timers to fire after the end of the stream.

The timers do indeed fire at the end of the stream (e.g. with a timeout of
1000, the timers all fire 1000 after the end of the stream, which is
correct). But no timer fires for the device which falls silent during the
stream (even though other devices are still talking, advancing event time).
I've verified that I am keying correctly by ID.

I suspect this is something to do with Watermarks. I'm using
forBoundedOutOfOrderness watermarking with a duration of 0.

All suggestions welcome, thanks.

-Pilgrim
--
Learn more at https://devicepilot.com | @devicepilot | +44 7961 125282
See our latest features and book me for a video call.


Re: JobManager seems to be leaking temporary jar files

2021-01-27 Thread Chesnay Schepler
The problem of submitted jar files not being closed is a known one: 
https://issues.apache.org/jira/browse/FLINK-9844

IIRC it's not exactly trivial to fix since class-loading is involved.
It's not strictly related to the REST API; it also occurs in the CLI but 
is less noticeable since jars are usually not deleted.


As for the issue with deleteExtractedLibraries, Maciek is generally on a 
good track.
The explicit delete call is indeed missing. The best place to put it is 
probably JarRunHandler#handleRequest, within handle(), after the job was run.

A similar issue also exists in the JarPlanHandler.

I've opened https://issues.apache.org/jira/browse/FLINK-21164 to fix 
this issue.


On 1/26/2021 12:21 PM, Maciek Próchniak wrote:


Hi Matthias,

I think the problem lies somewhere in JarRunHandler, as this is the 
place where the files are created.


I think these are not the files that are managed via BlobService, as 
they are not stored in BlobService folders (I made experiment changing 
default BlobServer folders).


It seems to me that CliFrontend deletes those files explicitly:

https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L250

whereas I couldn't find such invocation in JarRunHandler (not deleting 
those files does not fully explain leak on heap though...)



thanks,

maciek

On 26.01.2021 11:16, Matthias Pohl wrote:

Hi Maciek,
my understanding is that the jars in the JobManager should be cleaned 
up after the job is terminated (I assume that your jobs successfully 
finished). The jars are managed by the BlobService. The dispatcher 
will trigger the jobCleanup in [1] after job termination. Are there 
any suspicious log messages that might indicate an issue?

I'm adding Chesnay to this thread as he might have more insights here.

[1] 
https://github.com/apache/flink/blob/2c4e0ab921ccfaf003073ee50faeae4d4e4f4c93/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L797 



On Mon, Jan 25, 2021 at 8:37 PM Maciek Próchniak wrote:


Hello,

in our setup we have:

- Flink 1.11.2

- job submission via REST API (first we upload jar, then we submit
multiple jobs with it)

- additional jars embedded in lib directory of main jar (this is
crucial
part)

When we submit jobs this way, Flink creates new temp jar files via
PackagedProgram.extractContainedLibraries method.

We observe that they are not removed after job finishes - it
seems that
PackagedProgram.deleteExtractedLibraries is not invoked when
using REST
API.

What's more, it seems that those jars remain open in JobManager
process.
We observe that when we delete them manually via scripts, the
disk space
is not reclaimed until process is restarted, we also see via heap
dump
inspection that java.util.zip.ZipFile$Source  objects remain,
pointing
to those files. This is quite a problem for us, as we submit
quite a few
jobs, and after a while we ran out of either heap or disk space on
JobManager process/host. Unfortunately, I cannot so far find
where this
leak would happen...

Does anybody have some pointers where we can search? Or how to
fix this
behaviour?


thanks,

maciek





Re: How to use a custom Flink UDF with flink-sql-gateway

2021-01-27 Thread Sebastian Liu
1. Make sure the UDF jar is on the classpath of the gateway JVM.
2. Configure it here (a sketch of such a UDF class follows below):
https://github.com/ververica/flink-sql-gateway/blob/master/conf/sql-gateway-defaults.yaml#L87
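
For illustration, the jar placed on the gateway's classpath would contain an ordinary Table API function. A minimal sketch of such a scalar UDF follows; the class name and logic are assumptions, not code from this thread.

```
import org.apache.flink.table.functions.ScalarFunction;

// Packaged into the jar on the gateway's classpath and referenced from the
// gateway's function configuration.
public class ToUpperCase extends ScalarFunction {
    public String eval(String input) {
        return input == null ? null : input.toUpperCase();
    }
}
```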

阿华田 wrote on Wed, Jan 27, 2021 at 3:26 PM:

> Hi all, when submitting Flink SQL jobs with flink-sql-gateway, how can I use a custom Flink UDF?
>
>
> 阿华田
> a15733178...@163.com
>
>

-- 

*With kind regards

Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com 
QQ: 3239559*


Re: flink-1.12.1 Kubernetes session cluster does not generate a stdout output file

2021-01-27 Thread Tianwang Li
docker-entrypoint.sh starts the process in "start-foreground" mode,
so no .out file is generated.


Tianwang Li wrote on Wed, Jan 27, 2021 at 5:38 PM:

> I followed:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html
>
> and deployed a session cluster, then ran a job.
> The Stdout view in the UI is empty, and no .out file was generated.
>
> Is this caused by some configuration issue?
>
> --
> **
>  tivanli
> **
>


-- 
**
 tivanli
**


Re: Generated SQL code exceeds 64 KB and compilation fails

2021-01-27 Thread Sebastian Liu
Fixing this requires changes to the CodeGen part of the planner. You can add the ExpressionReducer problem described above to the following issue,
and I can help fix it together: https://issues.apache.org/jira/browse/FLINK-20898
Also, it would be great if you could add a test SQL example as well.

stgztsw wrote on Wed, Jan 27, 2021 at 6:10 PM:

> The code generated for my SQL exceeds 64 KB and compilation fails. Is there any way to work around this? I tried splitting the SQL logic into multiple views,
> but the optimizer merges them back together during optimization, so that does not get around it.
>
> Caused by: org.codehaus.janino.InternalCompilerException: Code of method
> "map(Ljava/lang/Object;)Ljava/lang/Object;" of class
> "ExpressionReducer$3674" grows beyond 64 KB
> at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:1009)
> at org.codehaus.janino.CodeContext.write(CodeContext.java:901)
> at
> org.codehaus.janino.UnitCompiler.writeShort(UnitCompiler.java:12195)
> at
> org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:10660)
> at
> org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5553)
> at
> org.codehaus.janino.UnitCompiler.access$9300(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$16.visitIntegerLiteral(UnitCompiler.java:4423)
> at
>
> org.codehaus.janino.UnitCompiler$16.visitIntegerLiteral(UnitCompiler.java:4394)
> at org.codehaus.janino.Java$IntegerLiteral.accept(Java.java:5442)
> at
> org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
> at
> org.codehaus.janino.UnitCompiler.fakeCompile(UnitCompiler.java:3719)
> at
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5569)
> at
> org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5165)
> at
> org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
> at
>
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> at
> org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
> at
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
> at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
> at
> org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
> at
>
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
> at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
> at
> org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
> at
>
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
> at
> org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
> at
> org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
> at
> org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$Block.accept(Java.java:2776)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2465)
> at
> org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
> at
> org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
> at
>
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
> at
>
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> at
> org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> at
>
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> at
>
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)

Handling a custom watermark with Flink SQL

2021-01-27 Thread 花乞丐
I am using Flink to consume messages from Kafka, assigning a watermark to every Kafka message, converting the messages to Row and writing them to Hive. However, when partitions are committed (PartitionTimeCommit), the watermark stays at the most negative long value, so partitions are never committed and the data cannot be queried in Hive. The files do exist on HDFS, though. I am not sure what is causing this!

FlinkKafkaConsumerBase<FlatMessage> waterMessages =
        messages.assignTimestampsAndWatermarks(
                WatermarkStrategy.<FlatMessage>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<FlatMessage>() {
                            @Override
                            public long extractTimestamp(FlatMessage element, long recordTimestamp) {
                                Long es = element.getEs();
                                return es;
                            }
                        }));

INSERT INTO ods.trade_real_delivery_incr\n" +
"SELECT\n" +
"id,\n" +
"original_id,\n" +
"order_id,\n" +
"order_code,\n" +
"business_type,\n" +
"delivery_id,\n" +
"num,\n" +
"weight,\n" +
"creator_id,\n" +
"creator,\n" +
"admin_id,\n" +
"admin_name,\n" +
"admin_depart_id,\n" +
"admin_depart_code,\n" +
"admin_depart_name,\n" +
"create_time,\n" +
"update_time,\n" +
"es,\n" +
"ts,\n" +
"op,\n" +
"dt\n" +
"FROM trade_real_delivery_tmp




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: How stable is flink-sql-gateway? Can it be used in production?

2021-01-27 Thread zilong xiao
What would the session-client be responsible for? Maintaining and managing sessions?

felixzh wrote on Wed, Jan 27, 2021 at 5:49 PM:

> If you use flink-sql-gateway, I would suggest wrapping your own session client, modeled on JDBC.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-01-27 14:41:28, "Jeff Zhang" wrote:
> >Zeppelin has a REST API:
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh
> >
> >jinsx wrote on Wed, Jan 27, 2021 at 2:30 PM:
> >
> >> If we use Zeppelin, can Zeppelin provide an RPC interface?
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
> >
> >
> >--
> >Best Regards
> >
> >Jeff Zhang
>


Generated SQL code exceeds 64 KB and compilation fails

2021-01-27 Thread stgztsw
The code generated for my SQL exceeds 64 KB and compilation fails. Is there any way to work around this? I tried splitting the SQL logic into multiple views,
but the optimizer merges them back together during optimization, so that does not get around it.

Caused by: org.codehaus.janino.InternalCompilerException: Code of method
"map(Ljava/lang/Object;)Ljava/lang/Object;" of class
"ExpressionReducer$3674" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:1009)
at org.codehaus.janino.CodeContext.write(CodeContext.java:901)
at org.codehaus.janino.UnitCompiler.writeShort(UnitCompiler.java:12195)
at 
org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:10660)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5553)
at org.codehaus.janino.UnitCompiler.access$9300(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$16.visitIntegerLiteral(UnitCompiler.java:4423)
at
org.codehaus.janino.UnitCompiler$16.visitIntegerLiteral(UnitCompiler.java:4394)
at org.codehaus.janino.Java$IntegerLiteral.accept(Java.java:5442)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.fakeCompile(UnitCompiler.java:3719)
at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5569)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5165)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
at
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
at
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
at
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
at org.codehaus.janino.Java$Block.accept(Java.java:2776)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2465)
at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
at
org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
at
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
... 51 more



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink upgrade to Flink-1.12

2021-01-27 Thread Aljoscha Krettek
I'm afraid I also don't know more than that. But I agree with Ufuk that 
it should just work.


I think the best way would be to try it in a test environment and then 
go forward with upgrading the production jobs/cluster.


Best,
Aljoscha

On 2021/01/25 18:59, Ufuk Celebi wrote:

Thanks for reaching out. Semi-asynchronous does *not* refer to incremental 
checkpoints and Savepoints are always triggered as full snapshots (not 
incremental).

Earlier versions of the RocksDb state backend supported two snapshotting modes, 
fully and semi-asynchronous snapshots. Semi-asynchronous state snapshots for 
RocksDb have been removed a long time ago by Aljoscha in 
https://github.com/apache/flink/pull/2345 (FLINK-4340). The notes you are 
referencing were added around that time and I'm afraid they might have become 
mostly obsolete.

I'm pulling in Aljoscha who should be able to give a definitive answer here.

To make a long story short, it should simply work for you to upgrade from 1.11 
to 1.12 via a Savepoint.

Cheers,

Ufuk

On Wed, Jan 20, 2021, at 3:58 AM, 耿延杰 wrote:

Hi all,

As flink doc says:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/upgrading.html#preconditions


We do not support migration for state in RocksDB that was checkpointed using 
`semi-asynchronous` mode. In case your old job was using this mode, you can 
still change your job to use `fully-asynchronous` mode before taking the 
savepoint that is used as the basis for the migration.


So, my first question:
Is "semi-asynchronous" means "incremental checkpoint"?

And second question:
If so, assume I'm using flink-1.11 and RocksDB with incremental asynchronous 
checkpoint as state backend.
I should:
1. take a savepoint for old version(flink-1.11),
2. and change job to use "full asynchronous checkpoint" ,
3. restart old version(flink-1.11) job with new config (full asynchronous 
checkpoint),
4. then, take a savepoint
5. and finally, stop old version(flink-1.11) and upgrade to flink-1.12

Do I understand this correctly?

Best regards


Re: Re: How stable is flink-sql-gateway? Can it be used in production?

2021-01-27 Thread felixzh
If you use flink-sql-gateway, I would suggest wrapping your own session client, modeled on JDBC.

















On 2021-01-27 14:41:28, "Jeff Zhang" wrote:
>Zeppelin has a REST API: https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh
>
>jinsx wrote on Wed, Jan 27, 2021 at 2:30 PM:
>
>> If we use Zeppelin, can Zeppelin provide an RPC interface?
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>
>
>-- 
>Best Regards
>
>Jeff Zhang


Overhead when using map state

2021-01-27 Thread Lasse Nedergaard
Hi

We use Rocksdb for storing state and run on Flink 1.10.  
We have followed best practices and used map state instead of a map in value 
state. We have seen problems with OOM exceptions and investigated them by 
creating a job with n keyBy keys, where each key had a map stored either in map 
state or in value state. The job reads and updates random values in the maps. 
It turns out that the same map stored in map state consumes 3-4 times the memory 
compared with storing it in value state. 
1. Can anyone explain why the overhead is so big?

At the same time we also see the throughput drop compared with value state. If we 
iterated over all keys in the state it would make sense, but in our test we access 
random individual keys. We had huge pressure on RocksDB, and that could be the 
cause. 
2. Can anyone explain why the pressure on RocksDB is higher using map state 
compared to value state with a map?
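
For readers unfamiliar with the two variants being compared, here is a minimal sketch of the two state declarations; the names and value type are assumptions, not code from this thread.

```
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public final class StateVariants {

    // Variant 1: map state - each map entry becomes its own RocksDB key/value pair.
    static final MapStateDescriptor<String, Long> MAP_STATE =
            new MapStateDescriptor<>("per-key-map", Types.STRING, Types.LONG);

    // Variant 2: value state holding a java.util.Map - the whole map is
    // (de)serialized as a single RocksDB value on every access.
    static final ValueStateDescriptor<Map<String, Long>> VALUE_STATE =
            new ValueStateDescriptor<>(
                    "per-key-map-as-value",
                    TypeInformation.of(new TypeHint<Map<String, Long>>() {}));
}
```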



Med venlig hilsen / Best regards
Lasse Nedergaard



pyflink1.11 table.to_pandas() throws 'Sort on a non-time-attribute field is not supported.'

2021-01-27 Thread 肖越
Executing sql_query returns a table object; calling table.to_pandas() throws:
Traceback (most recent call last):
  File 
"C:/projects/dataService-pyflink_explore/dataService-calculate-code-python/src/test/test_mysql_connector.py",
 line 161, in 
print(table.to_pandas().head(6))
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
 line 723, in to_pandas
.collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'Sort on a non-time-attribute field is 
not supported.'
Could anyone explain why the conversion fails? print_schema on the table itself works fine.

Re: Questions related to flink-sql-gateway

2021-01-27 Thread zilong xiao
OK, got it.

Lin Li wrote on Wed, Jan 27, 2021 at 5:20 PM:

> try:  "execution.target: yarn-pre-job"  ->  "execution.target:
> yarn-per-job"
>
> zilong xiao wrote on Wed, Jan 27, 2021 at 10:17 AM:
>
> > Thanks for your reply. After setting "execution.target: yarn-pre-job" in flink-conf.yaml, I hit an
> > exception when trying to create a session id via the REST API. I am not sure why; could you help take a look?
> >
> > flink version: 1.11.3
> > execution.target: yarn-pre-job
> > REST API request path and parameters:
> > http://localhost:8083/v1/sessions
> > {
> > "planner": "blink",
> > "execution_type": "streaming"
> >}
> >
> > 异常信息:Caused by: java.lang.IllegalStateException: No ClusterClientFactory
> > found. If you were targeting a Yarn cluster, please make sure to export
> the
> > HADOOP_CLASSPATH environment variable or have hadoop in your classpath.
> For
> > more information refer to the "Deployment & Operations" section of the
> > official Apache Flink documentation.
> >
> > Sebastian Liu wrote on Wed, Jan 27, 2021 at 1:01 AM:
> >
> > > The way the sql gateway submits jobs is similar to the Flink client; it mainly depends on the execution
> > > configuration in flink-conf. For per-job mode on YARN, the corresponding value is "yarn-per-job".
> > > With that, the loaded PipelineExecutor is YarnJobClusterExecutorFactory, which is able to submit jobs
> > > to YARN; this ends up on the same call path as submitting a job with the Flink client. Beyond that,
> > > I believe adding the YARN-related configuration to flink-conf should be enough.
> > > org.apache.flink.yarn.configuration.YarnConfigOptions
> > >
> > > zilong xiao  于2021年1月26日周二 下午4:00写道:
> > >
> > > > 请问有关于flink-sql-gateway rest api以pre job模式提交作业到yarn集群的文档吗?
> > > >
> > >
> > >
> > > --
> > >
> > > *With kind regards
> > > 
> > > Sebastian Liu 刘洋
> > > Institute of Computing Technology, Chinese Academy of Science
> > > Mobile\WeChat: +86—15201613655
> > > E-mail: liuyang0...@gmail.com 
> > > QQ: 3239559*
> > >
> >
>


flink-1.12.1 Kubernetes session cluster does not generate a stdout output file

2021-01-27 Thread Tianwang Li
I followed:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html

and deployed a session cluster, then ran a job.
The Stdout view in the UI is empty, and no .out file was generated.

Is this caused by some configuration issue?

-- 
**
 tivanli
**


Re: Datadog reporter timeout & OOM issue

2021-01-27 Thread Chesnay Schepler
(setting this field is currently not possible from a Flink user 
perspective; it is something I will investigate)



On 1/27/2021 10:30 AM, Chesnay Schepler wrote:

Yes, I could see how the memory issue can occur.

However, it should be limited to buffering 64 requests; this is the 
default limit that okhttp imposes on concurrent calls.

Maybe lowering this value already does the trick.

On 1/27/2021 5:52 AM, Xingcan Cui wrote:

Hi all,

Recently, I tried to use the Datadog reporter to collect some 
user-defined metrics. Sometimes when reaching traffic peaks (which 
are also peaks for metrics), the HTTP client will throw the following 
exception:


```
[OkHttp https://app.datadoghq.com/.. .] 
WARN  org.apache.flink.metrics.datadog.DatadogHttpClient  - Failed 
sending request to Datadog

java.net.SocketTimeoutException: timeout
at 
okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:593)
at 
okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:601)
at 
okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:146)
at 
okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:120)
at 
okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at 
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at 
okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)
```

I guess this may be caused by the rate limit of the Datadog server 
since too many HTTP requests look like a kind of "attack". The real 
problem is that after throwing the above exceptions, the JVM heap 
size of the taskmanager starts to increase and finally causes OOM. 
I'm curious if this may be caused by metrics accumulation, i.e., for 
some reason, the client can't reconnect to the Datadog server and 
send the metrics so that the metrics data is buffered in memory and 
causes OOM.


I'm running Flink 1.11.2 on EMR-6.2.0 with 
flink-metrics-datadog-1.11.2.jar.


Thanks,
Xingcan







Re: Datadog reporter timeout & OOM issue

2021-01-27 Thread Chesnay Schepler

Yes, I could see how the memory issue can occur.

However, it should be limited to buffering 64 requests; this is the 
default limit that okhttp imposes on concurrent calls.

Maybe lowering this value already does the trick.
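
For context, the default concurrent-call limit mentioned above is controlled on okhttp's Dispatcher. The sketch below shows plain okhttp usage for lowering it; as noted elsewhere in this thread, the Flink Datadog reporter does not currently expose this setting, so the class and values are illustrative assumptions only.

```
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;

public final class LimitedOkHttpClient {

    public static OkHttpClient create() {
        Dispatcher dispatcher = new Dispatcher();
        // okhttp allows 64 concurrent async calls by default; these setters
        // control how many calls may execute at once.
        dispatcher.setMaxRequests(16);
        dispatcher.setMaxRequestsPerHost(16);
        return new OkHttpClient.Builder()
                .dispatcher(dispatcher)
                .build();
    }
}
```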

On 1/27/2021 5:52 AM, Xingcan Cui wrote:

Hi all,

Recently, I tried to use the Datadog reporter to collect some 
user-defined metrics. Sometimes when reaching traffic peaks (which are 
also peaks for metrics), the HTTP client will throw the following 
exception:


```
[OkHttp https://app.datadoghq.com/.. .] 
WARN  org.apache.flink.metrics.datadog.DatadogHttpClient  - Failed 
sending request to Datadog

java.net.SocketTimeoutException: timeout
at 
okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:593)
at 
okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:601)
at 
okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:146)
at 
okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:120)
at 
okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at 
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at 
okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)
```

I guess this may be caused by the rate limit of the Datadog server 
since too many HTTP requests look like a kind of "attack". The real 
problem is that after throwing the above exceptions, the JVM heap size 
of the taskmanager starts to increase and finally causes OOM. I'm 
curious if this may be caused by metrics accumulation, i.e., for some 
reason, the client can't reconnect to the Datadog server and send the 
metrics so that the metrics data is buffered in memory and causes OOM.


I'm running Flink 1.11.2 on EMR-6.2.0 with 
flink-metrics-datadog-1.11.2.jar.


Thanks,
Xingcan





Re: Questions related to flink-sql-gateway

2021-01-27 Thread Lin Li
try:  "execution.target: yarn-pre-job"  ->  "execution.target: yarn-per-job"

zilong xiao wrote on Wed, Jan 27, 2021 at 10:17 AM:

> Thanks for your reply. After setting "execution.target: yarn-pre-job" in flink-conf.yaml, I hit an
> exception when trying to create a session id via the REST API. I am not sure why; could you help take a look?
>
> flink version: 1.11.3
> execution.target: yarn-pre-job
> REST API request path and parameters:
> http://localhost:8083/v1/sessions
> {
> "planner": "blink",
> "execution_type": "streaming"
>}
>
> 异常信息:Caused by: java.lang.IllegalStateException: No ClusterClientFactory
> found. If you were targeting a Yarn cluster, please make sure to export the
> HADOOP_CLASSPATH environment variable or have hadoop in your classpath. For
> more information refer to the "Deployment & Operations" section of the
> official Apache Flink documentation.
>
> Sebastian Liu wrote on Wed, Jan 27, 2021 at 1:01 AM:
>
> > The way the sql gateway submits jobs is similar to the Flink client; it mainly depends on the execution
> > configuration in flink-conf. For per-job mode on YARN, the corresponding value is "yarn-per-job".
> > With that, the loaded PipelineExecutor is YarnJobClusterExecutorFactory, which is able to submit jobs
> > to YARN; this ends up on the same call path as submitting a job with the Flink client. Beyond that,
> > I believe adding the YARN-related configuration to flink-conf should be enough.
> > org.apache.flink.yarn.configuration.YarnConfigOptions
> >
> > zilong xiao wrote on Tue, Jan 26, 2021 at 4:00 PM:
> >
> > > Is there any documentation on submitting jobs to a YARN cluster in per-job mode via the flink-sql-gateway REST API?
> > >
> >
> >
> > --
> >
> > *With kind regards
> > 
> > Sebastian Liu 刘洋
> > Institute of Computing Technology, Chinese Academy of Science
> > Mobile\WeChat: +86—15201613655
> > E-mail: liuyang0...@gmail.com 
> > QQ: 3239559*
> >
>


Re: Seeing Rocks Native Metrics in Data Dog

2021-01-27 Thread Chesnay Schepler
AFAIK all IDs (and in fact all variables except <host>) are exposed as 
tags. (The <host> is transmitted separately, and I would've thought 
Datadog automatically provides similar functionality for it.)


On 1/27/2021 2:11 AM, Rex Fenley wrote:
Oddly, I'm seeing them now. I'm not sure what has changed. Fwiw, we 
have modified the scopes per 
https://docs.datadoghq.com/integrations/flink/#metric-collection 
but their modifications leave out the ids as tags. We do need to modify them 
according to that documentation - "*Note*: The system scopes must be remapped 
for your Flink metrics to be supported, otherwise they are submitted 
as custom metrics." Could we instead add host and ids as tags to our 
metrics?


Thanks for your help!

On Tue, Jan 26, 2021 at 2:49 PM Chesnay Schepler wrote:


It is good to know that something from the task executors arrives
at datadog.

Do you see any metrics for a specific job, like the numRestarts
metric of the JobManager?

Are you using the default scope formats, or have you modified them?
Could you try these instead and report back? (I replaced all
job/task/operator names with their IDs, in case some special
character is messing with datadog)

metrics.scope.jm: <host>.jobmanager
metrics.scope.jm.job: <host>.jobmanager.<job_id>
metrics.scope.tm: <host>.taskmanager.<tm_id>
metrics.scope.tm.job: <host>.taskmanager.<tm_id>.<job_id>
metrics.scope.task: <host>.taskmanager.<tm_id>.<job_id>.<task_id>.<subtask_index>
metrics.scope.operator: <host>.taskmanager.<tm_id>.<job_id>.<operator_id>.<subtask_index>


On 1/26/2021 9:28 PM, Rex Fenley wrote:

All taskmanager and jobmanager logs show up. Anything specific to
an operator does not.
For example, flink.taskmanager.Status.JVM.Memory.Heap.Used shows
up, but I can't see stats on an individual operator.

I mostly followed a combination of
https://docs.datadoghq.com/integrations/flink/#metric-collection

and

https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter


since datadog's documentation was slightly out of date.

Thanks

On Tue, Jan 26, 2021 at 10:28 AM Chesnay Schepler <ches...@apache.org> wrote:

Any metric that is shown in the Flink UI should also
appear in DataDog.
If this is not the case then something goes wrong within the
reporter.

Is there anything suspicious in the Flink logs?

Can you give some example of metrics that /do/ show up in
DataDog?

On 1/26/2021 6:32 PM, Rex Fenley wrote:

Hi,

I need to get a deeper dive into how rocks is performing so
I turned on Rocks Native Metrics. However, I don't see any
of the metrics in DataDog (though I have other Flink metrics
in DataDog). I only see rocks metrics in the operator
metrics in Flink UI, and unfortunately I can't really zoom
in or out of those metrics or compare against multiple
operators at a time which makes it really difficult to get
an overview of how rocks is doing.

Is there any way to get the Rocks Native Metrics
forwarded over to DataDog?

Thanks!

-- 


Rex Fenley | Software Engineer - Mobile and Backend

Remind.com | BLOG | FOLLOW US | LIKE US






-- 


Rex Fenley | Software Engineer - Mobile and Backend

Remind.com | BLOG | FOLLOW US | LIKE US






--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com | BLOG | FOLLOW US | LIKE US







Re: Flink SQL 1.12 writing to Hive fails at the metastore

2021-01-27 Thread Rui Li
Hi,

The stack trace you posted is only a warning. Are there any other exceptions?

On Wed, Jan 27, 2021 at 10:47 AM gimlee  wrote:

> Using Flink SQL 1.12 to write to Hive, the job is not submitted to YARN successfully; the error is as follows:
> 2021-01-26 20:44:23.133 [main] INFO
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient  - Trying to connect
> to
> metastore with URI thrift://hdcom02.prd.com:9083
> 2021-01-26 20:44:23.133 [main] INFO
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient  - Trying to connect
> to
> metastore with URI thrift://hdcom02.prd.com:9083
> 2021-01-26 20:44:23.134 [main] INFO
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient  - Opened a connection
> to metastore, current connections: 2
> 2021-01-26 20:44:23.134 [main] INFO
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient  - Opened a connection
> to metastore, current connections: 2
> 2021-01-26 20:44:23.181 [main] WARN
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient  - set_ugi() not
> successful, Likely cause: new client talking to old server. Continuing
> without it.
> org.apache.thrift.transport.TTransportException: null
> at
>
> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
> at
> org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
> at
>
> org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:380)
> at
>
> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:230)
> at
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
> at
>
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_set_ugi(ThriftHiveMetastore.java:4787)
> at
>
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.set_ugi(ThriftHiveMetastore.java:4773)
> at
>
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:534)
> at
>
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:224)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
>
> org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
> at
>
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:95)
> at
>
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
> at
>
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.table.catalog.hive.client.HiveShimV310.getHiveMetastoreClient(HiveShimV310.java:112)
> at
>
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:274)
> at
>
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:80)
> at
>
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:32)
> at
>
> org.apache.flink.connectors.hive.HiveTableSink.consume(HiveTableSink.java:145)
> at
>
> org.apache.flink.connectors.hive.HiveTableSink.lambda$getSinkRuntimeProvider$0(HiveTableSink.java:137)
> at
>
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:109)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
> at
>
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at 

Documentation issue

2021-01-27 Thread xiaolailong


Re: Initializing broadcast state

2021-01-27 Thread Wei Jiang
Hi guys,
I hit the same question, but I used a different way to initialize:
```
val list = ...   // I use JDBC to get the init data
val dimensionInitStream = env.fromCollection(list)
// the main stream and the `dimensionStream` are streams from flink cdc
val dimension =
dimensionStream.union(dimensionInitStream).broadcast(descriptor)
...
```
Then the main stream can connect to the broadcast state...
Hmm, I don't know why it works. What do you think about that?


Guowei Ma wrote
> Hi, Nick
>   You might need to handle it yourself If you have to process an element
> only after you get the broadcast state.
>   For example, you could “cache” the element to the state and handle it
> when the element from the broadcast side elements are arrived. Specially
> if
> you are using the `KeyedBroadcastProcessFunction` you could use the
> `applyToKeyedState` to access the element you cache before.
> 
> Best,
> Guowei
> 
> 
> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner 

> buggie89@

>  wrote:
> 
>> Hi guys,
>> What is the way to initialize broadcast state(say with default values)
>> before the first element shows up in the broadcasting stream? I do a
>> lookup
>> on the broadcast state to process transactions which come from another
>> stream. The problem is the broadcast state is empty until the first
>> element
>> shows up.
>>
>>
>> Best,
>> Nick.
>>
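
A minimal Java sketch of the buffering pattern described in the quoted reply above: cache keyed elements until the broadcast state has been populated, then flush them. The Event and Rule types and all names are assumptions, not code from this thread.

```
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BufferUntilBroadcastReady
        extends KeyedBroadcastProcessFunction<String, BufferUntilBroadcastReady.Event,
                BufferUntilBroadcastReady.Rule, String> {

    // Assumed POJOs for the example.
    public static class Event { public String key; public String ruleName; }
    public static class Rule { public String name; }

    static final MapStateDescriptor<String, Rule> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

    private static final ListStateDescriptor<Event> BUFFER =
            new ListStateDescriptor<>("buffered-events", Types.POJO(Event.class));

    private transient ListState<Event> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(BUFFER);
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        if (ctx.getBroadcastState(RULES).immutableEntries().iterator().hasNext()) {
            out.collect(event.ruleName + ":" + event.key);
        } else {
            // Broadcast state is still empty: cache the element in keyed state for now.
            buffer.add(event);
        }
    }

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out) throws Exception {
        ctx.getBroadcastState(RULES).put(rule.name, rule);
        // Flush everything that arrived before the first broadcast element.
        ctx.applyToKeyedState(BUFFER, (String key, ListState<Event> state) -> {
            for (Event buffered : state.get()) {
                out.collect(buffered.ruleName + ":" + buffered.key);
            }
            state.clear();
        });
    }
}
```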





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Documentation issue

2021-01-27 Thread xiaolailong


Reply: Unsubscribe

2021-01-27 Thread Roc Marshal
Hi, Tang.
Please send a short message to user-zh-unsubscr...@flink.apache.org if you want 
to unsubscribe from the mailing list.


Best, Roc.


Roc Marshal
flin...@126.com


On 2021-01-27 16:41, 唐军亮 wrote:
Unsubscribe

Unsubscribe

2021-01-27 Thread 唐军亮
Unsubscribe

Re: Difference between table.exec.source.idle-timeout and setIdleStateRetentionTime ?

2021-01-27 Thread Dawid Wysakowicz
Hey,

As for the MATCH_RECOGNIZE clause, I highly recommend applying a time
constraint[1]. The idle state retention time does not apply to the
MATCH_RECOGNIZE, but you can think of the time constraint as something
similar, but it is closer to the actual query logic.

If you are hitting FLINK-15160 unfortunately I don't have a good
solution for it. The only thing that comes to my mind is adding a
heartbeat event to the event stream to prune the partial matches, but I
understand it is quite invasive.

If you would be willing to help fixing the problem in FLINK, I could
also help review it and give pointers how it could be done.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/match_recognize.html#time-constraint
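
For reference, the two settings compared in this thread are configured in different places; a minimal Java sketch follows, assuming a StreamTableEnvironment named tableEnv and placeholder values for the timeout and retention times.

```
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public final class IdleConfigExample {

    static void configure(StreamTableEnvironment tableEnv) {
        // table.exec.source.idle-timeout: lets the watermark advance even if some
        // source partitions temporarily produce no records.
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "30 s");

        // setIdleStateRetentionTime: state for keys not seen for at least the min
        // time (and at most the max time) becomes eligible for cleanup.
        tableEnv.getConfig().setIdleStateRetentionTime(Time.minutes(10), Time.minutes(20));
    }
}
```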

On 26/01/2021 17:39, Dcosta, Agnelo (HBO) wrote:
>
> Hi Dawid, thanks for the clarification and it helps a lot.
> Reply to couple of points :
>
> what is causing the state to grow?
> We are using flink SQL and have 5 pattern match queries , 3 group by
> tumble windows. State growth over time is primarily coming from
> pattern match queries.
>
> Is it ever growing keyspace?
> Yes. By design our keyspace is ever growing. The expectation is that
> messages for one key will come in for couple of hours, then stop
> coming in. We would never see messages from that key again. New keys
> are constantly coming in.
>
> Is it that a watermark does not progress?
> Watermark on the subtask level is constantly updating and is in sync
> with other subtasks. We have not seen any issues with watermark
> updating as such.
>
> Looking through mailing list archive, our problem seems similar to
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-in-CEP-queries-keep-increasing-td31045.html
> https://issues.apache.org/jira/browse/FLINK-15160 : Clean up is not
> applied if there are no incoming events for a key.
>
> By design we can have partial matched states/matches in pattern match
> queries. And key space is such that no new event comes in for those
> partial matches.
>
> thanks.
>
>  
>
> *From: *Dawid Wysakowicz 
> *Date: *Tuesday, January 26, 2021 at 3:14 AM
> *To: *Dcosta, Agnelo (HBO) ,
> user@flink.apache.org 
> *Subject: *Re: Difference between table.exec.source.idle-timeout and
> setIdleStateRetentionTime ?
>
> **External Email received from: dwysakow...@apache.org **
>
>  
>
> Hi,
>
> The difference is that the *table.exec.source.idle-timeout *is used
> for dealing with source idleness[1]. It is a problem that a watermark
> cannot advance if some of the partition become idle (do not produce
> any records). Watermark is always the minimum of watermarks of all
> input partitions. The setting makes flink ignore certain partitions in
> the calculation after the time threshold is reached.
>
> The IdleStateRetention is Table API specific. As described in the link
> you provided it removes entries from a state for keys that were not
> seen for a given time threshold.
>
> As for your issue, I'd recommend first investigating what is causing
> the state to grow. Is it ever growing keyspace? Is it that a watermark
> does not progress (this should manifest in results as well). Or is it
> something else.
>
> Best,
>
> Dawid
>
>  
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
>
> On 25/01/2021 20:12, Dcosta, Agnelo (HBO) wrote:
>
> Hi,
>
> What is the difference between *table.exec.source.idle-timeout* and
> *setIdleStateRetentionTime* ?
>
> table.exec.source.idle-timeout:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html#table-exec-source-idle-timeout
>
>  
>
> setIdleStateRetentionTime:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
>  
>
> Some context:
> Hi we are using flink 1.12.
> Our checkpoint size is constantly increasing once app is deployed.
> After performing a restart, checkpoint size goes back to expected size.
> Looking at actual checkpoint files generated, it seems our app is
> holding on to state/events since the time app started up.
> Based on our sql, the maximum time we would need to hold state is 10
> minutes.
>
>  
>
> This e-mail is intended only for the use of the addressees. Any
> copying, forwarding, printing or other use of this e-mail by persons
> other than the addressees is not authorized. This e-mail may contain
> information that is privileged, confidential and exempt from
> disclosure. If you are not the intended recipient, please notify us
> immediately by return e-mail (including the original message in your
> reply) and then delete and discard all copies of the e-mail. Thank you.
> HB75
>


signature.asc
Description: OpenPGP digital signature


rocksdb block cache usage

2021-01-27 Thread ??????
hi, all
I've enabled the state.backend.rocksdb.metrics.block-pinned-usage 
metric,
and the 
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
 metric is exposed.
I'm confused whether the total memory used for the pinned block cache is the 
sum of 
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
 over all instances, or just
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
 itself (for block cache usage the metric seems to be reported per slot)?