在Flink 写ES,当ES集群繁忙时,会有如下异常:
2019-09-17 16:01:02 ERROR
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase
afterBulk 430 Failed Elasticsearch bulk request: Connection closed
org.apache.http.ConnectionClosedException: Connection closed
at
你这个场景貌似可以用Broadcast来广播自定义的事件规则然后join数据流, 之后可以在process中进行处理...
在 2019/9/17 下午4:52,“venn” 写入:
恐怕不行,sideoutput 和 split 都需要先知道要分多少个流
如sideoutput 需要先定义tag:
val late = new OutputTag[LateDataEvent]("late")
-邮件原件-
发件人:
我使用的是flink
1.9版本,在sql中将where条件的一个字段传入了汉字,返回的字段值却是unicode编码。如果不按照汉字做条件,返回的字段值则是汉字。请问有没有人遇到过这个问题?
测试代码:
tEnv.connect(
new Kafka()
.version("universal")
.topic("flink-test-topic-1")
.startFromGroupOffsets()
在运行flink写入到kafka的时候提示如下错误
org.apache.kafka.common.config.ConfigException: Invalid value
org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer
for configuration key.serializer: Class
对DataStream进行keyBy操作是否能解决呢?
--原始邮件--
发件人:"venn"https://stackoverflow.com/questions/53588554/apache-flink-using-filter
-or-split-to-split-a-stream
regards.
是这样的。比方有1000个事件(通过某个字段区分,事件会继续增加),都在一个kafka topic中。
Flink
从Kafka读取数据后是一个DataStream,我想将每个事件分流出来,这样每个事件都是一个DataStream,后续,可以对每个事件做各种各样的处理,如DataStream异步IO、DataStream
Sink 到Parquet。
1、如果用split...select,由于select(事件名),这里的事件名必须是某个确定的。
2、如果用side output,要提前定义output tag,我有1000个事件(事件会继续增加),这样就需要定义1000+