回复:消息队列量级特别如何优化消费

2021-03-05 文章 allanqinjy
感谢各位的回答


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


在2021年03月5日 19:23,smq<374060...@qq.com> 写道:
被压严重一般是sink效率低或者计算过程有瓶颈,被压跟数据源数据多少没什么关系。你可以在web界面查看哪个算子导致的,然后优化就可以了



发自我的iPhone


-- 原始邮件 --
发件人: 刘建刚 

回复:消息队列量级特别如何优化消费

2021-03-05 文章 smq
被压严重一般是sink效率低或者计算过程有瓶颈,被压跟数据源数据多少没什么关系。你可以在web界面查看哪个算子导致的,然后优化就可以了



发自我的iPhone


-- 原始邮件 --
发件人: 刘建刚 

回复:消息队列量级特别如何优化消费

2021-03-05 文章 smq
被压严重一般是sink效率低或者计算过程有瓶颈,被压跟数据源数据多少没什么关系。你可以在web界面查看哪个算子导致的,然后优化就可以了



发自我的iPhone


-- 原始邮件 --
发件人: 刘建刚 

Reply:消息队列量级特别如何优化消费

2021-03-05 文章 smq
被压严重一般是sink效率低或者计算过程有瓶颈,被压跟数据源数据多少没什么关系



发自我的iPhone


-- 原始邮件 --
From: 刘建刚 

[sql]TimeStamp和异常格式的字符串进行比较时会报空指针

2021-03-05 文章 silence
问题描述:
TimeStamp类型和异常格式的字符串进行比较时会在任务运行时报空指针
像这种错误虽然是用户书写错误导致的,但运行时才能发现问题,且sql太长时不好定位具体原因
是否可以在编译期进行类型的验证,尽早发现问题并给出sql的文本坐标

例:where CURRENT_TIMESTAMP=''
  where CURRENT_TIMESTAMP='19700101'

java.lang.NullPointerException: null
at
org.apache.flink.table.data.TimestampData.compareTo(TimestampData.java:112)
at StreamExecCalc$4.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:76)
at
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 消息队列量级特别如何优化消费

2021-03-05 文章 刘建刚
本质原因是作业资源不足无法处理大量数据,好像只有扩大并发来解决了。


allanqinjy  于2021年3月5日周五 上午10:48写道:

>
>
> hi,
>   由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!
>
>
> | |
> allanqinjy
> |
> |
> allanqi...@163.com
> |
> 签名由网易邮箱大师定制
>
>


?????? flink sql??????????????io??????????

2021-03-05 文章 ????
??




----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/legacy.html#temporal-table-function-join
flink sql?? temporal-table join io

casel.chen 

flink dataStream多次sink DAG重复驱动执行?

2021-03-05 文章 lp


有个疑问,
如下程序片段:

--
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",kafkaAddr);
properties.setProperty("group.id",kafkaOdsGroup);
properties.setProperty("auto.offset.reset",kafkaOdsAutoOffsetReset);
   
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,kafkaOdsPartitionDiscoverInterval);
   
properties.setProperty("transaction.timeout.ms",KafkaOdsTransactionTimeout);//kafka事务超时时间

FlinkKafkaConsumer flinkKafkaConsumer = new
FlinkKafkaConsumer<>(kafkaOdsTopic,new SimpleStringSchema(),properties);
DataStreamSource dataStreamSource =
env.addSource(flinkKafkaConsumer);
dataStreamSource.printToErr("1");
dataStreamSource.printToErr("2");
dataStreamSource.printToErr("3");



我对一个datastream进行多次相同操作的sink,请问是否会导致上游整个DAG重复驱动执行,基于spark的惯性思维,我认为上游DAG是会重复驱动执行的?



--
Sent from: http://apache-flink.147419.n8.nabble.com/