Hi David,

upon closer review of your stack trace, it seems like you are trying to
access S3 without our S3 plugin. That's generally not recommended.

Best,

Arvid

On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi David,
>
> this seems to be a bug in our S3 plugin. The Joda dependency should be
> bundled there.
>
> Are you using S3 as a plugin by any chance? Which Flink version are you
> using?
>
> If you are using S3 as a plugin, you could put joda.jar in your plugin
> folder like this:
>
> flink-dist
> ├── conf
> ├── lib
> ...
> └── plugins
>     └── s3
>         ├── joda.jar
>         └── flink-s3-fs-hadoop.jar
>
> If flink-s3-fs-hadoop.jar is in lib instead, you could try adding
> joda.jar there as well.
>
> Adding joda to your user code jar will unfortunately not work; the
> filesystem is loaded by its own classloader, which cannot see classes
> from your user jar.
>
> Best,
>
> Arvid
>
> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <speeddra...@gmail.com>
> wrote:
>
>> Hi Andrey, thanks for your reply.
>>
>> The class is in the jar created with `sbt assembly` that is submitted
>> to Flink to start the job.
>>
>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
>>      1649  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket$SavedField.class
>>      1984  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket$SavedState.class
>>      8651  05-27-2016 10:24   org/joda/time/format/DateTimeParserBucket.class
>>
>> Shouldn't this be enough?
>>
>> I think it uses it when nothing goes wrong, but as soon as there are
>> exceptions, it looks like it "forgets" it.
>>
>> Like I said before, this is kind of intermittent.
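>>
>> One way I could try to narrow this down is to log where the class
>> actually resolves from at runtime, something like this (just a quick
>> diagnostic sketch, not code that is in the job right now):
>>
>> import org.joda.time.format.DateTimeParserBucket
>>
>> // Print the jar the class was resolved from and the classloader that
>> // loaded it, to see whether it comes from my jar or somewhere else.
>> val clazz = classOf[DateTimeParserBucket]
>> println(s"loaded from: ${Option(clazz.getProtectionDomain.getCodeSource).map(_.getLocation)}")
>> println(s"classloader: ${clazz.getClassLoader}")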
>>
>> Thanks,
>> David
>>
>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <azagre...@apache.org>
>> wrote:
>>
>>> Hi David,
>>>
>>> This looks like a problem with the resolution of Maven dependencies or
>>> something similar.
>>> The custom WindowParquetGenericRecordListFileSink probably transitively
>>> depends on org/joda/time/format/DateTimeParserBucket,
>>> and it is missing from the runtime classpath of Flink.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <speeddra...@gmail.com>
>>> wrote:
>>>
>>>> I'm implementing an exponential backoff inside a custom sink that uses
>>>> an AvroParquetWriter to write to S3. I've changed the number of attempts
>>>> to 0 inside core-site.xml, and I'm catching the timeout exception and
>>>> doing a Thread.sleep for X seconds. This works as intended: when S3 is
>>>> offline, the sink waits until it is online again.
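>>>>
>>>> In case the shape of it helps, the retry logic is roughly the following
>>>> (a simplified sketch with made-up names; the real writer setup and the
>>>> exact timeout exception type in my sink differ):
>>>>
>>>> // Simplified backoff loop around the Parquet/S3 write: retry the
>>>> // write with an exponentially growing, capped delay until it succeeds.
>>>> def writeWithBackoff(write: () => Unit, maxDelayMs: Long = 60000L): Unit = {
>>>>   var delayMs = 1000L
>>>>   var done = false
>>>>   while (!done) {
>>>>     try {
>>>>       write()
>>>>       done = true
>>>>     } catch {
>>>>       case _: java.net.SocketTimeoutException =>
>>>>         Thread.sleep(delayMs)                       // wait before retrying
>>>>         delayMs = math.min(delayMs * 2, maxDelayMs) // exponential, capped
>>>>     }
>>>>   }
>>>> }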
>>>>
>>>> I also want to test that back pressure and checkpoints work as
>>>> intended. For the first one, I can see the back pressure in the Flink
>>>> UI going up and then recovering as expected, with no further data being
>>>> read from Kafka in the meantime.
>>>>
>>>> For the checkpoints, I've added a random exception inside the sink's
>>>> invoke function (1 in 100, to simulate that a problem has happened and
>>>> the job needs to recover from the last good checkpoint), but something
>>>> strange happens. Sometimes the job is canceled, created again and runs
>>>> fine; other times, after X rounds of being created and canceled, it
>>>> gives a *NoClassDefFoundError*, and it keeps giving that forever.
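>>>>
>>>> The injection itself is nothing more than this (illustrative sketch;
>>>> the real check sits at the top of invoke):
>>>>
>>>> import scala.util.Random
>>>>
>>>> // Fail roughly 1 in 100 invocations to force a restore from the last
>>>> // good checkpoint.
>>>> if (Random.nextInt(100) == 0)
>>>>   throw new RuntimeException("injected failure to test checkpoint recovery")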
>>>>
>>>> Do you guys have any thoughts?
>>>>
>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: TimerException{java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket}
>>>> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>> ... 7 more
>>>> Caused by: java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket
>>>> at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>> at com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>> at com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>> at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>> at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>> at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>> at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>> at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>> at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>> at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>> at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>> at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>> at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>> at scala.util.Try$.apply(Try.scala:209)
>>>> at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>> at com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>> at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>> at com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>> at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>> at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>> ... 7 more
>>>>
