It should not be a problem because, from what you posted, you are using
"s3a" as the scheme for the sink path.
Are you also using "s3p" for Presto? That is needed so that Flink knows
which of the two filesystems to use for each path.
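
To make the scheme point concrete, here is a minimal sketch in plain Java.
It is not Flink code: the class name, the helper and the bucket name
"my-bucket" are all made up for illustration. It assumes, as far as I
recall from the Flink docs, that "s3a://" selects flink-s3-fs-hadoop,
"s3p://" selects flink-s3-fs-presto, and both plugins register "s3://".

```java
import java.net.URI;

// Illustrative helper only (not part of Flink): reports which S3 plugin
// a path's scheme would select when both plugins are installed.
final class S3SchemeSketch {

    static String pluginFor(String path) {
        String scheme = URI.create(path).getScheme();
        if (scheme == null) {
            return "no scheme";
        }
        switch (scheme) {
            case "s3a":
                return "flink-s3-fs-hadoop";
            case "s3p":
                return "flink-s3-fs-presto";
            case "s3":
                // Assumption: both plugins claim plain "s3", so it is ambiguous.
                return "ambiguous";
            default:
                return "not an S3 scheme";
        }
    }

    public static void main(String[] args) {
        // "my-bucket" is a placeholder bucket name.
        System.out.println(pluginFor("s3a://my-bucket/bro_conn/"));
        System.out.println(pluginFor("s3p://my-bucket/checkpoints/"));
    }
}
```

If I remember the docs correctly, the StreamingFileSink only supports the
Hadoop-based S3 filesystem, so the sink path should stay on "s3a://" and
"s3p://" should only be used for things like checkpoints.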

On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose <rosh...@gmail.com> wrote:
>
> Lastly, could it be the way I built the flink image for kube? I added both 
> the presto and Hadoop plugins
>
> On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>>
>> Sorry realized this came off the user list by mistake. Adding the thread 
>> back in.
>>
>> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>>>
>>> Yes sorry, no errors on the task manager. However, I am new to flink so 
>>> don't know all the places to look for the logs. Been looking at the task 
>>> manager logs and don't see any exceptions there. Not sure where to look for 
>>> s3 exceptions in particular.
>>>
>>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>>>>
>>>> Yes, this is why I reached out for further information.
>>>>
>>>> Incrementing the part counter is the responsibility of the
>>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>>>> in the local FS.
>>>> Now if it is on the S3 side, it would help if you have any more info,
>>>> for example any logs from S3, to see if anything went wrong on their
>>>> end.
>>>>
>>>> So your logs refer to normal execution, i.e. no failures and no
>>>> restarting, right?
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <rosh...@gmail.com> wrote:
>>>> >
>>>> > Surprisingly the same code running against the local filesystem works 
>>>> > perfectly. The part counter increments correctly.
>>>> >
>>>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>>>> >>
>>>> >> Hi Roshan,
>>>> >>
>>>> >> Your logs refer to a simple run without any failures or re-running
>>>> >> from a savepoint, right?
>>>> >>
>>>> >> I am asking because I am trying to reproduce it by running a modified
>>>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>>>> >> The ITCase runs against the local filesystem, and not S3, but I added
>>>> >> the OutputFileConfig and it seems that the part counter increases
>>>> >> as expected.
>>>> >>
>>>> >> Is there any other information that would help us reproduce the issue?
>>>> >>
>>>> >> Cheers,
>>>> >> Kostas
>>>> >>
>>>> >> [1] 
>>>> >> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>>>> >>
>>>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <rosh...@gmail.com> 
>>>> >> wrote:
>>>> >> >
>>>> >> > Hi,
>>>> >> >
>>>> >> > I am trying to get the parquet writer to write to s3; however, the 
>>>> >> > files do not seem to be rolling over. The same file 
>>>> >> > "part-0-0.parquet" is being created each time, as if the "partCounter"
>>>> >> > is not being updated. Maybe the Bucket is being recreated each time?
>>>> >> > I don't really know... Here are some logs:
>>>> >> >
>>>> >> > 2020-04-09 01:28:10,350 INFO 
>>>> >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - 
>>>> >> > Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
>>>> >> > 2020-04-09 01:28:10,589 INFO 
>>>> >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - 
>>>> >> > Subtask 0 received completion notification for checkpoint with id=2.
>>>> >> > 2020-04-09 01:28:10,589 INFO 
>>>> >> > org.apache.flink.fs.s3.common.writer.S3Committer - Committing 
>>>> >> > bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
>>>> >> > Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>>> >> > 2020-04-09 01:29:10,350 INFO 
>>>> >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - 
>>>> >> > Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
>>>> >> > 2020-04-09 01:29:10,520 INFO 
>>>> >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - 
>>>> >> > Subtask 0 received completion notification for checkpoint with id=3.
>>>> >> > 2020-04-09 01:29:10,521 INFO 
>>>> >> > org.apache.flink.fs.s3.common.writer.S3Committer - Committing 
>>>> >> > bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
>>>> >> > Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>>> >> >
>>>> >> > And a part of my code:
>>>> >> >
>>>> >> > ```
>>>> >> >
>>>> >> >         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> >> >
>>>> >> > //        env.setParallelism(2);
>>>> >> >         env.enableCheckpointing(60000L);
>>>> >> > ///PROPERTIES Added
>>>> >> >         Schema schema = bro_conn.getClassSchema();
>>>> >> >
>>>> >> >         OutputFileConfig config = OutputFileConfig
>>>> >> >                 .builder()
>>>> >> >                 .withPartSuffix(".parquet")
>>>> >> >                 .build();
>>>> >> >
>>>> >> >         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>>> >> >                 .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
>>>> >> >                         ParquetAvroWriters.forGenericRecord(schema))
>>>> >> > //                .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>> >> >                 .withOutputFileConfig(config)
>>>> >> > //                .withBucketAssigner(new PartitioningBucketAssigner())
>>>> >> >                 .build();
>>>> >> >
>>>> >> >         DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>>>> >> >                 "kinesis", new SimpleStringSchema(), consumerConfig));
>>>> >> >
>>>> >> >         kinesis.flatMap(new JsonAvroParser())
>>>> >> >                 .addSink(sink);
>>>> >> >
>>>> >> >
>>>> >> >         env.execute("Bro Conn");
>>>> >> >
>>>> >> > ```
>>>> >> >
>>>> >> > I'm using Flink 1.10.0, and running in Kubernetes. I also created a 
>>>> >> > custom image to add the presto/hadoop plugin.
>>>> >> >
>>>> >> > Thanks again!
