Flink Kafka offsets

2020-10-12 Thread Rex Fenley
Hello,

I've been trying to configure the offset start position for a Flink Kafka
consumer so that, when there is no committed offset, it always starts at the
beginning. The typical way to do this would be setting
auto.offset.reset=earliest; however, I don't see that configuration property
in the documentation.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

However, I do see scan.startup.mode = earliest-offset, but from the docs it
sounds like this would mean offsets are never committed and Flink would
always start consuming from the beginning of the Kafka stream, which is not
what I want.

Is this the case or am I misunderstanding? How can I get the behavior that
I wish to see, where committed offsets are respected, but no offset means
start at the beginning of the kafka log stream?
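One possible configuration (a sketch, assuming the Flink 1.11 Kafka SQL connector, which forwards any 'properties.*' option to the Kafka client): 'scan.startup.mode' = 'group-offsets' respects committed offsets, and the client-side 'properties.auto.offset.reset' decides what happens when the group has none. Table name, topic, and schema below are made up for illustration.

// Sketch only: hypothetical table; tableEnv is an existing TableEnvironment.
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  id BIGINT," +
    "  payload STRING" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'kafka:9092'," +
    "  'properties.group.id' = 'my-group'," +
    "  'scan.startup.mode' = 'group-offsets'," +          // use committed offsets when present
    "  'properties.auto.offset.reset' = 'earliest'," +    // fall back to earliest when none
    "  'format' = 'json'" +
    ")");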

Thanks!
-- 

Rex Fenley  |  Software Engineer - Mobile and Backend  |  Remind.com



Re: why we need keyed state and operate state when we already have checkpoint?

2020-10-12 Thread Congxian Qiu
Hi
As others said, state is different from a checkpoint. A checkpoint is just
a **snapshot** of the state, and you can restore from a previous
checkpoint if the job crashes.

State is for stateful computation, and checkpoints are for
fault tolerance [1].

The state keeps the information you'll need in the future. Take
word count as an example: the count of a word depends on the total count
of that word we have seen so far, so we need to keep the "total count of the
word seen before" somewhere; in Flink you can keep it in state.
A checkpoint/savepoint contains a **snapshot** of all the state. If
there is no state, then the checkpoint will be *empty*; you can restore
from it, but its content is empty.

PS: even if you don't create state explicitly, some operators in
Flink contain state internally (such as WindowOperator).
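As an illustration of the word-count case above, a minimal keyed-state sketch (class and field names are made up, not from this thread):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts occurrences per word; the "total count seen so far" lives in keyed state,
// and every checkpoint snapshots exactly this state.
public class CountWords extends RichFlatMapFunction<String, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String word, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();                     // null on the first occurrence of this key
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);
        out.collect(Tuple2.of(word, updated));
    }
}

Used e.g. as words.keyBy(w -> w).flatMap(new CountWords()); on restore from a checkpoint, each key's count continues from its last snapshotted value.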

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html
Best,
Congxian


大森林 wrote on Mon, Oct 12, 2020 at 9:26 PM:

> Thanks for your replies.
> When I use no state-relevant code in my program, the checkpoint can be
> saved and resumed. ❶
>
> So then why do we need *Keyed State/Operator State/Stateful Functions*? ❷
> *"the operators are reset to the time of the respective checkpoint."*
> We have already met the requirement *"resume from a checkpoint (the last state
> of each operator, which stores the result)"* ❶,
> so why do we still need ❷?
> Thanks for your help!
>
>
>
> -- Original Message --
> *From:* "Arvid Heise" ;
> *Sent:* Monday, Oct 12, 2020, 2:53 PM
> *To:* "大森林";
> *Cc:* "Shengkai Fang";"user";
> *Subject:* Re: why we need keyed state and operate state when we already have
> checkpoint?
>
> Hi 大森林,
>
> You can always resume from checkpoints independent of the usage of keyed
> or non-keyed state of operators.
> A checkpoint contains the state of all operators at a given point in time.
> Each operator may have keyed state, raw state, or non-keyed state.
> As long as you are not changing the operators (too much) before
> restarting, you can always restart.
>
> During (automatic) restart of a Flink application, the state of a given
> checkpoint is restored to the operators, such that it looks like the
> operator never failed. However, the operators are reset to the time of the
> respective checkpoint.
>
> I have no clue what you mean with "previous variable temporary result".
>
> On Wed, Oct 7, 2020 at 9:13 AM 大森林  wrote:
>
>> Thanks for your replies, I think I understand it better now.
>>
>> There are two cases.
>> 1. If I use no keyed state in the program, when it's killed I can only resume
>> from the previous result.
>> 2. If I use keyed state in the program, when it's killed I can
>>  resume from the previous result and the previous intermediate (temporary variable) results.
>>
>> Am I right?
>> Thanks for your guidance.
>>
>>
>> -- Original Message --
>> *From:* "Arvid Heise" ;
>> *Sent:* Wednesday, Oct 7, 2020, 2:25 PM
>> *To:* "大森林";
>> *Cc:* "Shengkai Fang";"user";
>> *Subject:* Re: why we need keyed state and operate state when we already have
>> checkpoint?
>>
>> I think there is some misunderstanding here: a checkpoint IS (a snapshot
>> of) the keyed state and operator state (among a few more things). [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions
>>
>> On Wed, Oct 7, 2020 at 6:51 AM 大森林  wrote:
>>
>>> When the job is killed, state is also missing.
>>> So why do we need keyed state? Is keyed state useful when we try to resume
>>> the killed job?
>>>
>>>
>>> -- Original Message --
>>> *From:* "Shengkai Fang" ;
>>> *Sent:* Wednesday, Oct 7, 2020, 12:43 PM
>>> *To:* "大森林";
>>> *Cc:* "user";
>>> *Subject:* Re: why we need keyed state and operate state when we already
>>> have checkpoint?
>>>
>>> A checkpoint is a snapshot of the job, and we can resume the job if
>>> the job is killed unexpectedly. State is what memorizes the
>>> intermediate results of the computation. I don't think checkpoints can
>>> replace state.
>>>
>>> 大森林 wrote on Wed, Oct 7, 2020 at 12:26 PM:
>>>
 Could you tell me:

 Why do we need keyed state and operator state when we already have
 checkpoints?

 When a running jar crashes, we can resume from the checkpoint
 automatically/manually.
 So why do we still need keyed state and operator state?

 Thanks

>>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

RE: state access causing segmentation fault

2020-10-12 Thread Colletta, Edward
Thanks Arvid,

I added static to ExecQueue and this did fix the problem.  I tested without
static on RingBufferExec because, if ExecQueue is a static nested class,
there should be no reference to the MyKeyedProcessFunction object, since
RingBufferExec is an inner class of ExecQueue.

However, I did that just for the test.  For my prod code, going forward, I am
following Flink's rules for POJO types, adding static to any inner class, and
checking for any POJO warnings in the logs.
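For reference, a rough sketch of the change described above (simplified; the full original class is further down in this thread):

// ExecQueue no longer holds an implicit reference to the enclosing MyKeyedProcessFunction;
// RingBufferExec, as an inner class of ExecQueue, only references its ExecQueue instance.
public static class ExecQueue {
    public RingBufferExec queue;

    public class RingBufferExec {
        public Integer size;
        public Integer count;
    }
}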


From: Arvid Heise 
Sent: Sunday, October 11, 2020 3:46 PM
To: Colletta, Edward 
Cc: Dawid Wysakowicz ; user@flink.apache.org
Subject: Re: state access causing segmentation fault


Hi Edward,

could you try adding the static keyword to ExecQueue and RingBufferExec? As is 
they hold a reference to the MyKeyedProcessFunction, which has unforeseen 
consequences.

On Sun, Oct 11, 2020 at 5:38 AM Colletta, Edward 
mailto:edward.colle...@fmr.com>> wrote:
Tried to attach tar file but it got blocked.   Resending with files attached 
individually.


OK, I have a minimal reproducible example. Attaching a tar file of the job that
crashed.

The crash has nothing to do with the number of state variables.  But it does 
seem to be caused by using a type for the state variable that is a class nested 
in the KeyedProcessFunction.

Reduced to a single state variable.  The type of the state variable was a class 
(ExecQueue) defined in class implementing KeyedProcessFunction.  Moving the 
ExecQueue definition to its own file fixed the problem.



The attached example always crashes  the taskManager in 30 seconds to 5 minutes.



MyKeyedProcessFunction.java  and also cut and pasted here:



package crash;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

// NOTE: the generic type parameters were stripped by the mail archive; <String, Exec, Exec>
// and <ExecQueue> below are reconstructed from usage and may differ from the original.
// Exec is defined elsewhere in the attached example.
public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Exec, Exec> {

    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessFunction.class);

    public TypeInformation<ExecQueue> leftTypeInfo;
    public transient ValueState<ExecQueue> leftState;

    public int initQueueSize;
    public long emitFrequencyMs;

    public MyKeyedProcessFunction() {
        initQueueSize = 10;
        emitFrequencyMs = 1;
    }

    @Override
    public void open(Configuration conf) {
        leftTypeInfo = TypeInformation.of(new TypeHint<ExecQueue>() {});
        leftState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("left", leftTypeInfo, null));
    }

    @Override
    public void processElement(Exec leftIn, Context ctx, Collector<Exec> out) {
        try {
            ExecQueue eq = leftState.value();
            if (eq == null) {
                eq = new ExecQueue(10);
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + emitFrequencyMs);
            }
            leftState.update(eq);
        } catch (Exception e) {
            LOG.error("Exception in processElement1. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace())
                LOG.error(s.toString());
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Exec> out) {
        try {
            ExecQueue eq = leftState.value();
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + emitFrequencyMs);
        } catch (Exception e) {
            LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace())
                LOG.error(s.toString());
        }
    }

    // These (non-static) nested classes are what held the implicit reference to
    // MyKeyedProcessFunction and triggered the crash discussed in this thread.
    public class ExecQueue {
        public RingBufferExec queue;

        public ExecQueue() {}

        public ExecQueue(int initSize) {
            queue = new RingBufferExec(initSize);
        }

        public class RingBufferExec {
            public Integer size;
            public Integer count;

            public RingBufferExec() {}

            public RingBufferExec(int sizeIn) {
                size = sizeIn;
                count = 0;
            }
        }
    }
}


From: Dawid Wysakowicz mailto:dwysakow...@apache.org>>
Sent: Thursday, October 8, 2020 6:26 AM
To: Colletta, Edward 

Re: Flink on YARN container exits abnormally

2020-10-12 Thread Congxian Qiu
Hi
By "abnormal exit", do you mean that the container exits? You can check the JM/TM logs for related information; if there is nothing there, try checking the logs on the YARN side to see why
the container exited.
Best,
Congxian


caozhen wrote on Mon, Oct 12, 2020 at 6:08 PM:

>
> Could you share the error logs produced when "the container often exits abnormally after the application id is allocated"?
>
> Or check the error logs in the Flink client, as well as the logs in the YARN history server.
>
> 
>
> Dream-底限 wrote
> > hi
> > I am using Flink 1.11.1 on
> > YARN, running jobs in detached mode. When submitting a job, after the application id is allocated the container
> often exits abnormally. I first thought it was a YARN environment problem, but I have run into it on two clusters. Is this a known bug?
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Dynamic file name prefix - StreamingFileSink

2020-10-12 Thread Vijayendra Yadav
Hi Team,

I have tried to assign a dynamic prefix for the part file name, which contains
datetime components.
*The problem is that the job always takes the initial datetime from when the job
first starts and never refreshes it later.*
*How can I get the dynamic current datetime in the filename at sink time?*

*.withPartPrefix
(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))*


https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

val config = OutputFileConfig
  .builder()
  .withPartPrefix("prefix")
  .withPartSuffix(".ext")
  .build()
val sink = StreamingFileSink
  .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
  .withBucketAssigner(new KeyBucketAssigner())
  .withRollingPolicy(OnCheckpointRollingPolicy.build())
  .withOutputFileConfig(config)
  .build()
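One possible direction (a sketch, not from the original thread): the part prefix from OutputFileConfig is evaluated only once when the sink is built, so the usual way to get the current datetime into the output path is to encode it in the bucket id, either with Flink's built-in DateTimeBucketAssigner or with a custom assigner such as the Java sketch below (names are illustrative).

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Puts the current UTC processing time into the bucket path; it is evaluated per record,
// so new buckets (directories) pick up the current datetime while the job is running.
public class UtcMinuteBucketAssigner implements BucketAssigner<String, String> {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm").withZone(ZoneId.of("UTC"));

    @Override
    public String getBucketId(String element, Context context) {
        return FMT.format(Instant.ofEpochMilli(context.currentProcessingTime()));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}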


Re: Missing annotation in SimpleJdbcConnectionProvider.java ?

2020-10-12 Thread Kenzyme
After careful examination, seems like it should be marked as @Internal since 
this class is located in package 
org.apache.flink.connector.jdbc.internal.connection.

Here is my PR related to this https://github.com/apache/flink/pull/13603 .

Thanks a lot!

Kenzyme Le

‐‐‐ Original Message ‐‐‐
On Monday, October 12th, 2020 at 10:48 PM, Kenzyme  wrote:

> Hi,
>
> I would like to know if class 
> [SimpleJdbcConnectionProvider](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProvider.java).java
>  should be marked as @Internal or @PublicEvolving? Since there's no annotation 
> provided, I'm unsure if I should be using this class in my user code.
>
> If one of them is missing, I would like to create a PR for this if that's 
> fine as well.
>
> Thank you for the clarification!
>
> Kenzyme Le

Missing annotation in SimpleJdbcConnectionProvider.java ?

2020-10-12 Thread Kenzyme
Hi,

I would like to know if class 
[SimpleJdbcConnectionProvider](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProvider.java).java
 should be marked as @Internal or @PublicEvolving? Since there's no annotation 
provided, I'm unsure if I should be using this class in my user code.

If one of them is missing, I would like to create a PR for this if that's fine 
as well.

Thank you for the clarification!

Kenzyme Le

Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-12 Thread Xingbo Huang
Hi,

You can use the API to set the configuration:
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')

The flink-conf.yaml approach only takes effect when the job is submitted through flink
run; in mini-cluster mode (running python xxx.py directly) it will not take effect.

Best,
Xingbo

Sharipov, Rinat wrote on Tue, Oct 13, 2020 at 1:56 AM:

> Hi mates !
>
> I'm very new at pyflink and trying to register a custom UDF function using
> python API.
> Currently I faced an issue in both server env and my local IDE
> environment.
>
> When I'm trying to execute the example below I got an error message: *The
> configured Task Off-Heap Memory 0 bytes is less than the least required
> Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
> using the configuration key 'taskmanager.memory.task.off-heap.size*
>
> Of course I've added required property into *flink-conf.yaml *and checked
> that *pyflink-shell.sh *initializes env using specified configuration but
> it doesn't make any sense and I still have an error.
>
> I've also attached my flink-conf.yaml file
>
> Thx for your help !
>
> *Here is an example:*
>
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import BatchTableEnvironment, DataTypes
> from pyflink.table.udf import udf
>
>
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def test_udf(i):
> return i
>
>
> if __name__ == "__main__":
> env = ExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
>
> bt_env = BatchTableEnvironment.create(env)
> bt_env.register_function("test_udf", test_udf)
>
> my_table = bt_env.from_elements(
> [
> ("user-1", "http://url/1;),
> ("user-2", "http://url/2;),
> ("user-1", "http://url/3;),
> ("user-3", "http://url/4;),
> ("user-1", "http://url/3;)
> ],
> [
> "uid", "url"
> ]
> )
>
> my_table_grouped_by_uid = my_table.group_by("uid").select("uid, 
> collect(url) as urls")
> bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)
>
> bt_env.execute_sql("select test_udf(uid) as uid, urls from 
> my_temp_table").print()
>
>
>
>


Flink temporal-table point lookups: support for sub-queries?

2020-10-12 Thread Dream-底限
Hi,
Currently, when a stream table looks up external dimension tables, having multiple dimension tables leads to multiple queries against the external system and therefore multiple network round trips. Will the community later support sub-queries for temporal table lookups, i.e., when looking up the external system by a given key, instead of querying one specified table per lookup, allow a point query against a SQL sub-table so that the network IO is smaller?


Re: Additional options to S3 Filesystem: Interest?

2020-10-12 Thread Padarn Wilson
Thanks for the feedback. I've created a JIRA here
https://issues.apache.org/jira/browse/FLINK-19589.

@Dan: This indeed would make it easier to set a lifetime property on
objects created by Flink, but actually if you want to apply it to all your
objects for a given bucket you can set bucket wide policies instead. The
reason I want this is that we have a shared bucket and wish to tag
different objects based on which pipeline is producing them.

On Tue, Oct 13, 2020 at 4:13 AM Dan Diephouse  wrote:

> We use the StreamingFileSink. An option to expire files after some time
> period would certainly be welcome. (I could probably figure out a way to do
> this from the S3 admin UI too though)
>
> On Sat, Oct 10, 2020 at 10:45 PM Padarn Wilson  wrote:
>
>> Hi Flink Users,
>>
>> We need to expose some additional options for the s3 hadoop filesystem:
>> Specifically, we want to set object tagging and lifecycle. This would be a
>> fairly easy change and we initially thought to create a new Filesystem with
>> very minor changes to allow this.
>>
>> However then I wondered, would others use this? If so, is it something that is
>> worth raising as a Flink issue and then contributing back upstream?
>>
>> Any others who would like to be able to set object tags for the s3
>> filesystem?
>>
>> Cheers,
>> Padarn
>>
>
>
> --
> Dan Diephouse
> @dandiep
>


FW: NPE in disposing flink sql group window when running flink using ./gradlew shadowJar run

2020-10-12 Thread Dcosta, Agnelo (HBO)

Flink application using Kafka topics as source and destination. Using
javaVersion = '1.11'
flinkVersion = '1.11.1'
scalaBinaryVersion = '2.11'
The application primarily uses the Flink SQL APIs. We have a StatementSet and 
add SQL inserts to that set using addInsertSql.
When there are more insert statements (say 10+), running the job outside of a Flink 
cluster using ./gradlew shadowJar run fails with the following error:
[GroupWindowAggregate(groupBy………) ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of 
stream operator.
java.lang.NullPointerException: null
at 
org.apache.flink.table.runtime.operators.window.WindowOperator.dispose(WindowOperator.java:318)
 ~[flink-table-runtime-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:703)
 [flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:635)
 [flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:542) 
[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[flink-runtime_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-runtime_2.11-1.11.1.jar:1.11.1]
at java.lang.Thread.run(Thread.java:834) [?:?]

Some approaches that did not work:
- Having multiple statement sets. This works locally, but not on the Flink cluster,
  which fails with an error saying you cannot have multiple execute statements.
- Changing the partition count of the Kafka topics. No impact.




Re: Additional options to S3 Filesystem: Interest?

2020-10-12 Thread Dan Diephouse
We use the StreamingFileSink. An option to expire files after some time
period would certainly be welcome. (I could probably figure out a way to do
this from the S3 admin UI too though)

On Sat, Oct 10, 2020 at 10:45 PM Padarn Wilson  wrote:

> Hi Flink Users,
>
> We need to expose some additional options for the s3 hadoop filesystem:
> Specifically, we want to set object tagging and lifecycle. This would be a
> fairly easy change and we initially thought to create a new Filesystem with
> very minor changes to allow this.
>
> However then I wondered, would others use this? If so, is it something that is
> worth raising as a Flink issue and then contributing back upstream?
>
> Any others who would like to be able to set object tags for the s3
> filesystem?
>
> Cheers,
> Padarn
>


-- 
Dan Diephouse
@dandiep


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Arvid Heise
Hi Yun,

4) Yes, the interaction is not trivial, and I also have not completely
thought it through. But in general, I'm currently at the point where I
think that we also need non-checkpoint related events in unaligned
checkpoints. So just keep in mind that we might converge anyhow at
this point.

In general, what helps in this case is to remember that no
unaligned checkpoint barrier is ever going to overtake EndOfPartition. So, we
can completely ignore the problem of how to store and restore output
buffers of a completed task (also important for the next point).

5) I think we are on the same page and I completely agree that for the
MVP/first version, it's completely fine to start and immediately stop. A
tad better would be to not even start the processing loop.

On Mon, Oct 12, 2020 at 6:18 PM Yun Gao  wrote:

> Hi Arvid,
>
> Very thanks for the insightful comments! I added the responses for this
> issue under the quota:
>
> >> 1) You call the tasks that get the barriers injected leaf nodes, which
> would make the > sinks the root nodes. That is very similar to how graphs
> in relational algebra are labeled. However, I got the feeling that in
> Flink, we rather iterate from sources to sink, making the sources root
> nodes and the sinks the leaf nodes. However, I have no clue how it's done
> in similar cases, so please take that hint cautiously.
>
> >> 2) I'd make the algorithm to find the subtasks iterative and react in
> CheckpointCoordinator. Let's assume that we inject the barrier at all root
> subtasks (initially all sources). So in the iterative algorithm, whenever
> root A finishes, it looks at all connected subtasks B if they have any
> upstream task left. If not B becomes a new root. That would require to only
> touch a part of the job graph, but would require some callback from
> JobManager to CheckpointCoordinator.
>
>
> I think I should have used a bad name of "leaf nodes", in fact I think we
> should have the same thoughts that we start with the source nodes to find
> all the nodes whose precedent nodes are all finished. It would be much
> better to call these nodes (which we would trigger) as "root nodes". I'll
> modify the FLIP to change the names to "root nodes".
>
> >> 2b) We also need to be careful for out-of-sync updates: if the root is
> about to finish, we could send the barrier to it from
> CheckpointCoordinator, but at the time it arrives, the subtask is finished
> already.
>
> Exactly. When the checkpoint triggers a task but found the task is not
> there, it may then further check if the task has been finished, if so, it
> should then re-check its descendants to see if there are new "root nodes"
> to trigger.
>
> >> 3) An implied change is that checkpoints are not aborted anymore at
> EndOfPartition, which is good, but might be explicitly added.
>
> Yes, currently barrier alignment would fail the current checkpoint on
> EndOfPartition, and we would modify the behavior.
>
> >> 4) The interaction between unaligned checkpoint and EndOfPartition is a
> bit ambiguous: What happens when an unaligned checkpoint is started and
> then one input channel contains the EndOfPartition event? From the written
> description, it sounds to me like, we move back to an aligned checkpoint
> for the whole receiving task. However, that is neither easily possible nor
> necessary. Imho it would be enough to also store the EndOfPartition in the
> channel state.
>
>
> Very thanks for the suggestions on this issue and in fact I did stuck on
> it for some time. Previously for me one implementation detail issue is that
> EndOfPartition seems not be able to overtake the previous buffers easily as
> CheckpointBarrier does, otherwise it might start destroying the input
> channels if all EndOfPartitions are received.
>
> Therefore, although we could also persistent the channels with
> EndOfPartition:
>
> 1. Start persisting the channels when CheckpointUnaligner received barrier
> (if not all precendant tasks are finished) or received triggering (if all
> precendant tasks are finished).
>
> 2. The persisting actually stops when onBuffer received EndOfPartition.
>
> After the last channel stopped persisting, CheckpointUnaligner still need
> to wait till all the previous buffers are processed before complete the
> allBarriersReceivedFuture. Therefore it would not be able to accelerate the
> checkpoint in this case.
>
> After some rethinking today currently I think we might inserts some
> additional virtual events into receivedBuffer when received EndOfPartition
> and allows these virtual events to overtake the previous buffers. I'll try
> to double check if it is feasible and let me know if there are also other
> solutions on this issue :).
>
> > 5) I'd expand the recovery section a bit. It would be the first time
> that we recover an incomplete DAG. Afaik the subtasks are deployed before
> the state is recovered, so at some point, the subtasks either need to be
> removed again or maybe we could even 

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Chesnay Schepler
Is there a way for us to change the module (in a reasonable way) that 
would allow users to continue using it?

Is it an API problem, or one of semantics?

On 10/12/2020 4:57 PM, Kostas Kloudas wrote:

Hi Chesnay,

Unfortunately not from what I can see in the code.
This is the reason why I am opening a discussion. I think that if we
supported backwards compatibility, this would have been an easier
process.

Kostas

On Mon, Oct 12, 2020 at 4:32 PM Chesnay Schepler  wrote:

Are older versions of the module compatible with 1.12+?

On 10/12/2020 4:30 PM, Kostas Kloudas wrote:

Hi all,

As the title suggests, this thread is to discuss the removal of the
flink-connector-filesystem module which contains (only) the deprecated
BucketingSink. The BucketingSink is deprecated since FLINK 1.9 [1] in
favor of the relatively recently introduced StreamingFileSink.

For the sake of a clean and more manageable codebase, I propose to
remove this module for release-1.12, but of course we should see first
if there are any usecases that depend on it.

Let's have a fruitful discussion.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13396





PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-12 Thread Sharipov, Rinat
Hi mates !

I'm very new at pyflink and trying to register a custom UDF function using
python API.
Currently I faced an issue in both server env and my local IDE environment.

When I'm trying to execute the example below I got an error message: *The
configured Task Off-Heap Memory 0 bytes is less than the least required
Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
using the configuration key 'taskmanager.memory.task.off-heap.size*

Of course I've added required property into *flink-conf.yaml *and checked
that *pyflink-shell.sh *initializes env using specified configuration but
it doesn't make any sense and I still have an error.

I've also attached my flink-conf.yaml file

Thx for your help !

*Here is an example:*

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def test_udf(i):
return i


if __name__ == "__main__":
env = ExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

bt_env = BatchTableEnvironment.create(env)
bt_env.register_function("test_udf", test_udf)

my_table = bt_env.from_elements(
[
("user-1", "http://url/1;),
("user-2", "http://url/2;),
("user-1", "http://url/3;),
("user-3", "http://url/4;),
("user-1", "http://url/3;)
],
[
"uid", "url"
]
)

my_table_grouped_by_uid = my_table.group_by("uid").select("uid,
collect(url) as urls")
bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)

bt_env.execute_sql("select test_udf(uid) as uid, urls from
my_temp_table").print()


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Yun Gao
Hi Arvid,
Very thanks for the insightful comments! I added the responses for this issue 
under the quota: 
>> 1) You call the tasks that get the barriers injected leaf nodes, which would 
>> make the > sinks the root nodes. That is very similar to how graphs in 
>> relational algebra are labeled. However, I got the feeling that in Flink, we 
>> rather iterate from sources to sink, making the sources root nodes and the 
>> sinks the leaf nodes. However, I have no clue how it's done in similar 
>> cases, so please take that hint cautiously.
>> 2) I'd make the algorithm to find the subtasks iterative and react in 
>> CheckpointCoordinator. Let's assume that we inject the barrier at all root 
>> subtasks (initially all sources). So in the iterative algorithm, whenever 
>> root A finishes, it looks at all connected subtasks B if they have any 
>> upstream task left. If not B becomes a new root. That would require to only 
>> touch a part of the job graph, but would require some callback from 
>> JobManager to CheckpointCoordinator.

I think I should have used a bad name of "leaf nodes", in fact I think we 
should have the same thoughts that we start with the source nodes to find all 
the nodes whose precedent nodes are all finished. It would be much better to 
call these nodes (which we would trigger) as "root nodes". I'll modify the FLIP 
to change the names to "root nodes".
>> 2b) We also need to be careful for out-of-sync updates: if the root is about 
>> to finish, we could send the barrier to it from CheckpointCoordinator, but 
>> at the time it arrives, the subtask is finished already.
Exactly. When the checkpoint triggers a task but found the task is not there, 
it may then further check if the task has been finished, if so, it should then 
re-check its descendants to see if there are new "root nodes" to trigger.
>> 3) An implied change is that checkpoints are not aborted anymore at 
>> EndOfPartition, which is good, but might be explicitly added.
Yes, currently barrier alignment would fail the current checkpoint on 
EndOfPartition, and we would modify the behavior.
>> 4) The interaction between unaligned checkpoint and EndOfPartition is a bit 
>> ambiguous: What happens when an unaligned checkpoint is started and then one 
>> input channel contains the EndOfPartition event? From the written 
>> description, it sounds to me like, we move back to an aligned checkpoint for 
>> the whole receiving task. However, that is neither easily possible nor 
>> necessary. Imho it would be enough to also store the EndOfPartition in the 
>> channel state.

Thanks very much for the suggestions on this issue; in fact I was stuck on it for 
some time. Previously, one implementation-detail issue for me was that 
EndOfPartition does not seem to be able to overtake the previous buffers as easily as 
CheckpointBarrier does; otherwise it might start destroying the input channels 
if all EndOfPartitions are received.
Therefore, although we could also persistent the channels with EndOfPartition:
1. Start persisting the channels when CheckpointUnaligner received barrier (if 
not all precendant tasks are finished) or received triggering (if all 
precendant tasks are finished).
2. The persisting actually stops when onBuffer received EndOfPartition.
After the last channel stopped persisting, CheckpointUnaligner still need to 
wait till all the previous buffers are processed before complete the 
allBarriersReceivedFuture. Therefore it would not be able to accelerate the 
checkpoint in this case.
After some rethinking today, I currently think we might insert some additional 
virtual events into receivedBuffer when EndOfPartition is received and allow 
these virtual events to overtake the previous buffers. I'll try to double-check 
whether that is feasible, and let me know if there are also other solutions to this 
issue :). 
> 5) I'd expand the recovery section a bit. It would be the first time that we 
> recover an incomplete DAG. Afaik the subtasks are deployed before the state 
> is recovered, so at some point, the subtasks either need to be removed again 
> or maybe we could even avoid them being created in the first place.
I also agree that finally we should not "restarted" the finished tasks in some 
way. It seems not start it in the first place would be better. We should be 
able to bookkeep additional information in the checkpoint meta about which 
operators are fully finished, and the scheduler could restore the status of 
tasks on restoring from previous checkpoints. It would also requires some 
modification in the task side to support input channels that are finished on 
starting.
But in the first version, I think we might simplify this issue by still restart 
all the tasks, but let the finished sources to exit directly? The new Source 
API would terminate directly since there is no pending splits and the legacy 
sources would be dealt specially by skipped execution if the source operator is 
fully finished before. We would be able to turn to the 

Re: How to get the DAG graph shown on the Flink web UI

2020-10-12 Thread hailongwang
Hi,
Do you want to build your own product and display the graph on a web page? We just take the DAG's JSON value and render it on the front end.
Hope this helps~


Best,
Hailong Wang
On 2020-10-12 18:15:36, "丁浩浩" <18579099...@163.com> wrote:
>I want to get the DAG graph shown on the Flink web UI. Is there any way to get it?
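For what it's worth, a sketch of one way to fetch that JSON yourself (not from the original reply): the REST endpoint /jobs/<jobid>/plan returns the same plan JSON the web UI uses to draw the DAG. Host, port, and job id below are placeholders.

import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class FetchJobPlan {
    public static void main(String[] args) throws Exception {
        // args[0]: the job id as shown in the web UI / REST API
        URL url = new URL("http://localhost:8081/jobs/" + args[0] + "/plan");
        try (InputStream in = url.openStream()) {
            // The returned JSON describes the job graph (nodes, parallelism, edges).
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}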


Re: Re: Flink job submission question

2020-10-12 Thread hailongwang


Yes, the concrete logic is in YarnClusterDescriptor#startAppMaster, which keeps checking the app state.
If needed, you can add a timeout check yourself (older versions had this logic, e.g. 1.4.2).




Best,
Hailong Wang


On 2020-10-12 17:17:44, "caozhen" wrote:
>
>Yes. When Flink on YARN starts and the requested container resources are insufficient, it will wait until resources become available.
>
>---
>
>
>guaishushu1...@163.com wrote
>> When CliFrontend submits a job to YARN, can the submission process stay stuck (for reasons such as insufficient resources) until resources are released?
>> 
>> 
>
>> guaishushu1103@
>
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to set a separate metrics reporter for each Flink job

2020-10-12 Thread 王 小宝
On my side I specify a separate configuration file directory for each streaming job; I'm not sure whether that meets your needs.

Sent from my iPhone

> On Oct 12, 2020, at 18:18, xiao cai wrote:
> 
> Hi:
> The known way to set a metrics reporter is in conf/flink-conf.yaml. If I want to set a different metrics 
> reporter or different parameters for each job, for example multiple custom k=v entries for the Prometheus pushgateway, how should I configure that?
> 
> 
> Best xiao.


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Kostas Kloudas
Hi Chesnay,

Unfortunately not from what I can see in the code.
This is the reason why I am opening a discussion. I think that if we
supported backwards compatibility, this would have been an easier
process.

Kostas

On Mon, Oct 12, 2020 at 4:32 PM Chesnay Schepler  wrote:
>
> Are older versions of the module compatible with 1.12+?
>
> On 10/12/2020 4:30 PM, Kostas Kloudas wrote:
> > Hi all,
> >
> > As the title suggests, this thread is to discuss the removal of the
> > flink-connector-filesystem module which contains (only) the deprecated
> > BucketingSink. The BucketingSink is deprecated since FLINK 1.9 [1] in
> > favor of the relatively recently introduced StreamingFileSink.
> >
> > For the sake of a clean and more manageable codebase, I propose to
> > remove this module for release-1.12, but of course we should see first
> > if there are any usecases that depend on it.
> >
> > Let's have a fruitful discussion.
> >
> > Cheers,
> > Kostas
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-13396
> >
>


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Chesnay Schepler

Are older versions of the module compatible with 1.12+?

On 10/12/2020 4:30 PM, Kostas Kloudas wrote:

Hi all,

As the title suggests, this thread is to discuss the removal of the
flink-connector-filesystem module which contains (only) the deprecated
BucketingSink. The BucketingSink is deprecated since FLINK 1.9 [1] in
favor of the relatively recently introduced StreamingFileSink.

For the sake of a clean and more manageable codebase, I propose to
remove this module for release-1.12, but of course we should see first
if there are any usecases that depend on it.

Let's have a fruitful discussion.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13396





[DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Kostas Kloudas
Hi all,

As the title suggests, this thread is to discuss the removal of the
flink-connector-filesystem module which contains (only) the deprecated
BucketingSink. The BucketingSink is deprecated since FLINK 1.9 [1] in
favor of the relatively recently introduced StreamingFileSink.

For the sake of a clean and more manageable codebase, I propose to
remove this module for release-1.12, but of course we should see first
if there are any usecases that depend on it.

Let's have a fruitful discussion.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13396


Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Sharipov, Rinat
Hi Dian, thx for your reply !

I was wondering about replacing the UDF itself on the fly from Flink; of course I'm pretty
sure that it's possible to implement the update logic directly in Python, thanks
for the idea.

Regards,
Rinat

On Mon, Oct 12, 2020 at 2:20 PM, Dian Fu wrote:

> Hi Rinat,
>
> Do you want to replace the UDFs with new ones on the fly or just want to
> update the model which could be seen as instance variables inside the UDF?
>
> For the former case, it's not supported AFAIK.
> For the latter case, I think you could just update the model in the UDF
> periodically or according to some custom strategy. It's the behavior of the
> UDF.
>
> Regards,
> Dian
>
> On Oct 12, 2020, at 5:51 PM, Sharipov, Rinat wrote:
>
> Hi Arvid, thx for your reply.
>
> We are already using the approach with control streams to propagate
> business rules through our data-pipeline.
>
> Because all our models are powered by Python, I'm going to use Table API
> and register UDF functions, where each UDF is a separate model.
>
> So my question is - can I update the UDF function on the fly without a job
> restart ?
> Because new model versions become available on a daily basis and we should
> use them as soon as possible.
>
> Thx !
>
>
>
>
> On Mon, Oct 12, 2020 at 11:32 AM, Arvid Heise wrote:
>
>> Hi Rinat,
>>
>> Which API are you using? If you use datastream API, the common way to
>> simulate side inputs (which is what you need) is to use a broadcast. There
>> is an example on SO [1].
>>
>> [1]
>> https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
>>
>> On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat 
>> wrote:
>>
>>> Hi mates !
>>>
>>> I'm in the beginning of the road of building a recommendation pipeline
>>> on top of Flink.
>>> I'm going to register a list of UDF python functions on job
>>> startups where each UDF is an ML model.
>>>
>>> Over time new model versions appear in the ML registry and I would like
>>> to update my UDF functions on the fly without need to restart the whole job.
>>> Could you tell me, whether it's possible or not ? Maybe the community
>>> can give advice on how such tasks can be solved using Flink and what other
>>> approaches exist.
>>>
>>> Thanks a lot for your help and advice !
>>>
>>>
>>>
>>
>> --
>> Arvid Heise | Senior Java Developer
>> 
>>
>> Follow us @VervericaData
>> --
>> Join Flink Forward  - The Apache Flink
>> Conference
>> Stream Processing | Event Driven | Real Time
>> --
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>


Re: why we need keyed state and operate state when we already have checkpoint?

2020-10-12 Thread 大森林
Thanks for your replies.
When I use no state-relevant code in my program, the checkpoint can be saved 
and resumed. ❶

So then why do we need Keyed State/Operator State/Stateful Functions? ❷
"the operators are reset to the time of the respective checkpoint."
We have already met the requirement "resume from a checkpoint (the last state of each 
operator, which stores the result)" ❶,
so why do we still need ❷?
Thanks for your help!






-- Original Message --
From: "Arvid Heise"
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions

On Wed, Oct 7, 2020 at 6:51 AM 大森林 wrote:

Flink parallelism / slot / TaskManager questions

2020-10-12 Thread zjfpla...@hotmail.com
1. How is the parallelism of each operator in a Flink job usually designed? For example, what should the map parallelism or the source parallelism be set to — is there a formula for this?
2. Tasks are only assigned to the next TaskManager after a TaskManager's slots are used up. Was the design motivation the following?
Quoted from the official docs:
 By adjusting the number of slots, users can decide how subtasks are isolated. Having one slot per TaskManager means each group of tasks runs in a separate 
JVM (for example, launched in a separate container). Having multiple slots means more subtasks share the same JVM. Tasks in the same JVM share 
TCP connections (via multiplexing) and heartbeat messages. They may also share data sets and data structures, thus reducing the per-task overhead.




zjfpla...@hotmail.com


Re: Question about how long data is stored when a stream is registered as a table

2020-10-12 Thread caozhen
My understanding:

For stateless operations such as print and map, no data is stored.
For stateful operations such as windows, only the data inside the window is stored.
For stateful operations such as group by, the more keys there are, the more data is stored; by default it is not cleaned up, but a cleanup (state retention) policy can be configured — see the sketch below.
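A minimal sketch of configuring that retention in the Table API (illustrative only; the retention times are arbitrary examples):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Group-by state for a key is kept at least 12 hours and at most 24 hours
        // after the key was last accessed, then cleaned up.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }
}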

---


  My data comes from a Kafka source; after receiving it, I register it as a table. I'd like to know: for a table created this way, will data keep being appended to it?
Will it stay around forever so that memory usage keeps growing??? How do I clean up expired data???


The table registration code is as follows:
// get the order-callback Kafka data
DataStreamSource




--
Sent from: http://apache-flink.147419.n8.nabble.com/




Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Dian Fu
Hi Rinat,

Do you want to replace the UDFs with new ones on the fly or just want to update 
the model which could be seen as instance variables inside the UDF?

For the former case, it's not supported AFAIK.
For the latter case, I think you could just update the model in the UDF 
periodically or according to some custom strategy. It's the behavior of the UDF.

Regards,
Dian

> On Oct 12, 2020, at 5:51 PM, Sharipov, Rinat wrote:
> 
> Hi Arvid, thx for your reply.
> 
> We are already using the approach with control streams to propagate business 
> rules through our data-pipeline.
> 
> Because all our models are powered by Python, I'm going to use Table API and 
> register UDF functions, where each UDF is a separate model.
> 
> So my question is - can I update the UDF function on the fly without a job 
> restart ? 
> Because new model versions become available on a daily basis and we should 
> use them as soon as possible.
> 
> Thx !
> 
> 
> 
> 
> On Mon, Oct 12, 2020 at 11:32 AM, Arvid Heise wrote:
> Hi Rinat,
> 
> Which API are you using? If you use datastream API, the common way to 
> simulate side inputs (which is what you need) is to use a broadcast. There is 
> an example on SO [1].
> 
> [1] 
> https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
>  
> 
> On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat  > wrote:
> Hi mates !
> 
> I'm in the beginning of the road of building a recommendation pipeline on top 
> of Flink.
> I'm going to register a list of UDF python functions on job startups where 
> each UDF is an ML model.
> 
> Over time new model versions appear in the ML registry and I would like to 
> update my UDF functions on the fly without need to restart the whole job.
> Could you tell me, whether it's possible or not ? Maybe the community can 
> give advice on how such tasks can be solved using Flink and what other 
> approaches exist.
> 
> Thanks a lot for your help and advice !
> 
> 
> 
> 
> -- 
> Arvid Heise | Senior Java Developer
>  
> Follow us @VervericaData
> --
> Join Flink Forward  - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji 
> (Toni) Cheng



How to get the DAG graph shown on the Flink web UI

2020-10-12 Thread 丁浩浩
I want to get the DAG graph shown on the Flink web UI. Is there any way to get it?

Re: How to set a separate metrics reporter for each Flink job

2020-10-12 Thread 熊云昆
Each job can just read its own configuration when it starts, right?




熊云昆
Email: xiongyun...@163.com
(Signature customized by NetEase Mail Master)

On Oct 12, 2020 at 18:17, xiao cai wrote:
Hi:
The known way to set a metrics reporter is in conf/flink-conf.yaml. If I want to set a different metrics 
reporter or different parameters for each job, for example multiple custom k=v entries for the Prometheus pushgateway, how should I configure that?


Best xiao.

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-12 Thread Timo Walther

Hi Austin,

your explanation for the KeyedProcessFunction implementation sounds good 
to me. Using the time and state primitives for this task will make the 
implementation more explicit but also more readable.


Let me know if you could solve your use case.

Regards,
Timo
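For illustration only (not code from this thread), a minimal sketch of the timer-based deduplication being discussed below: buffer one record per key in keyed state and register an event-time timer at Long.MAX_VALUE, which fires only when the MAX_WATERMARK arrives at the end of the bounded input. Class and state names are made up.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DedupOnEndOfInput<K, T> extends KeyedProcessFunction<K, T, T> {

    private final TypeInformation<T> typeInfo;
    private transient ValueState<T> lastSeen;

    public DedupOnEndOfInput(TypeInformation<T> typeInfo) {
        this.typeInfo = typeInfo;
    }

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(new ValueStateDescriptor<>("lastSeen", typeInfo));
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        // Duplicates for the same key simply overwrite each other.
        lastSeen.update(value);
        // Fires only once the MAX_WATERMARK is emitted, i.e. when the input has finished.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        T value = lastSeen.value();
        if (value != null) {
            out.collect(value); // exactly one record per key
        }
    }
}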


On 09.10.20 17:27, Austin Cawley-Edwards wrote:

Hey Timo,

Hah, that's a fair point about using time. I guess I should update my 
statement to "as a user, I don't want to worry about /manually managing/ 
time".


That's a nice suggestion with the KeyedProcessFunction and no windows, 
I'll give that a shot. If I don't want to emit any duplicates, I'd have 
to essentially buffer the "last seen duplicate" for each key in that 
process function until the MAX_WATERMARK is sent through though, right? 
I could emit early results if I assume the max number of possible 
duplicates, but for records with no duplicates, I'd have to wait until 
no more records are coming -- am I missing something?


Thanks so much,
Austin

On Fri, Oct 9, 2020 at 10:44 AM Timo Walther > wrote:


Hi Austin,

if you don't want to worry about time at all, you should probably not
use any windows because those are a time-based operation.

A solution that would look a bit nicer could be to use a pure
KeyedProcessFunction and implement the deduplication logic without
reusing windows. In ProcessFunctions you can register an event-time
timer. The timer would be triggered by the MAX_WATERMARK when the
pipeline shuts down even without having a timestamp assigned in the
StreamRecord. Watermark will leave SQL also without a time attribute as
far as I know.

Regards,
Timo


On 08.10.20 17:38, Austin Cawley-Edwards wrote:
 > Hey Timo,
 >
 > Sorry for the delayed reply. I'm using the Blink planner and using
 > non-time-based joins. I've got an example repo here that shows my
query/
 > setup [1]. It's got the manual timestamp assignment commented out
for
 > now, but that does indeed solve the issue.
 >
 > I'd really like to not worry about time at all in this job hah -- I
 > started just using processing time, but Till pointed out that
processing
 > time timers won't be fired when input ends, which is the case for
this
 > streaming job processing CSV files, so I should be using event time.
 > With that suggestion, I switched to ingestion time, where I then
 > discovered the issue converting from SQL to data stream.
 >
 > IMO, as a user manually assigning timestamps on conversion makes
sense
 > if you're using event time and already handling time attributes
 > yourself, but for ingestion time you really don't want to think
about
 > time at all, which is why it might make sense to propigate the
 > automatically assigned timestamps in that case. Though not sure how
 > difficult that would be. Let me know what you think!
 >
 >
 > Best + thanks again,
 > Austin
 >
 > [1]: https://github.com/austince/flink-1.10-sql-windowing-error
 >
 > On Mon, Oct 5, 2020 at 4:24 AM Timo Walther mailto:twal...@apache.org>
 > >> wrote:
 >
 >     Btw which planner are you using?
 >
 >     Regards,
 >     Timo
 >
 >     On 05.10.20 10:23, Timo Walther wrote:
 >      > Hi Austin,
 >      >
 >      > could you share some details of your SQL query with us? The
 >     reason why
 >      > I'm asking is because I guess that the rowtime field is
not inserted
 >      > into the `StreamRecord` of DataStream API. The rowtime
field is only
 >      > inserted if there is a single field in the output of the query
 >     that is a
 >      > valid "time attribute".
 >      >
 >      > Esp. after non-time-based joins and aggregations, time
attributes
 >     loose
 >      > there properties and become regular timestamps. Because
timestamp
 >     and
 >      > watermarks might have diverged.
 >      >
 >      > If you know what you're doing, you can also assign the
timestamp
 >      > manually after
`toRetractStream.assignTimestampAndWatermarks` and
 >      > reinsert the field into the stream record. But before you
do that, I
 >      > think it is better to share more information about the
query with us.
 >      >
 >      > I hope this helps.
 >      >
 >      > Regards,
 >      > Timo
 >      >
 >      >
 >      >
 >      > On 05.10.20 09:25, Till Rohrmann wrote:
 >      >> Hi Austin,
 >      >>
 >      >> thanks for offering to help. First I would suggest asking
Timo
 >     whether
 >      >> this is an aspect which is still missing or whether we
 >     overlooked it.
 >      >> Based on that we can then take the next steps.
 >      >>
 > 

Re: How to set a separate metrics reporter for each Flink job

2020-10-12 Thread caozhen

You could try whether this works: when starting each job, set that job's metrics options via -D k=v parameters.


---

xiao cai wrote
> Hi:
> The known way to set a metrics reporter is in conf/flink-conf.yaml. If I want to set a different metrics
> reporter or different parameters for each job, for example multiple custom k=v entries for the Prometheus pushgateway, how should I configure that?
> 
> 
> Best xiao.





--
Sent from: http://apache-flink.147419.n8.nabble.com/

How to set a separate metrics reporter for each Flink job

2020-10-12 Thread xiao cai
Hi:
The known way to set a metrics reporter is in conf/flink-conf.yaml. If I want to set a different metrics 
reporter or different parameters for each job, for example multiple custom k=v entries for the Prometheus pushgateway, how should I configure that?


Best xiao.

Re: Flink job submission question

2020-10-12 Thread caozhen

Yes. When Flink on YARN starts and the requested container resources are insufficient, it will wait until resources become available.

---


guaishushu1...@163.com wrote
> When CliFrontend submits a job to YARN, can the submission process stay stuck (for reasons such as insufficient resources) until resources are released?
> 
> 

> guaishushu1103@





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink on YARN container exits abnormally

2020-10-12 Thread caozhen

Could you share the error logs produced when "the container often exits abnormally after the application id is allocated"?

Or check the error logs in the Flink client, as well as the logs in the YARN history server.



Dream-底限 wrote
> hi
> I am using Flink 1.11.1 on
> YARN, running jobs in detached mode. When submitting a job, after the application id is allocated the container often exits abnormally. I first thought it was a YARN environment problem, but I have run into it on two clusters. Is this a known bug?





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Flink consuming Kafka: job output suddenly dropped from hundreds of thousands per second to tens of thousands per second

2020-10-12 Thread 范超
How about switching the sink to a different Kafka topic and testing that? I think the problem may be on the Kafka side — as a newbie that's the best guess I can offer.

-----Original Message-----
From: Yang Peng [mailto:yangpengklf...@gmail.com]
Sent: Wednesday, September 30, 2020 18:00
To: user-zh
Subject: Re: Re: Flink consuming Kafka: job output suddenly dropped from hundreds of thousands per second to tens of thousands per second

Thanks for the reply. After the job was restarted we can no longer see those in/out metrics.
We can also see that the number of queries to the Redis this job depends on dropped. It looks as if the job went into a zombie state:
it keeps consuming data but just doesn't process it and doesn't interact with Redis.

tison wrote on Wed, Sep 30, 2020 at 5:34 PM:

> If you have auditing/monitoring, look at the in/out records of each node; you should be able to see at which step the numbers dropped...
>
> From the information you've provided so far, everything sounds normal, so it would just be the actual output of the business logic; otherwise something has to be different somewhere. If only the sink
> dropped, then the sink has a problem, for example it may depend on an external environment or accumulate internal errors, etc.
>
> Best,
> tison.
>
>
> Yang Peng wrote on Wed, Sep 30, 2020 at 5:26 PM:
>
> > Thanks for the reply. Yes, we did suspect earlier that the business logic caused it,
> > but after restarting the job the output recovered, and re-consuming from the point of failure also showed no problem. This job has been running for several months and this is the first time we've hit this.
> >
> > tison wrote on Wed, Sep 30, 2020 at 2:33 PM:
> >
> > > Hi Yang,
> > >
> > > Do you mean that the upstream output is unchanged, the load did not rise anywhere in the pipeline (it even dropped), and the sink output decreased?
> > >
> > > If there are no exceptions in the whole pipeline and no load increase or traffic blockage, then it looks like the actual result of the business logic; you could check whether the content of the input data has changed.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Yang Peng wrote on Wed, Sep 30, 2020 at 10:29 AM:
> > >
> > > > Thanks for the reply. We checked and the consumer lag is very small,
> > > > and monitoring shows no obvious change in data inflow, but it feels like for this part of the data only the offsets were updated while the data was never actually processed. We haven't seen this problem before; it appeared suddenly,
> > > > and since the job has been restarted we can no longer use jstack to investigate.
> > > >
> > > > hailongwang <18868816...@163.com> wrote on Tue, Sep 29, 2020 at 10:35 PM:
> > > >
> > > > >
> > > > >
> > > > >
> > > > > It is rather strange though: the format of the source data should not make the CPU drop. Was iowait high during that period?
> > > > > You can also take a jstack dump to look at the stacks, and check GC, etc.
> > > > > As for the source format throughput, you can test the QPS of a single thread yourself and multiply by the number of partitions.
> > > > > Best,
> > > > > Hailong Wang
> > > > > On 2020-09-29 20:06:50, "Yang Peng" wrote:
> > > > >
> > > > >
> > > >
> > >
> >
> > Thanks for the reply. After I restarted the job, consumption recovered. I checked our monitoring (the group-id consumption rate on kafka-manager) and found the consumption rate did not drop. There are currently 90 partitions
> > and the parallelism of the FlinkKafkaConsumer is also 90,
> > so it shouldn't be a partition problem. As for point 2, the source serialization being expensive — is there any way to check that?
> > > > > >
> > > > > >hailongwang <18868816...@163.com> wrote on Tue, Sep 29, 2020 at 8:59 PM:
> > > > > >
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> Hi Yang Peng:
> > > > > >> Based on your description, we can guess the bottleneck is in the source. The following situations are possible:
> > > > > >> 1. The bandwidth between the Kafka cluster and the Flink cluster is saturated by something else.
> > > > > >> 2. Serialization in the source is expensive, making the fetch slower.
> > > > > >> You could try increasing the number of Kafka partitions and the source parallelism and see.
> > > > > >> Best,
> > > > > >> Hailong Wang
> > > > > >>
> > > > > >> On 2020-09-29 19:44:44, "Yang Peng" wrote:
> > > > > >> >A question for everyone: a Flink streaming job consumes from Kafka and writes to Kafka. The Flink version is 1.9.1 and the production Kafka cluster is 1.1.1.
> > > > > >> >The Kafka cluster is containerized and deployed on K8s. The job has been running for a long time; today we suddenly found that its data output dropped a lot. Checkpoints are fine,
> > > > > >> >there is no Kafka consumption backlog and no backpressure,
> > > > > >> >and there are no exception logs at all; neither the Kafka cluster nor the Flink cluster shows anything abnormal, but monitoring shows that the TMs' CPU load also dropped
> > > > > >> >and the TMs' network traffic dropped as well. Apart from that there is no other abnormal information. Has anyone run into this situation?
> > > > > >>
> > > > >
> > > >
> > >
> >
>
>


Re: Flink Kerberos authentication question

2020-10-12 Thread caozhen
"认证的kafka是BBB.keytab"  这个是怎么设置的呢?是自己实现的kafkaSink嘛?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Sharipov, Rinat
Hi Arvid, thx for your reply.

We are already using the approach with control streams to propagate
business rules through our data-pipeline.

Because all our models are powered by Python, I'm going to use Table API
and register UDF functions, where each UDF is a separate model.

So my question is - can I update the UDF function on the fly without a job
restart ?
Because new model versions become available on a daily basis and we should
use them as soon as possible.

Thx !




On Mon, Oct 12, 2020 at 11:32 AM, Arvid Heise wrote:

> Hi Rinat,
>
> Which API are you using? If you use datastream API, the common way to
> simulate side inputs (which is what you need) is to use a broadcast. There
> is an example on SO [1].
>
> [1]
> https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
>
> On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat 
> wrote:
>
>> Hi mates !
>>
>> I'm in the beginning of the road of building a recommendation pipeline on
>> top of Flink.
>> I'm going to register a list of UDF python functions on job
>> startups where each UDF is an ML model.
>>
>> Over time new model versions appear in the ML registry and I would like
>> to update my UDF functions on the fly without need to restart the whole job.
>> Could you tell me, whether it's possible or not ? Maybe the community can
>> give advice on how such tasks can be solved using Flink and what other
>> approaches exist.
>>
>> Thanks a lot for your help and advice !
>>
>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: restoring from externalized incremental rocksdb checkpoint?

2020-10-12 Thread Congxian Qiu
Hi  Jeff
   Sorry for the late reply. You can only restore from a checkpoint whose
chk-xxx directory contains a _metadata file; if there is no _metadata in
the chk-xxx directory, that means the chk-xxx checkpoint is not complete and you
can't restore from it.

Best,
Congxian


Jeffrey Martin wrote on Tue, Sep 15, 2020 at 2:18 PM:

> Thanks for the quick reply Congxian.
>
> The non-empty chk-N directories I looked at contained only files whose
> names are UUIDs. Nothing named _metadata (unless HDFS hides files that
> start with an underscore?).
>
> Just to be clear though -- I should expect a metadata file when using
> incremental checkpoints?
>
> On Mon, Sep 14, 2020 at 10:46 PM Congxian Qiu 
> wrote:
>
>> Hi Jeff
>>You can restore from retained checkpoint such as[1] `bin/flink run -s
>> :checkpointMetaDataPath [:runArgs]` ,  you may find the metadata in the
>> `chk-xxx` directory[2]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#directory-structure
>> Best,
>> Congxian
>>
>>
>> Jeffrey Martin wrote on Tue, Sep 15, 2020 at 1:30 PM:
>>
>>> Hi,
>>>
>>> My job on Flink 1.10 uses RocksDB with incremental checkpointing
>>> enabled. The checkpoints are retained on cancellation.
>>>
>>> How do I resume from the retained checkpoint after cancellation (e.g.,
>>> when upgrading the job binary)? Docs say to use the checkpoint or savepoint
>>> metadata file, but AFAICT there's no metadata file in HDFS in the various
>>> directories under "$checkpointsDir/snapshots/$jobID",
>>>
>>> Thanks,
>>>
>>> Jeff Martin
>>>
>>>
>>>
>>
>>
>
>


Re: Re:Re: Question about flink-sql Join Temporal Tables join with a dimension table

2020-10-12 Thread caozhen
In that case, would an inner join work? If every employee in the orders table is guaranteed to be in the employee dimension table, you can still count all the orders that every employee produced today.

---


夜思流年梦 wrote
> It's like this: say we want to count today's order volume for all employees. If the orders table is left joined to the employee table, employees with no orders today cannot appear in the result set;
> if the employee table is on the left and left joined to the orders table, then today's order count for all employees will appear.
> 
> 
> 
> 
> 
> At 2020-10-12 15:17:07, "caozhen" <caozhen1937@> wrote:
>>
>>As I understand it, having the employee dimension table on the right side is fine for this scenario.
>>
>>Which fields do you need to fetch from the employee dimension table during the join?
>>
>>
>>
>>夜思流年梦 wrote
>>> Here is the scenario: there is an employee dimension table and an orders table (a stream table reading the MySQL binlog), and we want to compute the order count of all employees in real time;
>>> flink-sql currently supports Join Temporal Tables, but the official docs say: only inner and left joins with processing-time temporal
>>> tables are supported.
>>> In this scenario the dimension table must be on the left, but in practice a left join with the dimension table on the left fails with ClassCastException:
>>> org.apache.calcite.rel.logical.LogicalProject cannot be cast to
>>> org.apache.calcite.rel.core.TableScan; (if the stream table is on the left and we temporal join
>>> a dimension table, there is no problem, i.e. left join the dimension table FOR SYSTEM_TIME AS OF proctime)
>>> I'd like to ask how everyone handles this kind of situation.
>>
>>
>>
>>
>>
>>--
>>Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink on K8s statebackend configuration

2020-10-12 Thread superainbower
Hi
Thanks for the reply, I'll take a look.


superainbower
superainbo...@163.com


On Oct 12, 2020 at 17:09, Congxian Qiu wrote:
Hi
Judging from the error log, the filesystem-related configuration (or jar) seems to be the problem; this mailing list thread [1] may help you solve it.

[1]
http://apache-flink.147419.n8.nabble.com/Flink-1-11-1-on-k8s-hadoop-td5779.html#a5834
Best,
Congxian


superainbower wrote on Wed, Sep 30, 2020 at 3:04 PM:

To add on, here is my error log:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is not in the classpath/dependencies.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 'hdfs'. The scheme
is not directly supported by Flink and no Hadoop file system to support
this scheme could be loaded. For a full list of supported file systems,
please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


It looks like the Hadoop path is missing. How should this be configured on K8s?
superainbower
superainbo...@163.com


On Sep 30, 2020 at 14:33, superainbower wrote:
Hi all,
Does anyone know what else needs to be done to configure the state backend for Flink on K8s, besides writing the following settings into flink-conf.yml?
state.backend: rocksdb
state.checkpoints.dir: hdfs://master:8020/flink/checkpoints
state.savepoints.dir: hdfs://master:8020/flink/savepoints
state.backend.incremental: true


superainbower
superainbo...@163.com




Re: Flink Kubernetes Libraries

2020-10-12 Thread Till Rohrmann
Hi Superainbower,

could you share the complete logs with us? They contain which Flink version
you are using and also the classpath you are starting the JVM with. Have
you tried whether the same problem occurs with the latest Flink version?

Cheers,
Till

On Mon, Oct 12, 2020 at 10:32 AM superainbower 
wrote:

> Hi Till,
> Could u tell me how to configure HDFS as statebackend when I deploy flink
> on k8s?
> I try to add the following to flink-conf.yaml
>
> state.backend: rocksdb
> state.checkpoints.dir: hdfs://slave2:8020/flink/checkpoints
> state.savepoints.dir: hdfs://slave2:8020/flink/savepoints
> state.backend.incremental: true
>
> And add flink-shaded-hadoop2-2.8.3-1.8.3.jar to /opt/flink/lib
>
> But it doesn't work and I got these error logs
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded. For a full list of supported file systems,
> please see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
> the classpath, or some classes are missing from the classpath
>
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.runtime.util.HadoopUtils
> On 10/09/2020 22:13, Till Rohrmann  wrote:
>
> Hi Saksham,
>
> if you want to extend the Flink Docker image you can find here more
> details [1].
>
> If you want to include the library in your user jar, then you have to add
> the library as a dependency to your pom.xml file and enable the shade
> plugin for building an uber jar [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#advanced-customization
> [2]
> https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html
>
> Cheers,
> Till
>
> On Fri, Oct 9, 2020 at 3:22 PM saksham sapra 
> wrote:
>
>> Thanks Till for helping out,
>>
>> Regarding the way you suggested: is it possible to copy the libs that are in the D:
>> directory to FLINK_HOME/libs? I tried to run a copy command (copy
>> D:/data/libs to FLINK_HOME/libs) and it gets copied, but I don't know how I can
>> check where it got copied to and whether these libs are picked up by Flink.
>>
>>
>> Thanks,
>> Saksham Sapra
>>
>> On Wed, Oct 7, 2020 at 9:40 PM Till Rohrmann 
>> wrote:
>>
>>> HI Saksham,
>>>
>>> the easiest approach would probably be to include the required libraries
>>> in your user code jar which you submit to the cluster. Using maven's shade
>>> plugin should help with this task. Alternatively, you could also create a
>>> custom Flink Docker image where you add the required libraries to the
>>> FLINK_HOME/libs directory. This would however mean that every job you
>>> submit to the Flink cluster would see these libraries in the system class
>>> path.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Oct 7, 2020 at 2:08 PM saksham sapra 
>>> wrote:
>>>
 Hi,

 I have made some configuration using this page:
 https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
 and I am able to run Flink in the UI, but I need to submit a job using:
 http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy/#/submit
 through Postman. Locally I can add my libraries in the libs
 folder, but in this setup how can I add my libraries so that it works properly?

 [image: image.png]

>>>


Re: Flink on K8s statebackend configuration

2020-10-12 Thread Congxian Qiu
Hi
  Judging from the error log, the filesystem-related configuration (or jar) seems to be the problem; this mailing list thread [1] may help you solve it.

[1]
http://apache-flink.147419.n8.nabble.com/Flink-1-11-1-on-k8s-hadoop-td5779.html#a5834
Best,
Congxian


superainbower wrote on Wed, Sep 30, 2020 at 3:04 PM:

> To add on, here is my error log:
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded. For a full list of supported file systems,
> please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>
>
> It looks like the Hadoop path is missing. How should this be configured on K8s?
> superainbower
> superainbo...@163.com
>
>
> On Sep 30, 2020 at 14:33, superainbower wrote:
> Hi all,
> Does anyone know what else needs to be done to configure the state backend for Flink on K8s, besides writing the following settings into flink-conf.yml?
> state.backend: rocksdb
> state.checkpoints.dir: hdfs://master:8020/flink/checkpoints
> state.savepoints.dir: hdfs://master:8020/flink/savepoints
> state.backend.incremental: true
>
>
> superainbower
> superainbo...@163.com
>
>


Re: Flink 1.10.1 checkpoint failure issue

2020-10-12 Thread Congxian Qiu
Hi @Storm, which Flink version are you using, and what does the stack trace look like? Please reply with the relevant information here and we can look into the problem together.

Best,
Congxian


大森林 wrote on Sat, Oct 10, 2020 at 1:05 PM:

> On my side it is an old JDK 8; it has nothing to do with JDK 261.
>
>
>
>
> -- Original Message --
> From: "user-zh" <storm_h_2...@163.com>
> Sent: Saturday, Oct 10, 2020, 9:03 AM
> To: "user-zh"
> Subject: Re: Flink 1.10.1 checkpoint failure issue
>
>
>
> I tried upgrading the JDK to 261, but the error is still there.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Question about flink-sql Join Temporal Tables join with a dimension table

2020-10-12 Thread 夜思流年梦



Hi, at first I also considered a dual-stream join, but a dual-stream join
runs into one problem: the result set will only contain employees that have orders today, so employees without orders will not show up in the result set. What we mainly need is today's order count for all employees;














At 2020-10-12 15:37:51, "Jark Wu" wrote:
>As I understand it, the OP's scenario is not a temporal join scenario but a dual-stream join
>scenario: a change in either stream should trigger an update of the result, so using the employee table as a right-side dimension table won't work.
>
>If my understanding is right, you can use flink-cdc-connectors [1] to consume the employee and order binlog streams, join them
>directly, and then aggregate the order count. Pseudo-code below:
>
>create table users (
>  user_id bigint,
>  ...
>) with (
>  connector = mysql-cdc
>  ...
>);
>
>create table orders (
>  order_id bigint,
>  user_id bigint,
>  ...
>) with (
>  connector = mysql-cdc
>  ...
>);
>
>select user_id, count(*) as order_num
>from (select * from users left join orders on users.user_id =
>orders.user_id)
>group by user_id;
>
>
>[1]: https://github.com/ververica/flink-cdc-connectors
>
>On Mon, 12 Oct 2020 at 15:17, caozhen  wrote:
>
>>
>> As I understand it, having the employee dimension table on the right side is fine for this scenario.
>>
>> Which fields do you need to fetch from the employee dimension table during the join?
>>
>>
>>
>> 夜思流年梦 wrote
>> > Here is the scenario: there is an employee dimension table and an orders table (a stream table reading the MySQL binlog), and we want to compute the order count of all employees in real time;
>> > flink-sql currently supports Join Temporal Tables, but the official docs say: only inner and left joins with processing-time temporal
>> > tables are supported.
>> > In this scenario the dimension table must be on the left, but in practice a left join with the dimension table on the left fails with ClassCastException:
>> > org.apache.calcite.rel.logical.LogicalProject cannot be cast to
>> > org.apache.calcite.rel.core.TableScan; (if the stream table is on the left and we temporal join
>> > a dimension table, there is no problem, i.e. left join the dimension table FOR SYSTEM_TIME AS OF proctime)
>> > I'd like to ask how everyone handles this kind of situation.
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-12 Thread DONG, Weike
Hi community,

I have uploaded the log files of JobManager and TaskManager-1-1 (one of the
50 TaskManagers) with DEBUG log level and default Flink configuration, and
it clearly shows that TaskManager failed to register with JobManager after
10 attempts.

Here is the link:

JobManager:
https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce

TaskManager-1-1:
https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe

Thanks : )

Best regards,
Weike


On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike  wrote:

> Hi community,
>
> Recently we have noticed a strange behavior for Flink jobs on Kubernetes
> per-job mode: when the parallelism increases, the time it takes for the
> TaskManagers to register with *JobManager *becomes abnormally long (for a
> task with parallelism of 50, it could take 60 ~ 120 seconds or even longer
> for the registration attempt), and usually more than 10 attempts are needed
> to finish this registration.
>
> Because of this, we could not submit a job requiring more than 20 slots
> with the default configuration, as the TaskManager would say:
>
>
>> Registration at JobManager 
>> (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2)
>> attempt 9 timed out after 25600 ms
>
> Free slot with allocation id 60d5277e138a94fb73fc6691557001e0 because: The
>> slot 60d5277e138a94fb73fc6691557001e0 has timed out.
>
> Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
>> ResourceProfile{cpuCores=1., taskHeapMemory=1.425gb
>> (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb
>> (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)},
>> allocationId: 60d5277e138a94fb73fc6691557001e0, jobId:
>> 493cd86e389ccc8f2887e1222903b5ce).
>> java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has timed
>> out.
>
>
> In order to cope with this issue, we have to change the below
> configuration parameters:
>
>>
>> # Prevent "Could not allocate the required slot within slot request
>> timeout. Please make sure that the cluster has enough resources. Stopping
>> the JobMaster for job"
>> slot.request.timeout: 50
>
> # Increase max timeout in a single attempt
>> cluster.registration.max-timeout: 30
>> # Prevent "free slot (TaskSlot)"
>> akka.ask.timeout: 10 min
>> # Prevent "Heartbeat of TaskManager timed out."
>> heartbeat.timeout: 50
>
>
> However, we acknowledge that this is only a temporary dirty fix, which is
> not what we want. It could be seen that during TaskManager registration to
> JobManager, lots of warning messages come out in logs:
>
> No hostname could be resolved for the IP address 9.166.0.118, using IP
>> address as host name. Local input split assignment (such as for HDFS files)
>> may be impacted.
>
>
> Initially we thought this was probably the cause (reverse lookup of DNS
> might take up a long time), however we later found that the reverse lookup
> only took less than 1ms, so maybe not because of this.
>
> Also, we have checked the GC log of both TaskManagers and JobManager, and
> they seem to be perfectly normal, without any signs of pauses. And the
> heartbeats are processed as normal according to the logs.
>
> Moreover, TaskManagers register quickly with the ResourceManager, but
> registration with the JobManager is extra slow, so this is not because of a slow network
> connection.
>
> Here we wonder what could be the cause for the slow registration between
> JobManager and TaskManager(s)? No other warning or error messages in the
> log (DEBUG level) other than the "No hostname could be resolved" messages,
> which is quite weird.
>
> Thanks for reading, and I hope to get some insights into this issue : )
>
> Sincerely,
> Weike
>
>
>


Re: Additional options to S3 Filesystem: Interest?

2020-10-12 Thread Arvid Heise
Hi Padarn,

sounds like a good addition to me. We could wait for more feedback or you
could start immediately.

The next step would be to create a JIRA and get it assigned to you.

Looking forward to your contribution

Arvid

On Sun, Oct 11, 2020 at 7:45 AM Padarn Wilson  wrote:

> Hi Flink Users,
>
> We need to expose some additional options for the s3 hadoop filesystem:
> Specifically, we want to set object tagging and lifecycle. This would be a
> fairly easy change and we initially thought to create a new Filesystem with
> very minor changes to allow this.
>
> However, then I wondered: would others use this? Is it something that is
> worth raising as a Flink issue and then contributing back upstream?
>
> Any others who would like to be able to set object tags for the s3
> filesystem?
>
> Cheers,
> Padarn
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re:Re: Question about flink-sql Join Temporal Tables join with a dimension table

2020-10-12 Thread 夜思流年梦
It's like this: say we want to count today's order volume for all employees. If the orders table is left joined to the employee table, employees with no orders today cannot appear in the result set;
if the employee table is on the left and left joined to the orders table, then today's order count for all employees will appear.

















At 2020-10-12 15:17:07, "caozhen" wrote:
>
>As I understand it, having the employee dimension table on the right side is fine for this scenario.
>
>Which fields do you need to fetch from the employee dimension table during the join?
>
>
>
>夜思流年梦 wrote
>> Here is the scenario: there is an employee dimension table and an orders table (a stream table reading the MySQL binlog), and we want to compute the order count of all employees in real time;
>> flink-sql currently supports Join Temporal Tables, but the official docs say: only inner and left joins with processing-time temporal
>> tables are supported.
>> In this scenario the dimension table must be on the left, but in practice a left join with the dimension table on the left fails with ClassCastException:
>> org.apache.calcite.rel.logical.LogicalProject cannot be cast to
>> org.apache.calcite.rel.core.TableScan; (if the stream table is on the left and we temporal join
>> a dimension table, there is no problem, i.e. left join the dimension table FOR SYSTEM_TIME AS OF proctime)
>> I'd like to ask how everyone handles this kind of situation.
>
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Arvid Heise
Hi Rinat,

Which API are you using? If you use datastream API, the common way to
simulate side inputs (which is what you need) is to use a broadcast. There
is an example on SO [1].

[1]
https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
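To make that a bit more concrete, here is a rough non-keyed sketch of the broadcast pattern in Java. The Rule and Event types, the "current" key and the rule.apply(event) call are only placeholders for your model-update and scoring logic, not an existing API:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// State descriptor for the broadcast ("side input") state holding the latest rule/model.
final MapStateDescriptor<String, Rule> ruleDescriptor = new MapStateDescriptor<>(
        "rules", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Rule.class));

// controlStream carries rule/model updates, dataStream carries the events to score.
BroadcastStream<Rule> broadcastRules = controlStream.broadcast(ruleDescriptor);

DataStream<String> scored = dataStream
        .connect(broadcastRules)
        .process(new BroadcastProcessFunction<Event, Rule, String>() {

            @Override
            public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out)
                    throws Exception {
                // Data side: read-only access to the broadcast state.
                Rule rule = ctx.getBroadcastState(ruleDescriptor).get("current");
                out.collect(rule == null ? "no rule yet" : rule.apply(event));
            }

            @Override
            public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out)
                    throws Exception {
                // Control side: every parallel instance receives the update and stores it.
                ctx.getBroadcastState(ruleDescriptor).put("current", rule);
            }
        });

Every parallel instance of the data stream sees the same broadcast state, which is what makes it behave like a side input.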

On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat 
wrote:

> Hi mates !
>
> I'm in the beginning of the road of building a recommendation pipeline on
> top of Flink.
> I'm going to register a list of UDF python functions on job
> startups where each UDF is an ML model.
>
> Over time new model versions appear in the ML registry and I would like to
> update my UDF functions on the fly without need to restart the whole job.
> Could you tell me, whether it's possible or not ? Maybe the community can
> give advice on how such tasks can be solved using Flink and what other
> approaches exist.
>
> Thanks a lot for your help and advice !
>
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Flink Kubernetes Libraries

2020-10-12 Thread superainbower
Hi Till,
Could u tell me how to configure HDFS as statebackend when I deploy flink on 
k8s?
I try to add the following to flink-conf.yaml


state.backend: rocksdb
state.checkpoints.dir: hdfs://slave2:8020/flink/checkpoints
state.savepoints.dir: hdfs://slave2:8020/flink/savepoints
state.backend.incremental: true


And add flink-shaded-hadoop2-2.8.3-1.8.3.jar to /opt/flink/lib


But it doesn't work and I got these error logs


Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 'hdfs'. The scheme is not 
directly supported by Flink and no Hadoop file system to support this scheme 
could be loaded. For a full list of supported file systems, please 
see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in the 
classpath, or some classes are missing from the classpath


Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.runtime.util.HadoopUtils
On 10/09/2020 22:13, Till Rohrmann wrote:
Hi Saksham,


if you want to extend the Flink Docker image you can find here more details 
[1]. 


If you want to include the library in your user jar, then you have to add the 
library as a dependency to your pom.xml file and enable the shade plugin for 
building an uber jar [2].


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#advanced-customization
[2] 
https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html


Cheers,
Till


On Fri, Oct 9, 2020 at 3:22 PM saksham sapra  wrote:

Thanks Till for helping out,


Regarding the way you suggested: is it possible to copy the libs that are in the D: directory to
FLINK_HOME/libs? I tried to run a copy command (copy D:/data/libs to
FLINK_HOME/libs) and it gets copied, but I don't know how I can check where it got
copied to and whether these libs are picked up by Flink.




Thanks,
Saksham Sapra


On Wed, Oct 7, 2020 at 9:40 PM Till Rohrmann  wrote:

HI Saksham,


the easiest approach would probably be to include the required libraries in 
your user code jar which you submit to the cluster. Using maven's shade plugin 
should help with this task. Alternatively, you could also create a custom Flink 
Docker image where you add the required libraries to the FLINK_HOME/libs 
directory. This would however mean that every job you submit to the Flink 
cluster would see these libraries in the system class path.


Cheers,
Till


On Wed, Oct 7, 2020 at 2:08 PM saksham sapra  wrote:

Hi,


I have made some configuration using this page:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
and I am able to run Flink in the UI, but I need to submit a job using:
http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy/#/submit
through Postman. Locally I can add my libraries in the libs
folder, but in this setup how can I add my libraries so that it works properly?





TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-12 Thread DONG, Weike
Hi community,

Recently we have noticed a strange behavior for Flink jobs on Kubernetes
per-job mode: when the parallelism increases, the time it takes for the
TaskManagers to register with *JobManager *becomes abnormally long (for a
task with parallelism of 50, it could take 60 ~ 120 seconds or even longer
for the registration attempt), and usually more than 10 attempts are needed
to finish this registration.

Because of this, we could not submit a job requiring more than 20 slots
with the default configuration, as the TaskManager would say:


> Registration at JobManager 
> (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2)
> attempt 9 timed out after 25600 ms

Free slot with allocation id 60d5277e138a94fb73fc6691557001e0 because: The
> slot 60d5277e138a94fb73fc6691557001e0 has timed out.

Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
> ResourceProfile{cpuCores=1., taskHeapMemory=1.425gb
> (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb
> (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)},
> allocationId: 60d5277e138a94fb73fc6691557001e0, jobId:
> 493cd86e389ccc8f2887e1222903b5ce).
> java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has timed
> out.


In order to cope with this issue, we have to change the below configuration
parameters:

>
> # Prevent "Could not allocate the required slot within slot request
> timeout. Please make sure that the cluster has enough resources. Stopping
> the JobMaster for job"
> slot.request.timeout: 50

# Increase max timeout in a single attempt
> cluster.registration.max-timeout: 30
> # Prevent "free slot (TaskSlot)"
> akka.ask.timeout: 10 min
> # Prevent "Heartbeat of TaskManager timed out."
> heartbeat.timeout: 50


However, we acknowledge that this is only a temporary dirty fix, which is
not what we want. It could be seen that during TaskManager registration to
JobManager, lots of warning messages come out in logs:

No hostname could be resolved for the IP address 9.166.0.118, using IP
> address as host name. Local input split assignment (such as for HDFS files)
> may be impacted.


Initially we thought this was probably the cause (reverse lookup of DNS
might take up a long time), however we later found that the reverse lookup
only took less than 1ms, so maybe not because of this.

Also, we have checked the GC log of both TaskManagers and JobManager, and
they seem to be perfectly normal, without any signs of pauses. And the
heartbeats are processed as normal according to the logs.

Moreover, TaskManagers register quickly with the ResourceManager, but
registration with the JobManager is extra slow, so this is not because of a slow network
connection.

Here we wonder what could be the cause for the slow registration between
JobManager and TaskManager(s)? No other warning or error messages in the
log (DEBUG level) other than the "No hostname could be resolved" messages,
which is quite weird.

Thanks for reading, and I hope to get some insights into this issue : )

Sincerely,
Weike


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Arvid Heise
Hi Yun,

Thank you for starting the discussion. This will solve one of the
long-standing issues [1] that confuse users. I'm also a big fan of option
3. It is also a bit closer to Chandy-Lamport again.

A couple of comments:

1) You call the tasks that get the barriers injected leaf nodes, which
would make the sinks the root nodes. That is very similar to how graphs in
relational algebra are labeled. However, I got the feeling that in Flink,
we rather iterate from sources to sink, making the sources root nodes and
the sinks the leaf nodes. However, I have no clue how it's done in similar
cases, so please take that hint cautiously.
2) I'd make the algorithm to find the subtasks iterative and react in
CheckpointCoordinator. Let's assume that we inject the barrier at all root
subtasks (initially all sources). So in the iterative algorithm, whenever
root A finishes, it looks at all connected subtasks B if they have any
upstream task left. If not B becomes a new root. That would require to only
touch a part of the job graph, but would require some callback from
JobManager to CheckpointCoordinator.
2b) We also need to be careful for out-of-sync updates: if the root is
about to finish, we could send the barrier to it from
CheckpointCoordinator, but at the time it arrives, the subtask is finished
already.
3) An implied change is that checkpoints are not aborted anymore at
EndOfPartition,
which is good, but might be explicitly added.
4) The interaction between unaligned checkpoint and EndOfPartition is a bit
ambiguous: What happens when an unaligned checkpoint is started and then
one input channel contains the EndOfPartition event? From the written
description, it sounds to me like, we move back to an aligned checkpoint
for the whole receiving task. However, that is neither easily possible nor
necessary. Imho it would be enough to also store the EndOfPartition in the
channel state.
5) I'd expand the recovery section a bit. It would be the first time that
we recover an incomplete DAG. Afaik the subtasks are deployed before the
state is recovered, so at some point, the subtasks either need to be
removed again or maybe we could even avoid them being created in the first
place.

[1] https://issues.apache.org/jira/browse/FLINK-2491

On Fri, Oct 9, 2020 at 8:22 AM Yun Gao  wrote:

> Hi, devs & users
>
> Very sorry for the spoiled formats, I resent the discussion as follows.
>
>
> As discussed in FLIP-131[1], Flink will make DataStream the unified API for 
> processing bounded and unbounded data in both streaming and blocking modes. 
> However, one long-standing problem for the streaming mode is that currently 
> Flink does not support checkpoints after some tasks finished, which causes some problems for 
> bounded or mixed jobs:
> 1.
> Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed 
> before committed to external systems in streaming mode. If sources are 
> bounded and checkpoints are disabled after some tasks are finished, the data 
> sent after the last checkpoint would never be able to be committed. This 
> issue has already been reported several times in the user ML[2][3][4] and was 
> further brought up when working on FLIP-143: Unified Sink API [5].
> 2.
> The jobs with both bounded and unbounded sources might have to replay a large 
> amount of records after failover due to no periodic checkpoints are taken 
> after the bounded sources finished.
>
>
> Therefore, we propose to also support checkpoints after some tasks finished. 
> You can find more details in FLIP-147[6].
>
> Best,
> Yun
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> [2]
> https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
> [3]
> https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
> [4]
> https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
> [5]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> [6]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>
> --Original Mail --
> *Sender:*Yun Gao 
> *Send Date:*Fri Oct 9 14:16:52 2020
> *Recipients:*Flink Dev , User-Flink <
> user@flink.apache.org>
> *Subject:*[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
>> Hi, devs & users
>>
>>
>> As discussed in FLIP-131 [1], Flink will make DataStream the unified API for 
>> processing bounded and unbounded data in both streaming and blocking modes. 
>> However, one long-standing problem for the streaming mode is that currently 
>> Flink does not support checkpoints after some tasks finished, which causes 
>> some problems for bounded or mixed jobs:
>>
>> Flink exactly-once sinks rely on checkpoints to ensure data won’t be 

Re: Question about flink-sql Join Temporal Tables join with a dimension table

2020-10-12 Thread Jark Wu
As I understand it, the OP's scenario is not a temporal join scenario but a dual-stream join
scenario: a change in either stream should trigger an update of the result, so using the employee table as a right-side dimension table won't work.

If my understanding is right, you can use flink-cdc-connectors [1] to consume the employee and order binlog streams, join them
directly, and then aggregate the order count. Pseudo-code below:

create table users (
  user_id bigint,
  ...
) with (
  connector = mysql-cdc
  ...
);

create table orders (
  order_id bigint,
  user_id bigint,
  ...
) with (
  connector = mysql-cdc
  ...
);

select user_id, count(*) as order_num
from (select * from users left join orders on users.user_id =
orders.user_id)
group by user_id;


[1]: https://github.com/ververica/flink-cdc-connectors

On Mon, 12 Oct 2020 at 15:17, caozhen  wrote:

>
> As I understand it, having the employee dimension table on the right side is fine for this scenario.
>
> Which fields do you need to fetch from the employee dimension table during the join?
>
>
>
> 夜思流年梦 wrote
> > Here is the scenario: there is an employee dimension table and an orders table (a stream table reading the MySQL binlog), and we want to compute the order count of all employees in real time;
> > flink-sql currently supports Join Temporal Tables, but the official docs say: only inner and left joins with processing-time temporal
> > tables are supported.
> > In this scenario the dimension table must be on the left, but in practice a left join with the dimension table on the left fails with ClassCastException:
> > org.apache.calcite.rel.logical.LogicalProject cannot be cast to
> > org.apache.calcite.rel.core.TableScan; (if the stream table is on the left and we temporal join
> > a dimension table, there is no problem, i.e. left join the dimension table FOR SYSTEM_TIME AS OF proctime)
> > I'd like to ask how everyone handles this kind of situation.
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql client on yarn issue

2020-10-12 Thread caozhen
From the error message it looks like there is no permission to read core-site.xml. Have you checked the file permissions of core-site.xml?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Question about flink-sql Join Temporal Tables join with a dimension table

2020-10-12 Thread caozhen

As I understand it, having the employee dimension table on the right side is fine for this scenario.

Which fields do you need to fetch from the employee dimension table during the join?



夜思流年梦 wrote
> Here is the scenario: there is an employee dimension table and an orders table (a stream table reading the MySQL binlog), and we want to compute the order count of all employees in real time;
> flink-sql currently supports Join Temporal Tables, but the official docs say: only inner and left joins with processing-time temporal
> tables are supported.
> In this scenario the dimension table must be on the left, but in practice a left join with the dimension table on the left fails with ClassCastException:
> org.apache.calcite.rel.logical.LogicalProject cannot be cast to
> org.apache.calcite.rel.core.TableScan; (if the stream table is on the left and we temporal join
> a dimension table, there is no problem, i.e. left join the dimension table FOR SYSTEM_TIME AS OF proctime)
> I'd like to ask how everyone handles this kind of situation.





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Demo: problems encountered when counting real-time PV from tracking logs

2020-10-12 Thread Natasha
Hi leiyanrui,
I get it now, thank you very much!!!


On Oct 12, 2020 at 15:05, leiyanrui <1150693...@qq.com> wrote:
The further KeyedProcessFunction step keys by the window end time, so there is only one key, and the already-aggregated per-behavior counts get aggregated once more.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Demo: problems encountered when counting real-time PV from tracking logs

2020-10-12 Thread leiyanrui
The further KeyedProcessFunction step keys by the window end time, so there is only one key, and the already-aggregated per-behavior counts get aggregated once more.
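For reference, a minimal Java sketch of that two-stage shape, assuming the input is a UserBehavior POJO with a behavior field and that timestamps/watermarks are already assigned; all names here are illustrative, not the demo's actual code:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Stage 1: one count per (behavior, window) -- this is why several results show up per window.
DataStream<Tuple3<Long, String, Long>> pvPerBehavior = userBehaviorStream
    .keyBy(e -> e.behavior)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(
        new AggregateFunction<UserBehavior, Long, Long>() {
            @Override public Long createAccumulator() { return 0L; }
            @Override public Long add(UserBehavior value, Long acc) { return acc + 1; }
            @Override public Long getResult(Long acc) { return acc; }
            @Override public Long merge(Long a, Long b) { return a + b; }
        },
        new ProcessWindowFunction<Long, Tuple3<Long, String, Long>, String, TimeWindow>() {
            @Override
            public void process(String behavior, Context ctx, Iterable<Long> counts,
                                Collector<Tuple3<Long, String, Long>> out) {
                // Attach the window end so downstream can group all behaviors of the same window.
                out.collect(Tuple3.of(ctx.window().getEnd(), behavior, counts.iterator().next()));
            }
        });

// Stage 2: key by the window end, so the per-behavior counts of one window meet on a single key
// and can be summed into the total PV of that window.
DataStream<Tuple2<Long, Long>> pvPerWindow = pvPerBehavior
    .keyBy(t -> t.f0)
    .process(new KeyedProcessFunction<Long, Tuple3<Long, String, Long>, Tuple2<Long, Long>>() {
        private transient ValueState<Long> total;

        @Override
        public void open(Configuration parameters) {
            total = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Long.class));
        }

        @Override
        public void processElement(Tuple3<Long, String, Long> in, Context ctx,
                                   Collector<Tuple2<Long, Long>> out) throws Exception {
            Long current = total.value();
            total.update((current == null ? 0L : current) + in.f2);
            // Fire once the watermark passes the window end, i.e. all per-behavior counts arrived.
            ctx.timerService().registerEventTimeTimer(in.f0 + 1);
        }

        @Override
        public void onTimer(long ts, OnTimerContext ctx, Collector<Tuple2<Long, Long>> out)
                throws Exception {
            out.collect(Tuple2.of(ts - 1, total.value())); // (windowEnd, total PV of the window)
            total.clear();
        }
    });

So the AggregateFunction only ever sees one behavior's records per window; the whole-window PV only exists after the second keyBy.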



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: TM heartbeat timeout due to ResourceManager being busy

2020-10-12 Thread Xintong Song
No worries :)


Thank you~

Xintong Song



On Mon, Oct 12, 2020 at 2:48 PM Paul Lam  wrote:

> Sorry for the misspelled name, Xintong
>
> Best,
> Paul Lam
>
> On Oct 12, 2020 at 14:46, Paul Lam wrote:
>
> Hi Xingtong,
>
> Thanks a lot for the pointer!
>
> It’s good to see there would be a new IO executor to take care of the TM
> contexts. Looking forward to the 1.12 release!
>
> Best,
> Paul Lam
>
> On Oct 12, 2020 at 14:18, Xintong Song wrote:
>
> Hi Paul,
>
> Thanks for reporting this.
>
> Indeed, Flink's RM currently performs several HDFS operations in the rpc
> main thread when preparing the TM context, which may block the main thread
> when HDFS is slow.
>
> Unfortunately, I don't see any out-of-box approach that fixes the problem
> at the moment, except for increasing the heartbeat timeout.
>
> As for the long run solution, I think there's an easier approach. We can
> move creating of the TM contexts away from the rpc main thread. Ideally, we
> should try to avoid performing any heavy operations which do not modify the
> RM's internal states in the rpc main thread. With FLINK-19241, this can be
> achieved easily by delegating the work to the io executor.
>
> Thank you~
> Xintong Song
>
>
>
> On Mon, Oct 12, 2020 at 12:44 PM Paul Lam  wrote:
>
>> Hi,
>>
>> After FLINK-13184 is implemented (even with Flink 1.11), occasionally
>> there would still be jobs
>> with high parallelism getting TM-RM heartbeat timeouts when RM is busy
>> creating TM contexts
>> on cluster initialization and HDFS is slow at that moment.
>>
>> Apart from increasing the TM heartbeat timeout, is there any recommended
>>  out of the box
>> approach that can reduce the chance of getting the timeouts?
>>
>> In the long run, is it possible to limit the number of taskmanager
>> contexts that RM creates at
>> a time, so that the heartbeat triggers can chime in?
>>
>> Thanks!
>>
>> Best,
>> Paul Lam
>>
>
>
>


Re: Demo: problems encountered when counting real-time PV from tracking logs

2020-10-12 Thread Natasha
Hi leiyanrui,
You are right, my modified code indeed produces the PV per behavior type [facepalm], thanks a lot for the kind explanation!
Since I'm still not very familiar with Flink, I'd like to ask one more question: why does the original demo still apply a KeyedProcessFunction to the aggregated window data? (Figure 1) I thought the aggregate function step would already give me the PV data of the whole window.
On Oct 12, 2020 at 14:36, leiyanrui <1150693...@qq.com> wrote:
With keyBy(_.behavior), the four results you see should be the PV of each behavior type, not the PV of the whole window.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: why we need keyed state and operate state when we already have checkpoint?

2020-10-12 Thread Arvid Heise
Hi 大森林,

You can always resume from checkpoints independent of the usage of keyed or
non-keyed state of operators.
1 checkpoint contains the state of all operators at a given point in time.
Each operator may have keyed state, raw state, or non-keyed state.
As long as you are not changing the operators (too much) before restarting,
you can always restart.

During (automatic) restart of a Flink application, the state of a given
checkpoint is restored to the operators, such that it looks like the
operator never failed. However, the operators are reset to the time of the
respective checkpoint.

I have no clue what you mean with "previous variable temporary result".

On Wed, Oct 7, 2020 at 9:13 AM 大森林  wrote:

> Thanks for your replies, I think I understand it now.
>
> There are two cases.
> 1. If I use no keyed state in the program, when it's killed, I can only resume
> from the previous result.
> 2. If I use keyed state in the program, when it's killed, I can
> resume from the previous result and the previous intermediate (temporary variable) results.
>
> Am I right?
> Thanks for your guide.
>
>
> -- Original Message --
> *From:* "Arvid Heise" ;
> *Sent:* Wednesday, Oct 7, 2020, 2:25 PM
> *To:* "大森林";
> *Cc:* "Shengkai Fang";"user";
> *Subject:* Re: why we need keyed state and operate state when we already have
> checkpoint?
>
> I think there is some misunderstanding here: a checkpoint IS (a snapshot
> of) the keyed state and operator state (among a few more things). [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions
>
> On Wed, Oct 7, 2020 at 6:51 AM 大森林  wrote:
>
>> When the job is killed, the state is also missing.
>> So why do we need keyed state? Is keyed state useful when we try to resume
>> the killed job?
>>
>>
>> -- Original Message --
>> *From:* "Shengkai Fang" ;
>> *Sent:* Wednesday, Oct 7, 2020, 12:43 PM
>> *To:* "大森林";
>> *Cc:* "user";
>> *Subject:* Re: why we need keyed state and operate state when we already have
>> checkpoint?
>>
>> The checkpoint is a snapshot for the job and we can resume the job if the
>> job is killed unexpectedly. The state is another thing to memorize the
>> intermediate result of calculation. I don't think the checkpoint can
>> replace state.
>>
>> 大森林 wrote on Wed, Oct 7, 2020 at 12:26 PM:
>>
>>> Could you tell me:
>>>
>>> why we need keyed state and operator state when we already have
>>> checkpoint?
>>>
>>> When a running jar crashes, we can resume from the checkpoint
>>> automatically/manually.
>>> So why do we still need keyed state and operator state?
>>>
>>> Thanks
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: TM heartbeat timeout due to ResourceManager being busy

2020-10-12 Thread Paul Lam
Sorry for the misspelled name, Xintong

Best,
Paul Lam

> On Oct 12, 2020 at 14:46, Paul Lam wrote:
> 
> Hi Xingtong,
> 
> Thanks a lot for the pointer!
> 
> It’s good to see there would be a new IO executor to take care of the TM 
> contexts. Looking forward to the 1.12 release!
> 
> Best,
> Paul Lam
> 
>> On Oct 12, 2020 at 14:18, Xintong Song wrote:
>> 
>> Hi Paul,
>> 
>> Thanks for reporting this.
>> 
>> Indeed, Flink's RM currently performs several HDFS operations in the rpc 
>> main thread when preparing the TM context, which may block the main thread 
>> when HDFS is slow.
>> 
>> Unfortunately, I don't see any out-of-box approach that fixes the problem at 
>> the moment, except for increasing the heartbeat timeout.
>> 
>> As for the long run solution, I think there's an easier approach. We can 
>> move creating of the TM contexts away from the rpc main thread. Ideally, we 
>> should try to avoid performing any heavy operations which do not modify the 
>> RM's internal states in the rpc main thread. With FLINK-19241, this can be 
>> achieved easily by delegating the work to the io executor.
>> 
>> Thank you~
>> Xintong Song
>> 
>> 
>> On Mon, Oct 12, 2020 at 12:44 PM Paul Lam > > wrote:
>> Hi,
>> 
>> After FLINK-13184 is implemented (even with Flink 1.11), occasionally there 
>> would still be jobs 
>> with high parallelism getting TM-RM heartbeat timeouts when RM is busy 
>> creating TM contexts 
>> on cluster initialization and HDFS is slow at that moment. 
>> 
>> Apart from increasing the TM heartbeat timeout, is there any recommended  
>> out of the box 
>> approach that can reduce the chance of getting the timeouts? 
>> 
>> In the long run, is it possible to limit the number of taskmanager contexts 
>> that RM creates at 
>> a time, so that the heartbeat triggers can chime in? 
>> 
>> Thanks!
>> 
>> Best,
>> Paul Lam
> 



Re: TM heartbeat timeout due to ResourceManager being busy

2020-10-12 Thread Paul Lam
Hi Xingtong,

Thanks a lot for the pointer!

It’s good to see there would be a new IO executor to take care of the TM 
contexts. Looking forward to the 1.12 release!

Best,
Paul Lam

> On Oct 12, 2020 at 14:18, Xintong Song wrote:
> 
> Hi Paul,
> 
> Thanks for reporting this.
> 
> Indeed, Flink's RM currently performs several HDFS operations in the rpc main 
> thread when preparing the TM context, which may block the main thread when 
> HDFS is slow.
> 
> Unfortunately, I don't see any out-of-box approach that fixes the problem at 
> the moment, except for increasing the heartbeat timeout.
> 
> As for the long run solution, I think there's an easier approach. We can move 
> creating of the TM contexts away from the rpc main thread. Ideally, we should 
> try to avoid performing any heavy operations which do not modify the RM's 
> internal states in the rpc main thread. With FLINK-19241, this can be 
> achieved easily by delegating the work to the io executor.
> 
> Thank you~
> Xintong Song
> 
> 
> On Mon, Oct 12, 2020 at 12:44 PM Paul Lam  > wrote:
> Hi,
> 
> After FLINK-13184 is implemented (even with Flink 1.11), occasionally there 
> would still be jobs 
> with high parallelism getting TM-RM heartbeat timeouts when RM is busy 
> creating TM contexts 
> on cluster initialization and HDFS is slow at that moment. 
> 
> Apart from increasing the TM heartbeat timeout, is there any recommended  out 
> of the box 
> approach that can reduce the chance of getting the timeouts? 
> 
> In the long run, is it possible to limit the number of taskmanager contexts 
> that RM creates at 
> a time, so that the heartbeat triggers can chime in? 
> 
> Thanks!
> 
> Best,
> Paul Lam



Re: [PyFlink] register udf functions with different versions of the same library in the same job

2020-10-12 Thread Sharipov, Rinat
Hi Xingbo ! Thx a lot for such a detailed reply, it is very useful.

On Mon, 12 Oct 2020 at 09:32, Xingbo Huang  wrote:

> Hi,
> I will do my best to provide pyflink related content, I hope it helps you.
>
> >>>  each udf function is a separate process, that is managed by Beam (but
> I'm not sure I got it right).
>
> Strictly speaking, it is not true that every UDF runs in its own Python
> process. For example, the two Python functions udf1 and udf2 in an expression such as
> udf1(udf2(a)) run in the same Python process; you can think of it as one wrapped
> Python function whose return value is udf1(udf2(a)). In fact, in most cases we
> put multiple Python UDFs together in one process
> to improve performance.
>
> >>> Does it mean that I can register multiple udf functions with different
> versions of the same library or what would be even better with different
> python environments and they won't clash
>
> All nodes of a PyFlink job currently use the same Python environment path, so
> there is no way to make each UDF use a different Python execution
> environment. Maybe you need to use multiple jobs to achieve this effect.
>
> Best,
> Xingbo
>
Sharipov, Rinat wrote on Sat, Oct 10, 2020 at 1:18 AM:
>
>> Hi mates !
>>
>> I've just read an amazing article
>> 
>> about PyFlink and I'm absolutely delighted.
>> I got some questions about udf registration, and it seems that it's
>> possible to specify the list of libraries that should be used to evaluate
>> udf functions.
>>
>> As far as I understand, each udf function is a separate process, that is
>> managed by Beam (but I'm not sure I got it right).
>> Does it mean that I can register multiple udf functions with different
>> versions of the same library or what would be even better with different
>> python environments and they won't clash ?
>>
>> A few words about the task that I'm trying to solve: I would like to
>> build a recommendation pipeline that will accumulate features as a table
>> and make
>> recommendations using models from the MLflow registry. Since I don't want to
>> limit data analysts in which libraries they can use, the best
>> solution
>> for me is to assemble the environment using a conda descriptor and register a
>> UDF function.
>>
>> Kubernetes and Kubeflow are not an option for us yet, so we are trying to
>> include models into existing pipelines.
>>
>> thx !
>>
>>
>>
>>
>>


Re: ConnectionPool to DB and parallelism of operator question

2020-10-12 Thread Arvid Heise
Hi Vijay,

If you implement the SinkFunction yourself, you can share the
OkHttpClient.Builder across all instances in the same taskmanager by using
a static field and initializing it only once (ideally in
RichSinkFunction#open).
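A minimal sketch of one way to implement that; here the shared static object is the ConnectionPool rather than the Builder, which achieves the same effect, and the host, credentials, timeouts and record type are placeholders:

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;

import java.util.concurrent.TimeUnit;

public class InfluxMonitoringSink extends RichSinkFunction<String> {

    // Shared once per taskmanager JVM: all parallel sink subtasks in this TM reuse the same pool,
    // so the budget stays at 300 connections per TM instead of 300 per subtask.
    private static final ConnectionPool SHARED_POOL = new ConnectionPool(300, 60, TimeUnit.SECONDS);

    private transient InfluxDB influxDB;

    @Override
    public void open(Configuration parameters) {
        OkHttpClient.Builder builder = new OkHttpClient.Builder()
                .readTimeout(10_000, TimeUnit.MILLISECONDS)
                .connectTimeout(10_000, TimeUnit.MILLISECONDS)
                .writeTimeout(10_000, TimeUnit.MILLISECONDS)
                .connectionPool(SHARED_POOL);
        // Same call as in the original sink, just with the shared pool plugged in.
        influxDB = InfluxDBFactory.connect("http://influx-host:8086", "user", "pwd", builder);
    }

    @Override
    public void invoke(String value, Context context) {
        // influxDB.write(...) exactly as before
    }

    @Override
    public void close() {
        if (influxDB != null) {
            influxDB.close();
        }
    }
}

With this, many parallel sink instances on the same taskmanager still draw from one pool of 300 connections rather than one pool per instance.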

On Tue, Oct 6, 2020 at 9:37 AM Aljoscha Krettek  wrote:

> Hi,
>
> since I don't know the implementation of the Sink I can only guess. I
> would say you get 82 * 300 connections because you will get 82 instances
> of a sink operator and each of those would then have a connection pool
> of 300 connections. The individual sink instances will (potentially) run
> on different machines and not share the connection pool.
>
> Best,
> Aljoscha
>
> On 05.10.20 22:28, Vijay Balakrishnan wrote:
> > HI,
> > Basic question on parallelism of operators and ConnectionPool to DB:
> > Will this result in 82 * 300 connections to InfluxDB or just 300
> > connections to InfluxDB ?
> > main() {
> >sink = createInfluxMonitoringSink(..);
> >keyStream.addSink(sink).addParallelism(82);//will this result in 82 *
> 300
> > connections to InfluxDB or just 300 connections to InfluxDB ?
> > }
> >
> >
> > private . createInfluxMonitoringSink(...) {
> >
> >
> >final OkHttpClient.Builder okHttpClientBuilder = new
> > OkHttpClient.Builder()
> > .readTimeout(timeout, TimeUnit.MILLISECONDS)
> > .connectTimeout(timeout, TimeUnit.MILLISECONDS)
> > .writeTimeout(timeout, TimeUnit.MILLISECONDS)
> > .connectionPool(new ConnectionPool(300, 60,
> > TimeUnit.SECONDS));
> >
> > try (InfluxDB influxDB = InfluxDBFactory.connect
> > (host, userName, pwd, okHttpClientBuilder)) { ..}
> >
> > }
> >
> > TIA,
> >
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: TM heartbeat timeout due to ResourceManager being busy

2020-10-12 Thread Xintong Song
FYI, I just created FLINK-19568 for tracking this issue.


Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-19568

On Mon, Oct 12, 2020 at 2:18 PM Xintong Song  wrote:

> Hi Paul,
>
> Thanks for reporting this.
>
> Indeed, Flink's RM currently performs several HDFS operations in the rpc
> main thread when preparing the TM context, which may block the main thread
> when HDFS is slow.
>
> Unfortunately, I don't see any out-of-box approach that fixes the problem
> at the moment, except for increasing the heartbeat timeout.
>
> As for the long run solution, I think there's an easier approach. We can
> move creating of the TM contexts away from the rpc main thread. Ideally, we
> should try to avoid performing any heavy operations which do not modify the
> RM's internal states in the rpc main thread. With FLINK-19241, this can be
> achieved easily by delegating the work to the io executor.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Oct 12, 2020 at 12:44 PM Paul Lam  wrote:
>
>> Hi,
>>
>> After FLINK-13184 is implemented (even with Flink 1.11), occasionally
>> there would still be jobs
>> with high parallelism getting TM-RM heartbeat timeouts when RM is busy
>> creating TM contexts
>> on cluster initialization and HDFS is slow at that moment.
>>
>> Apart from increasing the TM heartbeat timeout, is there any recommended
>>  out of the box
>> approach that can reduce the chance of getting the timeouts?
>>
>> In the long run, is it possible to limit the number of taskmanager
>> contexts that RM creates at
>> a time, so that the heartbeat triggers can chime in?
>>
>> Thanks!
>>
>> Best,
>> Paul Lam
>>
>


Re: Demo: problems encountered when counting real-time PV from tracking logs

2020-10-12 Thread leiyanrui
With keyBy(_.behavior), the four results you see should be the PV of each behavior type, not the PV of the whole window.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink checkpoint timeout

2020-10-12 Thread Arvid Heise
Hi Omkar,

I don't see anything suspicious in regards to how Flink handles
checkpointing; it simply took longer than 10m (configured checkpointing
timeout) to checkpoint.

The usual reason for long checkpointing times is backpressure. And indeed
looking at your thread dump, I see that you have a sleep Fn in it. Can you
shed some light on this? Why do you need it? If you want to throttle
things, it's best to throttle at the source if possible. Alternatively,
have the sleep as early as possible, so that it's ideally directly chained
to the source. That would reduce the number of records in network buffers
significantly, which speeds up checkpointing tremendously. Lastly, you
might want to reduce the number of network buffers if you indeed have
backpressure (check Web UI for that).

On Tue, Oct 6, 2020 at 6:04 AM Yu Li  wrote:

> I'm not 100% sure but from the given information this might be related to
> FLINK-14498 [1] and partially relieved by FLINK-16645 [2].
>
> @Omkar Could you try the 1.11.0 release out and see whether the issue
> disappeared?
>
> @zhijiang  @yingjie could you also take a
> look here? Thanks.
>
> Best Regards,
> Yu
>
> [1] https://issues.apache.org/jira/browse/FLINK-14498
> [2] https://issues.apache.org/jira/browse/FLINK-16645
>
>
> On Fri, 18 Sep 2020 at 09:28, Deshpande, Omkar 
> wrote:
>
>> These are the hotspot methods. Any pointers on debugging this? The
>> checkpoints keep timing out since migrating to 1.10 from 1.9
>> --
>> *From:* Deshpande, Omkar 
>> *Sent:* Wednesday, September 16, 2020 5:27 PM
>> *To:* Congxian Qiu 
>> *Cc:* user@flink.apache.org ; Yun Tang <
>> myas...@live.com>
>> *Subject:* Re: flink checkpoint timeout
>>
>> This email is from an external sender.
>>
>> This thread seems to stuck in awaiting notification state -
>> at sun.misc.Unsafe.park(Native Method)
>> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>> at
>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>> at
>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>> at
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
>>
>> --
>> *From:* Congxian Qiu 
>> *Sent:* Monday, September 14, 2020 10:57 PM
>> *To:* Deshpande, Omkar 
>> *Cc:* user@flink.apache.org 
>> *Subject:* Re: flink checkpoint timeout
>>
>> This email is from an external sender.
>>
>> Hi
>> You can try to find out whether there is some hot method, or whether the snapshot
>> stack is waiting for some lock, and maybe
>> Best,
>> Congxian
>>
>>
>> Deshpande, Omkar wrote on Tue, Sep 15, 2020 at 12:30 PM:
>>
>> A few of the subtasks fail. I cannot upgrade to 1.11 yet. But I can still
>> get the thread dump. What should I be looking for in the thread dump?
>>
>> --
>> *From:* Yun Tang 
>> *Sent:* Monday, September 14, 2020 8:52 PM
>> *To:* Deshpande, Omkar ;
>> user@flink.apache.org 
>> *Subject:* Re: flink checkpoint timeout
>>
>> This email is from an external sender.
>>
>> Hi Omkar
>>
>> First of all, you should check the web UI of checkpoint [1] to see
>> whether many subtasks fail to complete in time or just few of them. The
>> former one might be your checkpoint time out is not enough for current
>> case. The later one might be some task stuck in slow machine or cannot grab
>> checkpoint lock to process sync phase of checkpointing, you can use thread
>> dump [2] (needs to bump to Flink-1.11) or jstack to see what happened in
>> java process.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/checkpoint_monitoring.html
>> [2] https://issues.apache.org/jira/browse/FLINK-14816
>>
>> Best
>> Yun Tang
>> --
>> *From:* Deshpande, Omkar 
>> *Sent:* Tuesday, September 15, 2020 10:25
>> *To:* user@flink.apache.org 
>> *Subject:* Re: flink checkpoint timeout
>>
>> I have followed this
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>> 
>> and I am using taskmanager.memory.flink.size now instead of
>> taskmanager.heap.size
>> --
>> *From:* Deshpande, Omkar 
>> *Sent:* Monday, September 14, 2020 6:23 PM
>> *To:* user@flink.apache.org 
>> *Subject:* flink checkpoint timeout
>>
>> This email is from an external sender.
>>
>> Hello,
>>
>> I recently upgraded from flink 1.9 to 1.10. The checkpointing succeeds
>> first couple of times and then starts failing because of timeouts. The
>> checkpoint time grows with every checkpoint and starts exceeding 10
>> minutes. I do not see any exceptions in the logs. I have enabled debug
>> logging at 

Re: [PyFlink] register udf functions with different versions of the same library in the same job

2020-10-12 Thread Xingbo Huang
Hi,
I will do my best to provide pyflink related content, I hope it helps you.

>>>  each udf function is a separate process, that is managed by Beam (but
I'm not sure I got it right).

Strictly speaking, it is not true that every UDF runs in its own Python
process. For example, the two Python functions udf1 and udf2 in an expression such as
udf1(udf2(a)) run in the same Python process; you can think of it as one wrapped
Python function whose return value is udf1(udf2(a)). In fact, in most cases we
put multiple Python UDFs together in one process
to improve performance.

>>> Does it mean that I can register multiple udf functions with different
versions of the same library or what would be even better with different
python environments and they won't clash

All nodes of a PyFlink job currently use the same Python environment path, so
there is no way to make each UDF use a different Python execution
environment. Maybe you need to use multiple jobs to achieve this effect.

Best,
Xingbo

Sharipov, Rinat wrote on Sat, Oct 10, 2020 at 1:18 AM:

> Hi mates !
>
> I've just read an amazing article
> 
> about PyFlink and I'm absolutely delighted.
> I got some questions about udf registration, and it seems that it's
> possible to specify the list of libraries that should be used to evaluate
> udf functions.
>
> As far as I understand, each udf function is a separate process, that is
> managed by Beam (but I'm not sure I got it right).
> Does it mean that I can register multiple udf functions with different
> versions of the same library or what would be even better with different
> python environments and they won't clash ?
>
> A few words about the task that I'm trying to solve: I would like to build
> a recommendation pipeline that will accumulate features as a table and make
> recommendations using models from the MLflow registry. Since I don't want to
> limit data analysts in which libraries they can use, the best
> solution
> for me is to assemble the environment using a conda descriptor and register a
> UDF function.
>
> Kubernetes and Kubeflow are not an option for us yet, so we are trying to
> include models into existing pipelines.
>
> thx !
>
>
>
>
>


Re: Demo: problems encountered when counting real-time PV from tracking logs

2020-10-12 Thread Natasha
HI Sysuke,








On Oct 12, 2020 at 14:14, Lee Sysuke wrote:
Hi, could you paste the code of the AggregateFunction and the ProcessFunction?


Natasha <13631230...@163.com> wrote on Mon, Oct 12, 2020 at 2:11 PM:



Hi all,
I'm new to Flink and recently picked a few user-behavior-analysis demos from GitHub to get started with Flink.
   1. But there is one thing I just can't figure out: (Figure 1)

As shown in the figure, the user visit time is set as the event time. My expectation was that with a one-hour tumbling window, the total PV of all visits within that hour should be returned to me. But why does the console print four identical timestamps, each with one of the four separate PV counts 41890, 992, 1474 and 2539? (Figure 2)

2.
The original demo's code does print the expected result (Figure 3), but I still can't understand it: since I set a one-hour tumbling window, the aggregate function should accumulate the PV within that hour and return it to me. Why are there four identical timestamps with different PV counts? I suppose adding the four PV counts together gives the total PV for that timestamp; maybe I just don't understand the Flink code correctly yet... Could anyone help clear up my confusion?

Re: TM heartbeat timeout due to ResourceManager being busy

2020-10-12 Thread Xintong Song
Hi Paul,

Thanks for reporting this.

Indeed, Flink's RM currently performs several HDFS operations in the rpc
main thread when preparing the TM context, which may block the main thread
when HDFS is slow.

Unfortunately, I don't see any out-of-box approach that fixes the problem
at the moment, except for increasing the heartbeat timeout.

As for the long run solution, I think there's an easier approach. We can
move creating of the TM contexts away from the rpc main thread. Ideally, we
should try to avoid performing any heavy operations which do not modify the
RM's internal states in the rpc main thread. With FLINK-19241, this can be
achieved easily by delegating the work to the io executor.

Thank you~

Xintong Song



On Mon, Oct 12, 2020 at 12:44 PM Paul Lam  wrote:

> Hi,
>
> After FLINK-13184 is implemented (even with Flink 1.11), occasionally
> there would still be jobs
> with high parallelism getting TM-RM heartbeat timeouts when RM is busy
> creating TM contexts
> on cluster initialization and HDFS is slow at that moment.
>
> Apart from increasing the TM heartbeat timeout, is there any recommended
>  out of the box
> approach that can reduce the chance of getting the timeouts?
>
> In the long run, is it possible to limit the number of taskmanager
> contexts that RM creates at
> a time, so that the heartbeat triggers can chime in?
>
> Thanks!
>
> Best,
> Paul Lam
>


Re: Demo: problems encountered when counting real-time PV from tracking logs

2020-10-12 Thread Lee Sysuke
Hi, could you paste the code of the AggregateFunction and the ProcessFunction?

Natasha <13631230...@163.com> wrote on Mon, Oct 12, 2020 at 2:11 PM:

>
> Hi all,
> I'm new to Flink and recently picked a few user-behavior-analysis demos from GitHub to get started with Flink.
> 1. But there is one thing I just can't figure out: (Figure 1)
>
>   As shown in the figure, the user visit time is set as the event time. My expectation was that with a one-hour tumbling window,
> *the total PV of all visits within that hour should be returned to me*,
> but why does the console print four identical timestamps, each with one of the four separate PV counts 41890, 992, 1474 and 2539? (Figure 2)
>
> 2.
> The original demo's code does print the expected result (Figure 3), but I still can't understand it: since I set a one-hour tumbling window, the aggregate function should accumulate the PV within that hour and return it to me. Why are there four identical timestamps with different PV counts? I suppose adding the four PV counts together gives the total PV for that timestamp; maybe I just don't understand the Flink code correctly yet... Could anyone help clear up my confusion?
>


Demo: problems encountered when counting real-time PV from tracking logs

2020-10-12 Thread Natasha


Hi all,
I'm new to Flink and recently picked a few user-behavior-analysis demos from GitHub to get started with Flink.
   1. But there is one thing I just can't figure out: (Figure 1)

As shown in the figure, the user visit time is set as the event time. My expectation was that with a one-hour tumbling window, the total PV of all visits within that hour should be returned to me. But why does the console print four identical timestamps, each with one of the four separate PV counts 41890, 992, 1474 and 2539? (Figure 2)

2.
The original demo's code does print the expected result (Figure 3), but I still can't understand it: since I set a one-hour tumbling window, the aggregate function should accumulate the PV within that hour and return it to me. Why are there four identical timestamps with different PV counts? I suppose adding the four PV counts together gives the total PV for that timestamp; maybe I just don't understand the Flink code correctly yet... Could anyone help clear up my confusion?