I'm implementing an exponential backoff inside a custom sink that uses an
AvroParquetWriter to write to S3. I've change the number of attempts to 0
inside the core-site.xml, and I'm capturing the timeout exception, doing a
Thread.sleep for X seconds. This is working as intended, and when S3 is
offline, it waits until it is online.

I also want to test that the back pressure and the checkpoints are working
as intended, and for the first one, I can see the back pressure in Flink UI
going up, and recover as expected and not reading more data from Kafka.

For the checkpoints, and I've added inside the sink invoke function a
randomly exception (1 in 100, to simulate that a problem has happen, and
need to recover from the last good checkpoint), but something strange
happens. I can see the job is being canceled and created again, and running
fine, other times after a X number of times of being created and canceled,
it gives a *NoClassDefFoundError*, and it will keep giving that forever.

Do you guys have any thoughts?

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception while processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeParserBucket}
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
... 7 more
Caused by: java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeParserBucket
at
org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
at
com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
at
com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
at
com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
at
com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
at
com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
at
com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
at
com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
at
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
at
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
at
org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
at
org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
at
org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
at
com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.util.Try$.apply(Try.scala:209)
at
com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
at
com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
at
com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
... 7 more

Reply via email to