[ 
https://issues.apache.org/jira/browse/FLINK-25670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484487#comment-17484487
 ] 

Kyle edited comment on FLINK-25670 at 1/31/22, 3:12 AM:
--------------------------------------------------------

Actually, we need to handle about 90,000 * 100 = 9,000,000 doubles (72MB) 
received every 15 minutes. During processing, 9,000,000 * 6 = 54,000,000 
doubles (432MB) are read and updated.
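
As a quick check of the figures above, here is a minimal sketch. It assumes 8-byte IEEE 754 doubles and decimal megabytes (1 MB = 10**6 bytes); the names are only illustrative.

```python
BYTES_PER_DOUBLE = 8  # assumption: IEEE 754 double precision


def size_mb(num_doubles: int) -> float:
    """Return the raw size in decimal megabytes (1 MB = 10**6 bytes)."""
    return num_doubles * BYTES_PER_DOUBLE / 1_000_000


# Doubles received every 15 minutes.
received = 90_000 * 100
# Doubles read and updated while processing one batch.
touched = received * 6

print(f"received: {received:,} doubles = {size_mb(received)} MB")
print(f"touched:  {touched:,} doubles = {size_mb(touched)} MB")
```

These are raw payload sizes only. Any serialization or protocol overhead comes on top.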

We'd like to keep the 432MB of data as state, with high performance and 
guaranteed correctness. If the data were kept in external storage (e.g. 
S3/HDFS), we could write it there directly and would not need StateFun or 
Flink at all.

What would you suggest? If sharding the requests is needed, how could we 
partition the large state across the StateFun cluster and have it sent to and 
received by a remote function?
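
To make the sharding question concrete, this is roughly the kind of split we have in mind. It is only a sketch in plain Python, not StateFun API: the "<key>/<index>" shard id scheme, the function names and the 1024-byte limit are all hypothetical.

```python
from array import array
from typing import Dict, Iterator, Tuple

BYTES_PER_DOUBLE = 8  # array("d") items are 8 bytes on common platforms


def shard_doubles(key: str, values: array,
                  max_shard_bytes: int) -> Iterator[Tuple[str, bytes]]:
    """Yield (shard_id, payload) pairs; each payload is <= max_shard_bytes."""
    per_shard = max_shard_bytes // BYTES_PER_DOUBLE
    if per_shard < 1:
        raise ValueError("max_shard_bytes is smaller than one double")
    for index, start in enumerate(range(0, len(values), per_shard)):
        chunk = values[start:start + per_shard]
        # Hypothetical id scheme: each shard could be its own function instance.
        yield f"{key}/{index}", chunk.tobytes()


def reassemble(shards: Dict[str, bytes]) -> array:
    """Rebuild the original array from its shards, ordered by shard index."""
    out = array("d")
    for shard_id in sorted(shards, key=lambda s: int(s.rsplit("/", 1)[1])):
        out.frombytes(shards[shard_id])
    return out


# Small demo: 1000 doubles (8000 bytes) split into shards of at most 1024 bytes.
values = array("d", (float(i) for i in range(1000)))
shards = dict(shard_doubles("5555", values, max_shard_bytes=1024))
restored = reassemble(shards)
```

With a split like this, each shard would travel in its own request/response, so no single HTTP message has to carry the whole state. Whether StateFun can route and store state this way is exactly what we are asking.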

Thanks in advance.



> StateFun: Unable to handle oversize HTTP message if state size is large
> -----------------------------------------------------------------------
>
>                 Key: FLINK-25670
>                 URL: https://issues.apache.org/jira/browse/FLINK-25670
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: statefun-3.1.1
>            Reporter: Kyle
>            Priority: Major
>         Attachments: 00-module.yaml, functions.py
>
>
> Per our requirements, we need to handle state that is about 500MB in size 
> (the attached code allocates 72MB of state in a commented-out section). 
> However, the HTTP message size limit prevents us from sending large state 
> back to the StateFun cluster after saving state in the Stateful Function.
> Another question is whether large data can be sent to a Stateful Function 
> from an ingress.
>  
> 2022-01-17 07:57:18,416 WARN  org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #10)ToFunctionRequestSummary(address=Address(example, hello, 5555), batchSize=1, totalSizeInBytes=80, numberOfStates=2)
> org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException: Response entity too large: DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
> HTTP/1.1 200 OK
> Content-Type: application/octet-stream
> Content-Length: 40579630
> Date: Mon, 17 Jan 2022 07:57:18 GMT
> Server: Python/3.9 aiohttp/3.8.1
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator.handleOversizedMessage(HttpObjectAggregator.java:276)
>  ~[statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator.handleOversizedMessage(HttpObjectAggregator.java:87)
>  ~[statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageAggregator.invokeHandleOversizedMessage(MessageAggregator.java:404)
>  ~[statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageAggregator.decode(MessageAggregator.java:254)
>  ~[statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
>  ~[statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:425)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  [statefun-flink-distribution.jar:3.1.1]
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  [statefun-flink-distribution.jar:3.1.1]
>         at java.lang.Thread.run(Unknown Source) [?:?]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
