Hi Ravi, some questions:

- Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop, 
flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs, 
hadoop-common) ? If so, could you please share your dependency versioning?
- Does this use a kafka source with high flink parallelism (~400) for all kafka 
partitions and does it run continuously for several days?
- Could you please share your checkpoint interval configuration, batch file 
size, batch rollover interval configuration, and sink prefix (s3:// ,  s3a://)

Thank you
‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On Saturday, November 3, 2018 7:18 AM, Ravi Bhushan Ratnakar 
<ravibhushanratna...@gmail.com> wrote:

> I have done little changes in BucketingSink and implemented as new 
> CustomBucketingSink to use in my project which works fine with s3 and s3a 
> protocol.  This implementation doesn't require xml file configuration, rather 
> than it uses configuration provided using flink configuration object by 
> calling setConfig method of BucketingSink.
>
> On Sat 3 Nov, 2018, 09:24 Flink Developer <developer...@protonmail.com wrote:
>
>> It seems the issue also appears when using Flink version 1.6.2 .
>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>> On Tuesday, October 30, 2018 10:26 PM, Flink Developer 
>> <developer...@protonmail.com> wrote:
>>
>>> Hi, thanks for the info Rafi, that seems to be related.  I hope Flink 
>>> version 1.6.2 fixes this. Has anyone encountered this before?
>>>
>>> I would also like to note that my jar includes a core-site.xml file that 
>>> uses *s3a*. Is this the recommended configuration to use with 
>>> BucketingSink?   Should the sink be specified using s3a://<bucket>/<prefix> 
>>> or  s3://<bucket>/<prefix> ?
>>>
>>> - <configuration>
>>> -     <property>
>>> -         <name>fs.s3.impl</name>
>>> -         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.buffer.dir</name>
>>> -         <value>/tmp</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.access.key</name>
>>> -         <value>xxxxx</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.secret.key</name>
>>> -         <value>xxxxx</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.buffer.dir</name>
>>> -         <value>/tmp</value>
>>> -     </property>
>>> - </configuration>
>>>
>>> And my pom.xml uses:
>>>
>>> - <artifactId>flink-s3-fs-hadoop</artifactId>
>>> - ...
>>> - <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>> - ...
>>> - <artifactId>hadoop-hdfs</artifactId>
>>> - ...
>>> - <artifactId>hadoop-common</artifactId>
>>> - ...
>>> - <artifactId>hadoop-core</artifactId>
>>> - ...
>>> - <artifactId>hadoop-aws</artifactId>
>>> - ...
>>>
>>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>>> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch <rafi.ar...@gmail.com> 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm also experiencing this with Flink 1.5.2. This is probably related to 
>>>> BucketingSink not working properly with S3 as filesystem because of the 
>>>> eventual-consistency of S3.
>>>>
>>>> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part 
>>>> of 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and 
>>>> not presto).
>>>>
>>>> Does anyone know if this fix would solve this issue?
>>>>
>>>> Thanks,
>>>> Rafi
>>>>
>>>> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer 
>>>> <developer...@protonmail.com> wrote:
>>>>
>>>>> Hi, I'm running a scala flink app in an AWS EMR cluster (emr 5.17, hadoop 
>>>>> 2.8.4)  with flink parallelization set to 400. The source is a Kafka 
>>>>> topic and sinks to S3 in the format of: 
>>>>> s3://<day>/<hour>/<worker_number>/<files>. There's potentially 400 files 
>>>>> writing simultaneously.
>>>>>
>>>>> Configuration:
>>>>> - Flink v1.5.2
>>>>> - Checkpointing enabled w/ RocksDb (flink-statebackend-rocksdb_2.11, 
>>>>> v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause 
>>>>> between checkpoints in 2 mins. Timeout is set to 2 mins.
>>>>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>>>>> - Batch file size is set to 5mb.
>>>>> - Batch rollover interval is set to 30min
>>>>> - Writer uses GZip compression
>>>>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1, 
>>>>> hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>>>>
>>>>> The app is able to run for hours straight, but occasionally (once or 
>>>>> twice a day), it displays the following exception. When this happens, the 
>>>>> app is able to recover from previous checkpoint, but I am concerned about 
>>>>> the exception:
>>>>>
>>>>> Caused by: java.io.IOException: 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>>>>  Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: 
>>>>> xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)
>>>>>
>>>>> - at 
>>>>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)
>>>>>
>>>>> Caused by: 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>>>>  Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: 
>>>>> xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4365)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4312)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1270)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)
>>>>>
>>>>> - at 
>>>>> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Exxecutor.execute(GlobalS3Executor.java:91)
>>>>>
>>>>> And sometimes, it will show this:
>>>>>
>>>>> - java.lang.RuntimeException: Error while restoring BucketingSink state.
>>>>>
>>>>> - at 
>>>>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:888)
>>>>>
>>>>> - at 
>>>>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:767)
>>>>>
>>>>> - at 
>>>>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:394)
>>>>>
>>>>> - at 
>>>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>>>>
>>>>> - at 
>>>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>>>>
>>>>> - at 
>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>>>
>>>>> - at 
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>>>>>
>>>>> - at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>>>>>
>>>>> - at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>>>>>
>>>>> - at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>
>>>>> What causes this and how can it be resolved? Thank you.
>>>>>
>>>>> There seems to be a related Flink ticket and PR here, but I'm not sure if 
>>>>> this is the exact same issue and if it has been resolved:
>>>>> https://issues.apache.org/jira/browse/FLINK-6306
>>>>> https://github.com/apache/flink/pull/3752
>>>>> https://github.com/apache/flink/pull/4607

Reply via email to