Thanks for the feedback, Arvid. It isn't an issue at the moment, but I will look into it again in the future.
On Tue, Feb 18, 2020 at 1:51 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi David,
>
> sorry for replying late. I was caught up in other incidents.
>
> I double-checked all the information that you provided and conclude that
> you completely bypass our filesystems and plugins.
>
> What you are using is AvroParquetWriter, which brings in the hadoop
> dependencies, including raw hadoop s3. That becomes obvious because the
> Path you are using does not come from the Flink namespace.
> The class issues that result from that are hard to debug. You are
> effectively bundling another hadoop, so if you also have a specific
> Hadoop version on your cluster (e.g. on EMR), there can be ambiguities
> and the error you saw happens.
>
> What I'd recommend is a completely different approach. Assuming you just
> want exponential backoff for all s3 write accesses, you could wrap the
> S3AFileSystem and create your own s3 plugin. That would work with any
> given format in future cases.
>
> If you want to stick to your approach, you should use
> org.apache.flink.formats.parquet.ParquetWriterFactory, which uses the
> StreamOutputFile you mentioned.
>
> Best,
>
> Arvid
>
> On Thu, Feb 13, 2020 at 12:04 AM David Magalhães <speeddra...@gmail.com> wrote:
>
>> Hi Arvid, I use a docker image. Here is the Dockerfile:
>>
>> FROM flink:1.9.1-scala_2.12
>>
>> RUN mkdir /opt/flink/plugins/flink-s3-fs-hadoop
>> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar /opt/flink/plugins/flink-s3-fs-hadoop/
>>
>> Please let me know if you need more information.
>>
>> On Wed, Feb 12, 2020 at 9:15 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi David,
>>>
>>> can you double-check the folder structure of your plugin? It should
>>> reside in its own subfolder. Here is an example:
>>>
>>> flink-dist
>>> ├── conf
>>> ├── lib
>>> ...
>>> └── plugins
>>>     └── s3
>>>         └── flink-s3-fs-hadoop.jar
>>>
>>> I will investigate your error more deeply in the next few days, but I'd
>>> like to have a final confirmation about the folder structure.
>>>
>>> On Wed, Feb 12, 2020 at 8:56 PM David Magalhães <speeddra...@gmail.com> wrote:
>>>
>>>> Hi Robert, I couldn't find any previous mention before the
>>>> NoClassDefFoundError.
>>>> Here is the full log [1] if you want to look for something more
>>>> specific.
>>>>
>>>> [1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0
>>>>
>>>> On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger <rmetz...@apache.org> wrote:
>>>>
>>>>> According to this answer [1], the first exception "mentioning"
>>>>> org/joda/time/format/DateTimeParserBucket should be a different one.
>>>>> Can you go through the logs to make sure it is really a
>>>>> ClassNotFoundException, and not an ExceptionInInitializerError or
>>>>> something else?
>>>>>
>>>>> [1] https://stackoverflow.com/a/5756989/568695
>>>>>
>>>>> On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <speeddra...@gmail.com> wrote:
>>>>>
>>>>>> Hi Arvid,
>>>>>>
>>>>>> I'm using flink-s3-fs-hadoop-1.9.1.jar in the plugins folder. Like I
>>>>>> said previously, this works normally until an exception is thrown
>>>>>> inside the sink. It will try to recover again, but sometimes it
>>>>>> doesn't recover, giving this error.
>>>>>>
>>>>>> To write to S3 I use *AvroParquetWriter* with the following code:
>>>>>>
>>>>>> val writer = AvroParquetWriter
>>>>>>   .builder[GenericRecord](new Path(finalFilePath))
>>>>>>
>>>>>> *Path* is from *org.apache.hadoop.fs*; the other option is to use
>>>>>> *org.apache.flink.formats.parquet.StreamOutputFile*, which will use
>>>>>> the Flink S3 plugin, right? Not sure how I can convert from Path to
>>>>>> StreamOutputFile. I know that when I used StreamingFileSink, I used
>>>>>> StreamOutputFile.
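[Editor's note] Independent of how the jars are laid out, the retry-with-exponential-backoff pattern discussed in this thread (catch the failure, sleep, double the delay, retry) can be sketched with the standard library alone. This is only an illustrative sketch in Java — the `Backoff` class and its parameter names are invented here, not part of any Flink or Hadoop API — and `Thread.sleep` blocks the calling thread, just as the backoff inside the custom sink does:

```java
import java.util.concurrent.Callable;

// Minimal retry-with-exponential-backoff helper. Names are invented for
// illustration; a wrapped S3AFileSystem plugin or a custom sink could
// route its write calls through something like this.
public class Backoff {
    public static <T> T retry(int maxAttempts, long initialDelayMs, long maxDelayMs,
                              Callable<T> op) throws Exception {
        long delayMs = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e; // retries exhausted: propagate the last failure
                }
                Thread.sleep(delayMs);                        // back off before retrying
                delayMs = Math.min(delayMs * 2, maxDelayMs);  // double the delay, capped
            }
        }
    }
}
```

A call site might look like `Backoff.retry(5, 100L, 30000L, () -> writer.write(record))`, though with Arvid's suggested approach the retry would live inside the wrapped filesystem rather than the sink.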
>>>>>> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <ar...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> upon closer review of your stacktrace, it seems like you are trying
>>>>>>> to access S3 without our S3 plugin. That is in general not
>>>>>>> recommended at all.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
>>>>>>>
>>>>>>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> this seems to be a bug in our s3 plugin. The joda dependency should
>>>>>>>> be bundled there.
>>>>>>>>
>>>>>>>> Are you using s3 as a plugin by any chance? Which flink version are
>>>>>>>> you using?
>>>>>>>>
>>>>>>>> If you are using s3 as a plugin, you could put joda in your plugin
>>>>>>>> folder like this:
>>>>>>>>
>>>>>>>> flink-dist
>>>>>>>> ├── conf
>>>>>>>> ├── lib
>>>>>>>> ...
>>>>>>>> └── plugins
>>>>>>>>     └── s3
>>>>>>>>         ├── joda.jar
>>>>>>>>         └── flink-s3-fs-hadoop.jar
>>>>>>>>
>>>>>>>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda
>>>>>>>> there as well.
>>>>>>>>
>>>>>>>> Adding joda to your user code will unfortunately not work.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <speeddra...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Andrey, thanks for your reply.
>>>>>>>>>
>>>>>>>>> The class is in the jar created with `sbt assembly` that is
>>>>>>>>> submitted to Flink to start a job:
>>>>>>>>>
>>>>>>>>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
>>>>>>>>>  1649  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket$SavedField.class
>>>>>>>>>  1984  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket$SavedState.class
>>>>>>>>>  8651  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket.class
>>>>>>>>>
>>>>>>>>> Shouldn't this be enough?
>>>>>>>>>
>>>>>>>>> I think it uses the class while nothing goes wrong, but as soon as
>>>>>>>>> there are some exceptions, it looks like it "forgets" it.
>>>>>>>>>
>>>>>>>>> Like I said before, this is kind of intermittent.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> David
>>>>>>>>>
>>>>>>>>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <azagre...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> This looks like a problem with the resolution of maven
>>>>>>>>>> dependencies or something similar.
>>>>>>>>>> The custom WindowParquetGenericRecordListFileSink probably
>>>>>>>>>> transitively depends on org/joda/time/format/DateTimeParserBucket,
>>>>>>>>>> and it is missing from the runtime classpath of Flink.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Andrey
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <speeddra...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm implementing an exponential backoff inside a custom sink
>>>>>>>>>>> that uses an AvroParquetWriter to write to S3. I've changed the
>>>>>>>>>>> number of attempts to 0 inside core-site.xml, and I'm capturing
>>>>>>>>>>> the timeout exception and doing a Thread.sleep for X seconds.
>>>>>>>>>>> This is working as intended: when S3 is offline, it waits until
>>>>>>>>>>> it is online again.
>>>>>>>>>>>
>>>>>>>>>>> I also want to test that the back pressure and the checkpoints
>>>>>>>>>>> are working as intended. For the first, I can see the back
>>>>>>>>>>> pressure in the Flink UI going up, and the job recovers as
>>>>>>>>>>> expected and stops reading more data from Kafka.
>>>>>>>>>>>
>>>>>>>>>>> For the checkpoints, I've added a random exception inside the
>>>>>>>>>>> sink's invoke function (1 in 100, to simulate that a problem has
>>>>>>>>>>> happened and the job needs to recover from the last good
>>>>>>>>>>> checkpoint), but something strange happens.
>>>>>>>>>>> Sometimes I can see the job being canceled and created again
>>>>>>>>>>> and running fine; other times, after X rounds of being created
>>>>>>>>>>> and canceled, it gives a *NoClassDefFoundError*, and it keeps
>>>>>>>>>>> giving that forever.
>>>>>>>>>>>
>>>>>>>>>>> Do you guys have any thoughts?
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket}
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>>>>>     ... 7 more
>>>>>>>>>>> Caused by: java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket
>>>>>>>>>>>     at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>>>>>     at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>>>>>     at com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>>>>>     at com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>>>>>     at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>>>>>     at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>>>>>     at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>>>>>     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>>>>>     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>>>>>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>>>>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>>>>>     at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>>>>>     at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>>>>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>>>>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>>>>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>>>>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>>>>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>>>>>     at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>>>>>     at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>>>>>     at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>>>>>     at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>>>>>     at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>>>>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>>>>>     at scala.util.Try$.apply(Try.scala:209)
>>>>>>>>>>>     at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>>>>>     at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>>>>>     at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>>>>>     at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>>>>>     ... 7 more
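[Editor's note] When a NoClassDefFoundError is intermittent like the one above, it can help to probe, from inside the running job (e.g. in the sink's open() method), whether the class is resolvable by the current classloader and which jar it came from. A minimal stdlib-only sketch in Java — `ClassProbe` is an illustrative name, not an existing Flink or Hadoop utility:

```java
import java.security.CodeSource;
import java.util.Optional;

// Probe whether a class is resolvable by the current classloader and,
// if so, report where it was loaded from.
public class ClassProbe {
    public static Optional<String> locate(String className) {
        try {
            Class<?> cls = Class.forName(className);
            // CodeSource (or its location) is null for bootstrap-loaded JDK classes
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return Optional.of("<bootstrap classloader>");
            }
            return Optional.of(src.getLocation().toString());
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return Optional.empty();
        }
    }
}
```

Logging `ClassProbe.locate("org.joda.time.format.DateTimeParserBucket")` before and after a job restore would show whether joda disappears from the effective classpath, and which jar (user jar, lib, or plugin) it resolves from when it is present.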