Flink DataStream: ConnectionClosedException when writing to ES with ElasticsearchSink

2019-09-17 Posted by 王佩
When writing to ES from Flink, the following exception appears whenever the ES cluster is busy:

2019-09-17 16:01:02 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase afterBulk 430 Failed Elasticsearch bulk request: Connection closed
org.apache.http.ConnectionClosedException: Connection closed
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:345)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:745)

Once Flink hits this exception while writing to ES with ElasticsearchSink, the connection is not recovered automatically.

What I know so far:

Failure handling in ElasticsearchSink is configured through the sink's setFailureHandler method, which lets you define how to react to a failed request (for example, drop the record or re-add it to the request queue). However, this hook does not provide a way to reopen the ES connection when a ConnectionClosedException occurs.
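For reference, a minimal sketch of such a handler (the class name is hypothetical): it only re-queues the failed request so it is retried on the next bulk flush and rethrows everything else; it does not reopen the underlying connection by itself, so treat it as an illustration of the setFailureHandler hook rather than a confirmed fix.

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.http.ConnectionClosedException;
import org.elasticsearch.action.ActionRequest;

public class RetryOnConnectionClosedHandler implements ActionRequestFailureHandler {
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (ExceptionUtils.findThrowable(failure, ConnectionClosedException.class).isPresent()) {
            // Re-queue the request; the bulk processor will retry it on the next flush.
            indexer.add(action);
        } else {
            // Any other failure is rethrown so the job fails and restarts from the last checkpoint.
            throw failure;
        }
    }
}

// Hypothetical wiring on the sink builder:
// esSinkBuilder.setFailureHandler(new RetryOnConnectionClosedHandler());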

I would like Flink to reopen the ES connection automatically after running into this exception. How can this be achieved?

Thanks


Re: Reply: Split a stream into any number of streams

2019-09-17 Posted by cai yi
For this scenario it seems you could use Broadcast: broadcast your custom event rules, connect them with the data stream, and then do the routing/handling in a process function...
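A rough sketch of that broadcast pattern (everything below is made up for illustration: events and rules are modeled as plain "type,payload" and "type=target" strings, and the routing logic is only a placeholder):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastRuleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical inputs; in the real job both would come from Kafka.
        DataStream<String> events = env.fromElements("login,a", "click,b");
        DataStream<String> rules = env.fromElements("login=parquet", "click=async-io");

        // Broadcast state keeps the current rule per event type.
        final MapStateDescriptor<String, String> ruleDesc =
                new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);
        BroadcastStream<String> ruleBroadcast = rules.broadcast(ruleDesc);

        events.connect(ruleBroadcast)
              .process(new BroadcastProcessFunction<String, String, String>() {
                  @Override
                  public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                      String type = event.split(",")[0];
                      // Look up the rule for this event type and route/handle accordingly.
                      String rule = ctx.getBroadcastState(ruleDesc).get(type);
                      out.collect(type + " -> " + (rule == null ? "default" : rule));
                  }

                  @Override
                  public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
                      // Update the broadcast state whenever a new or changed rule arrives.
                      String[] kv = rule.split("=");
                      ctx.getBroadcastState(ruleDesc).put(kv[0], kv[1]);
                  }
              })
              .print();

        env.execute("broadcast rules sketch");
    }
}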

On 2019/9/17 at 4:52 PM, "venn" wrote:

I'm afraid not; both side output and split need to know up front how many streams you are splitting into.

For example, side output requires the tag to be defined first:
val late = new OutputTag[LateDataEvent]("late")


-----Original Message-----
From: user-zh-return-1164-wxchunjhyy=163@flink.apache.org on behalf of 王佩
Sent: Tuesday, September 17, 2019 4:25 PM
To: user-zh
Subject: Re: Split a stream into any number of streams

Here is the situation. Say there are 1000 event types (distinguished by a field, and more keep being added), all in one Kafka topic.

After Flink reads from Kafka, the result is a single DataStream. I want to split out each event type so that each one becomes its own DataStream; downstream, each of those streams can then be processed differently, e.g. DataStream async I/O or sinking the DataStream to Parquet.

1. With split...select, the event name passed to select(eventName) has to be a fixed, known value.
2. With side output, the output tags must be defined up front; with 1000 event types (and growing) that means defining 1000+ output tags.

Thanks!

cai yi  wrote on Tue, Sep 17, 2019, at 1:33 PM:

> You can use Side Output: route the input stream to different custom OutputTags as needed, then use DataStream.getSideOutput(outputTag) to pull out each stream you need and process it!
>
> On 2019/9/17 at 10:05 AM, "Wesley Peng" wrote:
>
>
>
> on 2019/9/17 9:55, 王佩 wrote:
> > I want to split a stream into any number of streams according to a field,
> > and then process the split stream one by one.
>
> I think that should be easily done. Refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
> regards.
>
>



Using Chinese characters in a WHERE condition causes the queried field values to come back as Unicode escapes

2019-09-17 Posted by 苏 欣
I am using Flink 1.9. In SQL, when the WHERE condition on a field uses Chinese characters, the returned value of that field comes back as Unicode escapes; if the condition does not use Chinese characters, the field value is returned as Chinese characters. Has anyone run into this problem?

Test code:

tEnv.connect(
        new Kafka()
                .version("universal")
                .topic("flink-test-topic-1")
                .startFromGroupOffsets()
                .property("bootstrap.servers", "192.168.129.101:9192")
                .property("group.id", "flink-test-consumer-group")
).withFormat(new Json()
        .failOnMissingField(false)
        .deriveSchema()
).withSchema(new Schema()
        .field("TI", Types.STRING)
        .field("EV", Types.STRING)
        .field("CS_HOST", Types.STRING)
        .field("DCS_ID", Types.STRING)
        .field("complex_row",
                Types.ROW_NAMED(new String[]{"first_level_row", "first_level_int"},
                        Types.ROW_NAMED(new String[]{"second_level_str1", "second_level_str2"}, Types.STRING, Types.STRING),
                        Types.INT))
        .field("proc", Types.SQL_TIMESTAMP).proctime()
).inAppendMode().registerTableSource("kafka_src");

Table table1 = tEnv.sqlQuery("select * from kafka_src where TI = '会话登录'");
Table table2 = tEnv.sqlQuery("select * from kafka_src where EV = 'view'");

tEnv.toAppendStream(table1, Row.class).print();
tEnv.toAppendStream(table2, Row.class).print();

Output:
\u4F1A\u8BDD\u767B\u5F55,view,-,hCOsDjIKi8pcW0VmFASlY4bTMw7yZG,aaa,bbb,100,2019-09-17T09:42:43.731
会话登录,view,-,hCOsDjIKi8pcW0VmFASlY4bTMw7yZG,aaa,bbb,100,2019-09-17T09:42:43.731

Sent from Mail for Windows 10



Error when writing to Kafka: ByteArraySerializer could not be found.

2019-09-17 Posted by qq邮箱
When running a Flink job that writes to Kafka, the following error is reported:

org.apache.kafka.common.config.ConfigException: Invalid value org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArraySerializer could not be found.






Reply: Split a stream into any number of streams

2019-09-17 Posted by Jun Zhang
Would applying a keyBy operation to the DataStream solve this?
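A minimal sketch of that keyBy idea (the Event POJO and its type field are invented for illustration). All event types then share one keyed operator, so newly added types need no new tags, but they also share the same downstream logic:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyByEventTypeSketch {

    // Hypothetical event wrapper; the real job would deserialize these from the Kafka topic.
    public static class Event {
        public String type;     // the field that distinguishes the 1000+ event types
        public String payload;
        public Event() {}
        public Event(String type, String payload) { this.type = type; this.payload = payload; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> events = env.fromElements(new Event("login", "a"), new Event("view", "b"));

        events.keyBy(new KeySelector<Event, String>() {
                  @Override
                  public String getKey(Event e) {
                      return e.type;   // group by the event-type field
                  }
              })
              .process(new KeyedProcessFunction<String, Event, Event>() {
                  @Override
                  public void processElement(Event value, Context ctx, Collector<Event> out) {
                      // ctx.getCurrentKey() is the event type; per-type state and logic live here.
                      out.collect(value);
                  }
              })
              .print();

        env.execute("keyBy per event type (sketch)");
    }
}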




------ Original Message ------
From: "venn"
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream

regards.



Re: Split a stream into any number of streams

2019-09-17 Posted by 王佩
Here is the situation. Say there are 1000 event types (distinguished by a field, and more keep being added), all in one Kafka topic.

After Flink reads from Kafka, the result is a single DataStream. I want to split out each event type so that each one becomes its own DataStream; downstream, each of those streams can then be processed differently, e.g. DataStream async I/O or sinking the DataStream to Parquet.

1. With split...select, the event name passed to select(eventName) has to be a fixed, known value.
2. With side output, the output tags must be defined up front; with 1000 event types (and growing) that means defining 1000+ output tags (see the sketch after this list).
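A hedged sketch of the side-output variant from point 2 (the "type,payload" encoding and the tag names are invented); it shows why every tag has to exist before the job starts:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Events encoded as "type,payload"; the real job would read them from Kafka.
        DataStream<String> events = env.fromElements("login,a", "click,b", "view,c");

        // Every tag must be declared before the job starts, which is exactly the
        // limitation described in point 2 for 1000+ (and growing) event types.
        final OutputTag<String> loginTag = new OutputTag<String>("login") {};
        final OutputTag<String> clickTag = new OutputTag<String>("click") {};

        SingleOutputStreamOperator<String> mainStream = events.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                String type = value.split(",")[0];
                if ("login".equals(type)) {
                    ctx.output(loginTag, value);
                } else if ("click".equals(type)) {
                    ctx.output(clickTag, value);
                } else {
                    out.collect(value);   // anything without a predefined tag stays on the main stream
                }
            }
        });

        mainStream.getSideOutput(loginTag).print("login");
        mainStream.getSideOutput(clickTag).print("click");
        mainStream.print("other");

        env.execute("side output sketch");
    }
}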

Thanks!

cai yi  wrote on Tue, Sep 17, 2019, at 1:33 PM:

> You can use Side Output: route the input stream to different custom OutputTags as needed, then use DataStream.getSideOutput(outputTag) to pull out each stream you need and process it!
>
> On 2019/9/17 at 10:05 AM, "Wesley Peng" wrote:
>
>
>
> on 2019/9/17 9:55, 王佩 wrote:
> > I want to split a stream into any number of streams according to a field,
> > and then process the split stream one by one.
>
> I think that should be easily done. Refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
> regards.
>
>