Which dependencies do I need to put in the lib directory? I have flink-shaded-hadoop2.jar and parquet-hadoop.jar marked as "provided", both are in the lib/ directory, and I still get the error. I've also set classloader.resolve-order: parent-first in my flink-conf.yaml.
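One way to check whether Hadoop classes still end up in the user jar (for example through a transitive dependency) is to list the jar contents and filter for them. The snippet below is only a minimal sketch: `filter_hadoop_entries` is a hypothetical helper, not part of Flink, and it assumes that unrelocated Hadoop classes sit under `org/apache/hadoop/` and that the job jar is the `~/streaming.jar` used later in this thread.

```shell
#!/bin/sh
# Hypothetical helper (not part of Flink): reads a jar listing on stdin and
# keeps only the entries under org/apache/hadoop/.
# "|| true" keeps the exit status at 0 when nothing matches.
filter_hadoop_entries() {
  grep '^org/apache/hadoop/' || true
}

# Usage (the jar path is an assumption; substitute your own assembly jar):
#   jar tf ~/streaming.jar | filter_hadoop_entries | head
```

If the listing comes back empty, the Hadoop classes are presumably only provided by the jars in Flink's lib/ directory; any output would suggest they are still being bundled into the user jar.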
val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
  "org.apache.flink" % "flink-metrics-core" % flinkVersion,
  "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
  "org.apache.flink" % "flink-avro" % flinkVersion,
  "org.apache.kafka" %% "kafka" % "0.10.0.1",
  "org.apache.avro" % "avro" % "1.7.7",
  "org.apache.parquet" % "parquet-hadoop" % "1.8.1" % "provided",
  "org.apache.parquet" % "parquet-avro" % "1.8.1",
  "io.confluent" % "kafka-avro-serializer" % "3.2.0",
  "org.apache.flink" % "flink-shaded-hadoop2" % flinkVersion % "provided"
)

On Tue, Jan 9, 2018 at 5:53 PM Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> For the exception in the "local file path" case, we already had a reason:
>
>   - You were having Hadoop code in your user code jar. Either not putting
>     the Hadoop dependency into your jar (rather in Flink's lib directory) or
>     setting classloading to "parent-first" should do the trick there.
>
> For the other exception, we still have not managed to reproduce it,
> unfortunately.
>
> Best,
> Stephan
>
>
> On Mon, Jan 8, 2018 at 10:23 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:
>
>> When I change the path from an s3 path to a local path I get the
>> following error:
>>
>> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
>> Using address localhost:6123 to connect to JobManager.
>> JobManager web interface address http://localhost:8082
>> Starting execution of program
>> Submitting job with JobID: 5403f2bf4a71abf745bd1ed93c8feb25. Waiting for job completion.
>> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
>> 01/08/2018 16:14:59 Job execution switched to status RUNNING.
>> 01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
>> 01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
>> 01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
>> 01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to FAILED
>> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
>>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>>     ... 9 more
>> Caused by: java.lang.ClassCastException
>>
>> On Mon, Jan 8, 2018 at 9:21 PM Kyle Hamlin <hamlin...@gmail.com> wrote:
>>
>>> Here is the task manager log:
>>>
>>> 2018-01-08 16:16:13,406 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Source: Kafka -> Sink: S3 (1/1)
>>> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched from CREATED to DEPLOYING.
>>> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING]
>>> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING].
>>> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.blob.BlobClient - Downloading b4a2b46051079212bdab65e50ee5ab03/p-bfab322ceec8791627d2191ca955ba74f4ec4dc3-d2d320b15315ce3b67038f336209bb2c from localhost/127.0.0.1:49710
>>> 2018-01-08 16:16:13,595 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING].
>>> 2018-01-08 16:16:13,595 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched from DEPLOYING to RUNNING.
>>> 2018-01-08 16:16:13,596 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined state backend: File State Backend @ s3://my-bucket/checkpoints.
>>> 2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched from RUNNING to FAILED.
>>>
>>> java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
>>>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:363)
>>>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:542)
>>>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
>>>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
>>>     at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
>>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
>>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> 2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997).
>>> 2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) [FAILED]
>>> 2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Source: Kafka -> Sink: S3 (bc932736c6526eb1bd41f6aaa73b2997)
>>> 2018-01-08 16:16:13,635 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution bc932736c6526eb1bd41f6aaa73b2997
>>> 2018-01-08 16:16:23,641 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Source: Kafka -> Sink: S3 (1/1)
>>> 2018-01-08 16:16:23,641 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) switched from CREATED to DEPLOYING.
>>> 2018-01-08 16:16:23,641 INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) [DEPLOYING]
>>> 2018-01-08 16:16:23,641 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) [DEPLOYING].
>>> 2018-01-08 16:16:23,643 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) [DEPLOYING].
>>> 2018-01-08 16:16:23,643 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) switched from DEPLOYING to RUNNING.
>>> 2018-01-08 16:16:23,643 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined state backend: File State Backend @ s3://my-bucket/checkpoints.
>>> 2018-01-08 16:16:23,675 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) switched from RUNNING to FAILED.
>>>
>>> On Thu, Jan 4, 2018 at 8:40 PM Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> This looks like the output from the client - do you have some
>>>> TaskManager log files with more log entries?
>>>>
>>>> That would be helpful...
>>>>
>>>>
>>>> On Wed, Jan 3, 2018 at 5:26 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:
>>>>
>>>>> Hello Stephan & Nico,
>>>>>
>>>>> Here is the full stacktrace; it's not much more than what I originally
>>>>> posted. I remember seeing an XMLInputFactory input error at one
>>>>> point, but I haven't seen that again. Is there any other information I
>>>>> can provide that will help resolve this?
>>>>>
>>>>> [flink-1.4.0] ./bin/flink run ~/streaming.jar
>>>>> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
>>>>> Using address localhost:6123 to connect to JobManager.
>>>>> JobManager web interface address http://localhost:8082
>>>>> Starting execution of program
>>>>> Submitting job with JobID: 022e4a310cd56ba4f7befc0286921663. Waiting for job completion.
>>>>> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
>>>>> 01/03/2018 11:21:53 Job execution switched to status RUNNING.
>>>>> 01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
>>>>> 01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
>>>>> 01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
>>>>> 01/03/2018 11:21:54 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
>>>>> java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
>>>>>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:363)
>>>>>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:542)
>>>>>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
>>>>>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
>>>>>     at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
>>>>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
>>>>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>>>>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>>>>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> 01/03/2018 11:21:54 Job execution switched to status FAILING.
>>>>> java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
>>>>>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:363)
>>>>>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:542)
>>>>>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
>>>>>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
>>>>>     at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
>>>>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
>>>>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>>>>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>>>>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> 01/03/2018 11:21:54 Job execution switched to status RESTARTING.
>>>>>
>>>>> On Wed, Jan 3, 2018 at 11:33 AM Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> The error is not the missing Class "S3ErrorResponseHandler", but
>>>>>> the initialization of that class.
>>>>>>
>>>>>> It could be missing classes that are statically referenced by the
>>>>>> "S3ErrorResponseHandler".
>>>>>> The ones I see are "XMLInputFactory" (part of Java itself, should
>>>>>> always be there) and "org.apache.commons.logging.Log" which should
>>>>>> also be there.
>>>>>> There may be other classes loaded during initialization. Knowing more
>>>>>> about the exception would help, as we have not seen that error yet.
>>>>>>
>>>>>>
>>>>>> On Wed, Jan 3, 2018 at 11:39 AM, Nico Kruber <n...@data-artisans.com> wrote:
>>>>>>
>>>>>>> Hi Kyle,
>>>>>>> except for putting the jar into the lib/ folder and setting up
>>>>>>> credentials, nothing else should be required [1].
>>>>>>>
>>>>>>> The S3ErrorResponseHandler class itself is in the jar, as you can
>>>>>>> see with
>>>>>>>
>>>>>>>   jar tf flink-s3-fs-presto-1.4.0.jar | grep org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
>>>>>>>
>>>>>>> Therefore, the cause for this exception would be interesting (as
>>>>>>> Stephan suggested).
>>>>>>>
>>>>>>>
>>>>>>> Nico
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
>>>>>>>
>>>>>>> On 03/01/18 10:22, Stephan Ewen wrote:
>>>>>>> > Hi Kyle!
>>>>>>> >
>>>>>>> > Is there more of the stack trace available, like an original
>>>>>>> > exception cause?
>>>>>>> >
>>>>>>> > Best,
>>>>>>> > Stephan
>>>>>>> >
>>>>>>> >
>>>>>>> > On Sun, Dec 31, 2017 at 5:10 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:
>>>>>>> >
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > When testing Flink 1.4 locally the error below keeps getting thrown.
>>>>>>> > I've followed the setup by moving the flink-s3-fs-presto.jar from
>>>>>>> > the opt/ folder to the lib/ folder. Is there something additional I
>>>>>>> > need to do?
>>>>>>> >
>>>>>>> > java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
>>>>>>> >     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:363)
>>>>>>> >     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:542)
>>>>>>> >     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
>>>>>>> >     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
>>>>>>> >     at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
>>>>>>> >     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
>>>>>>> >     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>>>>>> >     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>>>>>>> >     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>>>>>>> >     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>>>>>>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
>>>>>>> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>>>>>>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>>>>>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>>>>>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>>>>> >
>>>>>>> >
>>>>>>>
>>>>>>
>>>>
>