Re: FlinkSQL solution for sliding windows with a large size and a small slide

2023-05-28 Thread Shammon FY
Hi,

Is the problem that the amount of data emitted after the window fires is too large? Would increasing resources and raising the parallelism of the window computation help alleviate it?

Best,
Shammon FY

On Fri, May 26, 2023 at 2:03 PM tanjialiang  wrote:

> Hi, all.
> I've run into some problems with FlinkSQL's window TVF sliding windows.
> The slide is 5 minutes, the window size is 24 hours, and the window is grouped by
> user_id. When the job fails, or when it consumes from Kafka's earliest offset, checkpoints can hardly succeed.
> Consuming from earliest quickly fills the buffers and causes backpressure; that batch of data may trigger N window firings downstream, each firing costing (user base *
> 24 * 60 / 5) operations, so the checkpoint barrier can stay stuck.
> Is there any way out of this?
>
>
> best,
> tanjialiang.
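For reference, the setup described above would look roughly like the following window TVF sketch; the table name, column names, and watermark definition are assumptions, not from the thread.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SlidingWindowSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 24-hour window sliding every 5 minutes, grouped per user, via the HOP window TVF.
        // "user_events", "user_id" and "event_time" (with a watermark) are placeholder names.
        tEnv.executeSql(
                "SELECT user_id, window_start, window_end, COUNT(*) AS cnt "
                        + "FROM TABLE(HOP(TABLE user_events, DESCRIPTOR(event_time), "
                        + "              INTERVAL '5' MINUTE, INTERVAL '24' HOUR)) "
                        + "GROUP BY user_id, window_start, window_end");
    }
}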


Error when writing to Kafka with exactly-once in Flink

2023-05-28 Thread lxk
The project writes to Kafka with exactly-once semantics; the code and configuration are as follows:
Properties producerProperties = MyKafkaUtil.getProducerProperties();
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server"))
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(Event2Kafka.parameterTool.get("feature.topic.name"))
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setKafkaProducerConfig(producerProperties)
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("streamx_flow_1261")
        .build();

eventJsonStream.sinkTo(kafkaSink).setParallelism(14)
        .name("event2kafka").uid("kafkasink");
public static Properties getProducerProperties() {
    Properties kafkaProducerProps = new Properties();
    kafkaProducerProps.setProperty("bootstrap.servers", parameterTool.get("bootstrap.server"));
    kafkaProducerProps.setProperty("auto.commit.interval.ms", "5000");
    kafkaProducerProps.setProperty("auto.offset.reset", "latest");
    kafkaProducerProps.setProperty("session.timeout.ms", "5000");
    kafkaProducerProps.setProperty("transaction.timeout.ms", 12 * 6 + "");
    kafkaProducerProps.put("security.protocol", "SASL_PLAINTEXT");
    kafkaProducerProps.put("sasl.kerberos.service.name", "kafka");

    return kafkaProducerProps;
}

Error when writing to Kafka with exactly-once in Flink

2023-05-28 Thread lxk
The previous email was sent by mistake; resending. The project writes to Kafka with exactly-once semantics; the code and configuration are as follows:

The sink code is as follows:
Properties producerProperties = MyKafkaUtil.getProducerProperties();
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server"))
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(Event2Kafka.parameterTool.get("feature.topic.name"))
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setKafkaProducerConfig(producerProperties)
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("streamx_flow_1261")
        .build();

eventJsonStream.sinkTo(kafkaSink).setParallelism(14)
        .name("event2kafka").uid("kafkasink");




The Kafka configuration is as follows:
public static Properties getProducerProperties() {
    Properties kafkaProducerProps = new Properties();
    kafkaProducerProps.setProperty("bootstrap.servers", parameterTool.get("bootstrap.server"));
    kafkaProducerProps.setProperty("auto.commit.interval.ms", "5000");
    kafkaProducerProps.setProperty("auto.offset.reset", "latest");
    kafkaProducerProps.setProperty("session.timeout.ms", "5000");
    kafkaProducerProps.setProperty("transaction.timeout.ms", 12 * 6 + "");
    kafkaProducerProps.put("security.protocol", "SASL_PLAINTEXT");
    kafkaProducerProps.put("sasl.kerberos.service.name", "kafka");

    return kafkaProducerProps;
}


The job had been running for a long time without any issues, but recently it suddenly threw the following error:
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka 
-topic-2@-1 with 
FlinkKafkaInternalProducer{transactionalId='streamx_flow_1261-8-5', 
inTransaction=true, closed=false} 
at 
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:436)
at 
org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:417)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The 
broker received an out of order sequence number.


I looked at this answer on Stack Overflow: https://stackoverflow.com/questions/55192852/transactional-producer-vs-just-idempotent-producer-java-exception-outoforderseq
However, I haven't set any of the parameters mentioned there; everything is at its default, so in theory this problem shouldn't occur. Does anyone have any thoughts, or is there anything wrong with or missing from my configuration?
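Not an answer from the thread, just a hedged sketch of one setting commonly double-checked with EXACTLY_ONCE sinks: the producer transaction timeout is usually set well above the checkpoint interval while staying at or below the broker's transaction.max.timeout.ms (15 minutes by default). The 15-minute value below is an assumption, not the original job's setting.

import java.util.Properties;

// Sketch only: note that 12 * 6 + "" in the snippet above evaluates to the string "72",
// i.e. a 72 ms transaction timeout. A value comfortably above the checkpoint interval
// (here assumed to be 15 minutes) is the usual choice for EXACTLY_ONCE.
Properties producerProps = new Properties();
producerProps.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));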



Re: FlinkSQL solution for sliding windows with a large size and a small slide

2023-05-28 Thread tanjialiang
Hi, Shammon FY.

In theory raising the parallelism would help, but raising it too far is likely to be expensive. Writing the results out doesn't actually need many resources; the problem is that once the window fires the data volume is huge (the whole user base) and the per-record merge cost is high (each record has to be merged into 24*60/5 = 288 window aggregations).


The only workarounds I can think of so far are:
1. Increase the slide, which fixes the problem at its root (a longer slide means window results are emitted later).
2. Increase the slide and use the early-fire mechanism to emit windows before they are complete (see the sketch after this list); the emitted data may not match the "last 24 hours" business semantics, and the downstream system has to support upsert.
3. Use external storage: have Flink write the data directly, or pre-aggregated, into an OLAP system (e.g. Doris/ClickHouse) and aggregate at read time (this requires a stable, reliable external store).
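A rough sketch of option 2 under assumptions: the early-fire options below are experimental and documented for the legacy GROUP BY HOP(...) window syntax rather than the window TVF, the slide is widened to 1 hour purely as an example, the table and column names are placeholders, and the downstream sink must support upsert.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EarlyFireSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Emit partial window results periodically instead of waiting 24h for the window to close.
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.emit.early-fire.enabled", "true");
        conf.setString("table.exec.emit.early-fire.delay", "5 min");

        // Legacy group-window syntax; slide widened to 1 hour as an example.
        tEnv.executeSql(
                "SELECT user_id, "
                        + "       HOP_END(event_time, INTERVAL '1' HOUR, INTERVAL '24' HOUR) AS window_end, "
                        + "       COUNT(*) AS cnt "
                        + "FROM user_events "
                        + "GROUP BY user_id, HOP(event_time, INTERVAL '1' HOUR, INTERVAL '24' HOUR)");
    }
}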


Have you run into this kind of problem when doing sliding-window computations with Flink? Are there any other, better solutions?
Looking forward to your feedback.
best, 
tanjialiang.
 Original message 
| From | Shammon FY |
| Date | 2023-05-29 09:08 |
| To |  |
| Subject | Re: FlinkSQL solution for sliding windows with a large size and a small slide |
Hi,

Is the problem that the amount of data emitted after the window fires is too large? Would increasing resources and raising the parallelism of the window computation help alleviate it?

Best,
Shammon FY

On Fri, May 26, 2023 at 2:03 PM tanjialiang  wrote:

Hi, all.
I've run into some problems with FlinkSQL's window TVF sliding windows.
The slide is 5 minutes, the window size is 24 hours, and the window is grouped by
user_id. When the job fails, or when it consumes from Kafka's earliest offset, checkpoints can hardly succeed.
Consuming from earliest quickly fills the buffers and causes backpressure; that batch of data may trigger N window firings downstream, each firing costing (user base *
24 * 60 / 5) operations, so the checkpoint barrier can stay stuck.
Is there any way out of this?


best,
tanjialiang.


Re: [ANNOUNCE] Apache Flink 1.16.2 released

2023-05-28 Thread weijie guo
Hi Jing,

Thank you for asking about the release process. It has to be said that the entire process went smoothly. We have very comprehensive documentation [1] to guide the work, thanks to the contributions of previous release managers and the community.
release managers and the community.

Regarding obstacles, I actually ran into only one minor problem: we use an older twine (1.12.0) to deploy the Python artifacts to PyPI, and the dependency versions it is compatible with (such as urllib3) are also older. When I tried twine upload, the process didn't work as expected because the version of urllib3 installed on my machine was newer. To work around this, I had to downgrade some of the dependencies. I added a notice to the cwiki page [1] so that future release managers don't run into the same problem. This seems to be a known issue (see the comments in [2]) that has been resolved in newer versions of twine, so I wonder if we can upgrade the twine version. Does anyone remember why we pinned such an old version (1.12.0)?

Best regards,

Weijie


[1]
https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release

[2] https://github.com/pypa/twine/issues/997


Jing Ge  wrote on Sat, May 27, 2023 at 00:15:

> Hi Weijie,
>
> Thanks again for your effort. I was wondering if there were any obstacles
> you had to overcome while releasing 1.16.2 and 1.17.1 that could lead us to
> any improvement wrt the release process and management?
>
> Best regards,
> Jing
>
> On Fri, May 26, 2023 at 4:41 PM Martijn Visser 
> wrote:
>
> > Thank you Weijie and those who helped with testing!
> >
> > On Fri, May 26, 2023 at 1:06 PM weijie guo 
> > wrote:
> >
> > > The Apache Flink community is very happy to announce the release of
> > > Apache Flink 1.16.2, which is the second bugfix release for the Apache
> > > Flink 1.16 series.
> > >
> > >
> > >
> > > Apache Flink® is an open-source stream processing framework for
> > > distributed, high-performing, always-available, and accurate data
> > > streaming applications.
> > >
> > >
> > >
> > > The release is available for download at:
> > >
> > > https://flink.apache.org/downloads.html
> > >
> > >
> > >
> > > Please check out the release blog post for an overview of the
> > > improvements for this bugfix release:
> > >
> > > https://flink.apache.org/news/2023/05/25/release-1.16.2.html
> > >
> > >
> > >
> > > The full release notes are available in Jira:
> > >
> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352765
> > >
> > >
> > >
> > > We would like to thank all contributors of the Apache Flink community
> > > who made this release possible!
> > >
> > >
> > >
> > > Feel free to reach out to the release managers (or respond to this
> > > thread) with feedback on the release process. Our goal is to
> > > constantly improve the release process. Feedback on what could be
> > > improved or things that didn't go so well are appreciated.
> > >
> > >
> > >
> > > Regards,
> > >
> > > Release Manager
> > >
> >
>


Flink: capturing abnormal input data

2023-05-28 Thread 小昌同学
Hi all, I have a job that has been running for a long time, but recently some dirty data from the source system caused it to fail. The error in the YARN logs is a NullPointerException. I would now like to capture that dirty record. Is there any way to do this? Thanks for your guidance.
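Not from the thread, just a hedged sketch of one common way to isolate such records: wrap the step that throws in a try/catch inside a ProcessFunction and route failing raw records to a side output for later inspection. The source stream, the Event type, and the parse step below are placeholders.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// ... inside the job ("source" and "Event" are placeholders):
final OutputTag<String> dirtyTag = new OutputTag<String>("dirty-records") {};

SingleOutputStreamOperator<Event> parsed = source.process(
        new ProcessFunction<String, Event>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Event> out) {
                try {
                    out.collect(Event.parse(value));   // hypothetical parsing step that currently throws
                } catch (Exception e) {
                    ctx.output(dirtyTag, value);       // keep the raw dirty record for inspection
                }
            }
        });

DataStream<String> dirty = parsed.getSideOutput(dirtyTag);
dirty.print("dirty");   // or sink it somewhere for later analysis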


小昌同学
ccc0606fight...@163.com

Problem with custom trigger firing

2023-05-28 Thread 吴先生
I have a custom trigger that emits results when either maxCount is reached or the window end time is reached.
Problem: for the same window, onProcessingTime fires multiple times when the window ends, while in theory each window should fire only once when its end time is reached. What is the cause?
Main class snippet:
SingleOutputStreamOperator> windowMap =
        afterMap.timeWindowAll(Time.seconds(5))
                .trigger(new CountAndProcessingTimeTrigger(100))
                .process(simpleConfig.getWindowFunction().newInstance())
Trigger code:


public class CountAndProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
    // maximum number of elements per window
    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc;

    public CountAndProcessingTimeTrigger(long maxCount) {
        this.stateDesc = new ReducingStateDescriptor<>("count_time",
                new CountAndProcessingTimeTrigger.Sum(), LongSerializer.INSTANCE);
        this.maxCount = maxCount;
    }

    /**
     * Called for every element added to the window.
     *
     * @param o element
     * @param timestamp timestamp
     * @param window window
     * @param triggerContext triggerContext
     * @return TriggerResult
     * CONTINUE: do nothing.
     * FIRE: trigger the computation and keep the window contents.
     * PURGE: simply drop the window contents, keeping any window and trigger meta information.
     * FIRE_AND_PURGE: trigger the computation, then clear the elements in the window.
     * @throws Exception Exception
     */
    @Override
    public TriggerResult onElement(Object o, long timestamp, TimeWindow window,
            TriggerContext triggerContext) throws Exception {
        triggerContext.registerProcessingTimeTimer(window.maxTimestamp());
        ReducingState<Long> countState = triggerContext.getPartitionedState(stateDesc);
        countState.add(1L);
        if (countState.get() >= maxCount) {
            log.info("countTrigger: {}", countState.get());
            countState.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when the window's processing-time timer (window end) fires.
     *
     * @param timestamp timestamp
     * @param window window
     * @param triggerContext triggerContext
     * @return TriggerResult
     * @throws Exception Exception
     */
    @Override
    public TriggerResult onProcessingTime(long timestamp, TimeWindow window,
            TriggerContext triggerContext) throws Exception {
        ReducingState<Long> countState = triggerContext.getPartitionedState(stateDesc);
        log.info("timeTrigger: {}, currentProcessingTime:{}", countState.get(),
                window.maxTimestamp());
        countState.clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long timestamp, TimeWindow window,
            TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return false;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.mergePartitionedState(stateDesc);
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    /**
     * Called when the window is removed.
     *
     * @param window window
     * @param triggerContext triggerContext
     * @throws Exception Exception
     */
    @Override
    public void clear(TimeWindow window, TriggerContext triggerContext) throws Exception {
        triggerContext.deleteProcessingTimeTimer(window.maxTimestamp());
        triggerContext.getPartitionedState(stateDesc).clear();
    }

    /**
     * Counting reduce function.
     */
    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Sum() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}


吴先生
15951914...@163.com