[ 
https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166527#comment-16166527
 ] 

Steve Loughran commented on FLINK-7589:
---------------------------------------

bq.  is there any plan to make Flink use AWS S3 SDK directly rather than 
Hadoop's S3 implementation?

The topic has surfaced. Note that if you do want a client which is resilient to 
transient failures, that's a fair amount of extra work, and you have to 
implement a fault-injecting wrapper around the AWS SDK to actually verify that 
it works in a functional test suite... that's something to consider shipping so 
that people can use it in their own integration tests.
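
To make the fault-injection idea concrete, here is a rough sketch using only 
java.io. It is not S3A or AWS SDK code: RangedSource, FailingInputStream, 
flakySource and readAllWithRetry are all hypothetical names made up for 
illustration. RangedSource stands in for "reopen the object at byte offset N", 
and the wrapper imitates a connection that drops partway through the body, 
like the ConnectionClosedException in this issue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

final class FaultInjectionSketch {

    /** Hypothetical stand-in for a ranged GET: opens the data at a byte offset. */
    interface RangedSource {
        InputStream open(long offset) throws IOException;
    }

    /** Wrapper that throws once a byte budget is used up, imitating a dropped connection. */
    static final class FailingInputStream extends FilterInputStream {
        private long remaining;

        FailingInputStream(InputStream in, long failAfterBytes) {
            super(in);
            this.remaining = failAfterBytes;
        }

        @Override
        public int read() throws IOException {
            if (remaining <= 0) {
                throw new IOException("injected fault: premature end of stream");
            }
            int b = super.read();
            if (b >= 0) {
                remaining--;
            }
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            if (len == 0) {
                return 0;
            }
            if (remaining <= 0) {
                throw new IOException("injected fault: premature end of stream");
            }
            // Never hand out more bytes than the budget allows.
            int n = super.read(buf, off, (int) Math.min(len, remaining));
            if (n > 0) {
                remaining -= n;
            }
            return n;
        }
    }

    /** In-memory source whose first faultyOpens streams fail after failAfterBytes bytes. */
    static RangedSource flakySource(byte[] data, int faultyOpens, long failAfterBytes) {
        int[] opens = {0};
        return offset -> {
            int start = (int) offset;
            InputStream raw = new ByteArrayInputStream(data, start, data.length - start);
            return opens[0]++ < faultyOpens ? new FailingInputStream(raw, failAfterBytes) : raw;
        };
    }

    /** Reads everything, reopening at the current offset after each IOException. */
    static byte[] readAllWithRetry(RangedSource source, int maxRetries) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int failures = 0;
        while (true) {
            // out.size() is the number of bytes already received, so resume from there.
            try (InputStream in = source.open(out.size())) {
                int n;
                while ((n = in.read(buf, 0, buf.length)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                if (++failures > maxRetries) {
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[10_000];
        byte[] got = readAllWithRetry(flakySource(data, 3, 1_000), 5);
        System.out.println("bytes read despite injected faults: " + got.length);
    }
}
```

A functional test would swap the in-memory source for a wrapper around the 
real client and assert that a restore still completes when faults are 
injected, and that it fails cleanly once the retry budget is exhausted.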

> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536)
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7589
>                 URL: https://issues.apache.org/jira/browse/FLINK-7589
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.3.2
>            Reporter: Bowen Li
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> When I tried to resume a Flink job from a savepoint with different 
> parallelism, I ran into this error. And the resume failed.
> {code:java}
> 2017-09-05 21:53:57,317 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - returnsivs -> 
> Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to 
> FAILED.
> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536)
>       at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
>       at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>       at 
> com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>       at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
>       at java.io.DataInputStream.read(DataInputStream.java:149)
>       at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72)
>       at 
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61)
>       at 
> org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47)
>       at java.io.DataInputStream.readFully(DataInputStream.java:195)
>       at java.io.DataInputStream.readLong(DataInputStream.java:416)
>       at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
>       at 
> org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156)
>       at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345)
>       at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>       at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
