I would say so, yes.
Also, could you set the paths where you want to use Presto to "s3p", as
described in [1], just to be sure that there is no ambiguity?
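
To illustrate the ambiguity: when both plugins are loaded, flink-s3-fs-hadoop registers itself for "s3://" and "s3a://", and flink-s3-fs-presto for "s3://" and "s3p://", so only the explicit schemes pin a path to one implementation. The sketch below is a standalone illustration of that mapping, not Flink code. The class name, the enum, and the bucket name `my-bucket` are made up for the example.

```java
import java.net.URI;
import java.util.Locale;

public class S3SchemeCheck {

    /** Which S3 filesystem plugin a path would resolve to, judged by URI scheme only. */
    enum S3Plugin { HADOOP, PRESTO, AMBIGUOUS, NOT_S3 }

    static S3Plugin pluginFor(String path) {
        String scheme = URI.create(path).getScheme();
        if (scheme == null) {
            return S3Plugin.NOT_S3;
        }
        switch (scheme.toLowerCase(Locale.ROOT)) {
            case "s3a":
                return S3Plugin.HADOOP;    // registered only by flink-s3-fs-hadoop
            case "s3p":
                return S3Plugin.PRESTO;    // registered only by flink-s3-fs-presto
            case "s3":
                return S3Plugin.AMBIGUOUS; // registered by both plugins
            default:
                return S3Plugin.NOT_S3;
        }
    }

    public static void main(String[] args) {
        // Hypothetical bucket name, for illustration only.
        String[] paths = {
            "s3a://my-bucket/bro_conn/",
            "s3p://my-bucket/checkpoints/",
            "s3://my-bucket/bro_conn/",
            "file:///tmp/bro_conn/"
        };
        for (String path : paths) {
            System.out.println(path + " -> " + pluginFor(path));
        }
    }
}
```

In your setup that would mean keeping "s3a://" for the StreamingFileSink path (per [1], the sink only supports the Hadoop-based S3 filesystem) and using "s3p://" for anything that should go through Presto, such as the checkpoint directory.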

You could also make use of [2].

And thanks for looking into it!

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#s3-specific
[2] https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters
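
For reference while debugging, here is a simplified model of the behaviour I would expect from the part counter. It is a sketch only, not the actual StreamingFileSink code: the class and method names are invented, and the naming pattern `<prefix>-<subtask>-<counter><suffix>` is assumed from the "part-0-0.parquet" name in your logs together with your ".parquet" suffix.

```java
public class PartFileNaming {

    /** Simplified stand-in for a bucket's part counter; not Flink's actual class. */
    static final class PartNamer {
        private final String prefix;
        private final String suffix;
        private final int subtaskIndex;
        private long partCounter;

        PartNamer(String prefix, String suffix, int subtaskIndex, long initialPartCounter) {
            this.prefix = prefix;
            this.suffix = suffix;
            this.subtaskIndex = subtaskIndex;
            this.partCounter = initialPartCounter;
        }

        /** Returns the name for the next part file and advances the counter. */
        String nextPartName() {
            String name = prefix + "-" + subtaskIndex + "-" + partCounter + suffix;
            partCounter++;
            return name;
        }
    }

    public static void main(String[] args) {
        // Subtask 0, counter starting at 0, mirroring the first file in the logs.
        PartNamer namer = new PartNamer("part", ".parquet", 0, 0L);
        for (int roll = 0; roll < 3; roll++) {
            System.out.println(namer.nextPartName());
        }
    }
}
```

With a counter that advances on every roll, each new part file gets a distinct name. Your logs show the reported max part counter going from 2 to 3 while the committed file stays "part-0-0.parquet", which is the mismatch we need to explain.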

On Thu, Apr 9, 2020 at 2:50 PM Roshan Punnoose <rosh...@gmail.com> wrote:
>
> Btw, I ran the exact same code on a local Flink cluster started with 
> `./bin/start-cluster.sh` on my local machine. With `s3a` it did not work: the 
> part files do not roll over. With the local filesystem, however, it works 
> perfectly. Should I be looking at the S3Committer in Flink to see if there is 
> something odd going on?
>
> On Thu, Apr 9, 2020 at 7:49 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>>
>> Nope just the s3a. I'll keep looking around to see if there is anything else 
>> I can see. If you think of anything else to try, let me know.
>>
>> On Thu, Apr 9, 2020, 7:41 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>>>
>>> It should not be a problem because from what you posted, you are using
>>> "s3a" as the scheme for s3.
>>> Are you using "s3p" for Presto? This should also be done in order for
>>> Flink to understand where to use the one or the other.
>>>
>>> On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose <rosh...@gmail.com> wrote:
>>> >
>>> > Lastly, could it be the way I built the flink image for kube? I added 
>>> > both the presto and Hadoop plugins
>>> >
>>> > On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>>> >>
>>> >> Sorry realized this came off the user list by mistake. Adding the thread 
>>> >> back in.
>>> >>
>>> >> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <rosh...@gmail.com> wrote:
>>> >>>
>>> >>> Yes sorry, no errors on the task manager. However, I am new to flink so 
>>> >>> don't know all the places to look for the logs. Been looking at the 
>>> >>> task manager logs and don't see any exceptions there. Not sure where to 
>>> >>> look for s3 exceptions in particular.
>>> >>>
>>> >>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>>> >>>>
>>> >>>> Yes, this is why I reached out for further information.
>>> >>>>
>>> >>>> Incrementing the part counter is the responsibility of the
>>> >>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>>> >>>> in the local FS.
>>> >>>> Now if it is on the S3 side, it would help if you have any more info,
>>> >>>> for example any logs from S3, to see if anything went wrong on their
>>> >>>> end.
>>> >>>>
>>> >>>> So your logs refer to normal execution, i.e. no failures and no
>>> >>>> restarting, right?
>>> >>>>
>>> >>>> Cheers,
>>> >>>> Kostas
>>> >>>>
>>> >>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <rosh...@gmail.com> 
>>> >>>> wrote:
>>> >>>> >
>>> >>>> > Surprisingly the same code running against the local filesystem 
>>> >>>> > works perfectly. The part counter increments correctly.
>>> >>>> >
>>> >>>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <kklou...@gmail.com> 
>>> >>>> > wrote:
>>> >>>> >>
>>> >>>> >> Hi Roshan,
>>> >>>> >>
>>> >>>> >> Your logs refer to a simple run without any failures or re-running
>>> >>>> >> from a savepoint, right?
>>> >>>> >>
>>> >>>> >> I am asking because I am trying to reproduce it by running a modified
>>> >>>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>>> >>>> >> The ITCase runs against the local filesystem, and not S3, but I added
>>> >>>> >> the OutputFileConfig and it seems that the part counter increases
>>> >>>> >> as expected.
>>> >>>> >>
>>> >>>> >> Is there any other information that would help us reproduce the issue?
>>> >>>> >>
>>> >>>> >> Cheers,
>>> >>>> >> Kostas
>>> >>>> >>
>>> >>>> >> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>>> >>>> >>
>>> >>>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <rosh...@gmail.com> 
>>> >>>> >> wrote:
>>> >>>> >> >
>>> >>>> >> > Hi,
>>> >>>> >> >
>>> >>>> >> > I am trying to get the parquet writer to write to s3; however, 
>>> >>>> >> > the files do not seem to be rolling over. The same file 
>>> >>>> >> > "part-0-0.parquet" is being created each time, as if the 
>>> >>>> >> > "partCounter" is not being updated. Maybe the Bucket is being 
>>> >>>> >> > recreated each time? I don't really know... Here are some logs:
>>> >>>> >> >
>>> >>>> >> > 2020-04-09 01:28:10,350 INFO 
>>> >>>> >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets 
>>> >>>> >> > - Subtask 0 checkpointing for checkpoint with id=2 (max part 
>>> >>>> >> > counter=2).
>>> >>>> >> > 2020-04-09 01:28:10,589 INFO 
>>> >>>> >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets 
>>> >>>> >> > - Subtask 0 received completion notification for checkpoint with 
>>> >>>> >> > id=2.
>>> >>>> >> > 2020-04-09 01:28:10,589 INFO 
>>> >>>> >> > org.apache.flink.fs.s3.common.writer.S3Committer - Committing 
>>> >>>> >> > bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
>>> >>>> >> > Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>> >>>> >> > 2020-04-09 01:29:10,350 INFO 
>>> >>>> >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets 
>>> >>>> >> > - Subtask 0 checkpointing for checkpoint with id=3 (max part 
>>> >>>> >> > counter=3).
>>> >>>> >> > 2020-04-09 01:29:10,520 INFO 
>>> >>>> >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets 
>>> >>>> >> > - Subtask 0 received completion notification for checkpoint with 
>>> >>>> >> > id=3.
>>> >>>> >> > 2020-04-09 01:29:10,521 INFO 
>>> >>>> >> > org.apache.flink.fs.s3.common.writer.S3Committer - Committing 
>>> >>>> >> > bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
>>> >>>> >> > Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>> >>>> >> >
>>> >>>> >> > And a part of my code:
>>> >>>> >> >
>>> >>>> >> > ```
>>> >>>> >> > StreamExecutionEnvironment env =
>>> >>>> >> >         StreamExecutionEnvironment.getExecutionEnvironment();
>>> >>>> >> >
>>> >>>> >> > // env.setParallelism(2);
>>> >>>> >> > env.enableCheckpointing(60000L);
>>> >>>> >> > ///PROPERTIES Added
>>> >>>> >> > Schema schema = bro_conn.getClassSchema();
>>> >>>> >> >
>>> >>>> >> > OutputFileConfig config = OutputFileConfig
>>> >>>> >> >         .builder()
>>> >>>> >> >         .withPartSuffix(".parquet")
>>> >>>> >> >         .build();
>>> >>>> >> >
>>> >>>> >> > final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>> >>>> >> >         .forBulkFormat(
>>> >>>> >> >                 new Path("s3a://<bucket>/bro_conn/"),
>>> >>>> >> >                 ParquetAvroWriters.forGenericRecord(schema))
>>> >>>> >> >         // .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>> >>>> >> >         .withOutputFileConfig(config)
>>> >>>> >> >         // .withBucketAssigner(new PartitioningBucketAssigner())
>>> >>>> >> >         .build();
>>> >>>> >> >
>>> >>>> >> > DataStream<String> kinesis = env.addSource(
>>> >>>> >> >         new FlinkKinesisConsumer<>("kinesis", new SimpleStringSchema(), consumerConfig));
>>> >>>> >> >
>>> >>>> >> > kinesis.flatMap(new JsonAvroParser())
>>> >>>> >> >         .addSink(sink);
>>> >>>> >> >
>>> >>>> >> > env.execute("Bro Conn");
>>> >>>> >> > ```
>>> >>>> >> >
>>> >>>> >> > I'm using Flink 1.10.0, and running in Kubernetes. I also created 
>>> >>>> >> > a custom image to add the presto/hadoop plugin.
>>> >>>> >> >
>>> >>>> >> > Thanks again!
