When I change the path from an S3 path to a local path, I get the following error:
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: 5403f2bf4a71abf745bd1ed93c8feb25. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
01/08/2018 16:14:59 Job execution switched to status RUNNING.
01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
01/08/2018 16:14:59 Source: Kafka -> Sink: S3(1/1) switched to FAILED
java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
    ... 9 more
Caused by: java.lang.ClassCastException

On Mon, Jan 8, 2018 at 9:21 PM Kyle Hamlin <hamlin...@gmail.com> wrote:

> Here is the task manager log:
>
> 2018-01-08 16:16:13,406 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Source: Kafka -> Sink: S3 (1/1)
> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched from CREATED to DEPLOYING.
> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING]
> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING].
> 2018-01-08 16:16:13,407 INFO org.apache.flink.runtime.blob.BlobClient - Downloading b4a2b46051079212bdab65e50ee5ab03/p-bfab322ceec8791627d2191ca955ba74f4ec4dc3-d2d320b15315ce3b67038f336209bb2c from localhost/127.0.0.1:49710
> 2018-01-08 16:16:13,595 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) [DEPLOYING].
> 2018-01-08 16:16:13,595 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched from DEPLOYING to RUNNING.
> 2018-01-08 16:16:13,596 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined state backend: File State Backend @ s3://my-bucket/checkpoints.
> 2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) switched from RUNNING to FAILED.
>
> java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:363)
>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:542)
>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
>     at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> 2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997).
> 2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Kafka -> Sink: S3 (1/1) (bc932736c6526eb1bd41f6aaa73b2997) [FAILED]
> 2018-01-08 16:16:13,631 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Source: Kafka -> Sink: S3 (bc932736c6526eb1bd41f6aaa73b2997)
> 2018-01-08 16:16:13,635 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution bc932736c6526eb1bd41f6aaa73b2997
> 2018-01-08 16:16:23,641 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Source: Kafka -> Sink: S3 (1/1)
> 2018-01-08 16:16:23,641 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) switched from CREATED to DEPLOYING.
> 2018-01-08 16:16:23,641 INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) [DEPLOYING]
> 2018-01-08 16:16:23,641 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) [DEPLOYING].
> 2018-01-08 16:16:23,643 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) [DEPLOYING].
> 2018-01-08 16:16:23,643 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) switched from DEPLOYING to RUNNING.
> 2018-01-08 16:16:23,643 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined state backend: File State Backend @ s3://my-bucket/checkpoints.
> 2018-01-08 16:16:23,675 INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka -> Sink: S3 (1/1) (235933729cf5dffdcba6904188c2ec8d) switched from RUNNING to FAILED.
>
> On Thu, Jan 4, 2018 at 8:40 PM Stephan Ewen <se...@apache.org> wrote:
>
>> This looks like the output from the client. Do you have some TaskManager log files with more log entries?
>>
>> That would be helpful...
>>
>>
>> On Wed, Jan 3, 2018 at 5:26 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:
>>
>>> Hello Stephan & Nico,
>>>
>>> Here is the full stack trace; it's not much more than what I originally posted. I remember seeing an XMLInputFactory error at one point, but I haven't seen it again. Is there any other information I can provide that would help resolve this?
>>>
>>> [flink-1.4.0] ./bin/flink run ~/streaming.jar
>>> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
>>> Using address localhost:6123 to connect to JobManager.
>>> JobManager web interface address http://localhost:8082
>>> Starting execution of program
>>> Submitting job with JobID: 022e4a310cd56ba4f7befc0286921663. Waiting for job completion.
>>> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
>>> 01/03/2018 11:21:53 Job execution switched to status RUNNING.
>>> 01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
>>> 01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
>>> 01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
>>> 01/03/2018 11:21:54 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
>>> java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
>>>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:363)
>>>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:542)
>>>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
>>>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
>>>     at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
>>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
>>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> 01/03/2018 11:21:54 Job execution switched to status FAILING.
>>> java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
>>>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:363)
>>>     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:542)
>>>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
>>>     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
>>>     at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
>>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
>>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> 01/03/2018 11:21:54 Job execution switched to status RESTARTING.
>>>
>>> On Wed, Jan 3, 2018 at 11:33 AM Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> The error is not a missing class "S3ErrorResponseHandler", but a failure to initialize that class.
>>>>
>>>> It could be caused by missing classes that are statically referenced by "S3ErrorResponseHandler".
>>>> The ones I see are "XMLInputFactory" (part of Java itself, so it should always be there) and "org.apache.commons.logging.Log", which should also be there.
>>>> There may be other classes loaded during initialization. Knowing more about the exception would help, as we have not seen that error yet.
>>>>
>>>>
>>>> On Wed, Jan 3, 2018 at 11:39 AM, Nico Kruber <n...@data-artisans.com> wrote:
>>>>
>>>>> Hi Kyle,
>>>>> except for putting the jar into the lib/ folder and setting up credentials, nothing else should be required [1].
>>>>>
>>>>> The S3ErrorResponseHandler class itself is in the jar, as you can see with
>>>>>   jar tf flink-s3-fs-presto-1.4.0.jar | grep org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
>>>>>
>>>>> Therefore, the cause of this exception would be interesting (as Stephan suggested).
>>>>>
>>>>>
>>>>> Nico
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
>>>>>
>>>>> On 03/01/18 10:22, Stephan Ewen wrote:
>>>>> > Hi Kyle!
>>>>> >
>>>>> > Is there more of the stack trace available, like an original exception cause?
>>>>> >
>>>>> > Best,
>>>>> > Stephan
>>>>> >
>>>>> >
>>>>> > On Sun, Dec 31, 2017 at 5:10 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > When testing Flink 1.4 locally, the error below keeps getting thrown. I've followed the setup by moving the flink-s3-fs-presto.jar from the opt/ folder to the lib/ folder. Is there something additional I need to do?
>>>>> >
>>>>> > java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
>>>>> >     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:363)
>>>>> >     at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:542)
>>>>> >     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
>>>>> >     at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
>>>>> >     at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
>>>>> >     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
>>>>> >     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>>>>> >     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
>>>>> >     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
>>>>> >     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
>>>>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
>>>>> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>>>>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>>>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>>>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
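Stephan's point in the thread, that "Could not initialize class" means the class was found but its static initialization failed, can be reproduced in isolation. The following is a minimal sketch assuming standard JVM class-initialization semantics; StaticInitFailureDemo, FragileHandler, attempt and rootCause are hypothetical names for illustration, not Flink or AWS SDK classes. On first use, the failing initializer surfaces as ExceptionInInitializerError carrying the real cause; every later use in the same JVM fails with NoClassDefFoundError: Could not initialize class ..., which on older JDKs such as Java 8 does not carry that cause.

```java
// Minimal sketch of JVM class-initialization failure semantics.
// StaticInitFailureDemo and FragileHandler are hypothetical names chosen for
// illustration; they are not part of Flink or the AWS SDK.
public class StaticInitFailureDemo {

    // Stand-in for a class such as S3ErrorResponseHandler whose static
    // initializer depends on something that can fail at class-init time.
    static class FragileHandler {
        static final String CONFIG = loadConfig();

        private static String loadConfig() {
            // Simulates a dependency failing during static initialization.
            // (If an Error were thrown here instead, the JVM would rethrow it
            // as-is on first use rather than wrapping it.)
            throw new IllegalStateException("simulated failure in static initializer");
        }

        static void touch() {
            // Invoking any static method triggers class initialization.
        }
    }

    // Follows getCause() links and returns the innermost throwable.
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Uses FragileHandler once and returns whatever was thrown, or null.
    public static Throwable attempt() {
        try {
            FragileHandler.touch();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        Throwable first = attempt();   // fresh JVM: ExceptionInInitializerError
        Throwable second = attempt();  // NoClassDefFoundError: Could not initialize class ...
        System.out.println("first use:  " + first);
        System.out.println("root cause: " + rootCause(first));
        System.out.println("second use: " + second);
    }
}
```

The practical takeaway, under these assumptions, is that the repeated NoClassDefFoundError hides the original problem, so the earliest initialization failure logged by the same TaskManager JVM is the one worth looking for.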