+Aljoscha Krettek <aljos...@apache.org> I set up my project using the template you suggested and I'm able to bucket and write files locally. I also want to test writing to S3, but I don't know how to configure the `sbt run` command to tell the FlinkMiniCluster to use the flink-s3-fs-hadoop-1.4.0.jar and a flink-conf.yaml.
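To make the question concrete, here is the kind of setup I have in mind (an untested sketch on my side; the conf path is a placeholder, and I'm assuming the 1.4.0 flink-s3-fs-hadoop artifact and the FileSystem.initialize() hook are the right entry points):

// build.sbt: pull the bundled S3 file system onto the classpath for local runs
libraryDependencies += "org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion

// In the job's main(), before building the topology: load flink-conf.yaml
// from an explicit directory and register Flink's pluggable file systems,
// so that the local mini cluster can resolve s3:// URIs.
import org.apache.flink.configuration.GlobalConfiguration
import org.apache.flink.core.fs.FileSystem

val conf = GlobalConfiguration.loadConfiguration("/path/to/conf-dir") // placeholder path
FileSystem.initialize(conf)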
When I try running my jar via the `flink run` command I get the same "java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink" error. How do I overcome this issue while still using the `flink run` command, so that I can use the flink-conf.yaml and flink-s3-fs-hadoop-1.4.0.jar?

On Fri, Jan 5, 2018 at 7:50 PM Kyle Hamlin <hamlin...@gmail.com> wrote:

> Also, I'm not using HDFS; I'm trying to sink to S3.
>
> On Fri, Jan 5, 2018 at 6:18 PM Kyle Hamlin <hamlin...@gmail.com> wrote:
>
>> I have the hadoop-common dependency in my build.sbt because I was having
>> issues compiling my jar after moving from 1.3.2 to 1.4.0:
>> org.apache.hadoop.fs.{FileSystem, Path} are no longer in Flink, and I use
>> them in my custom bucketer and in my writer that writes Avro out to
>> Parquet.
>>
>> I tried adding classloader.resolve-order: parent-first to my
>> flink-conf.yaml, but that didn't seem to work. I grepped my jar for
>> "hadoop" and found the following:
>>
>> org/apache/hadoop/*
>> org/apache/parquet/hadoop/*
>>
>> After designating the hadoop-common dependency as "provided", only
>> org/apache/parquet/hadoop/* files show up. Additionally, the "RpcEngine"
>> and "ProtobufRpcEngine" error doesn't show up anymore, just the following:
>>
>> java.lang.RuntimeException: Error while creating FileSystem when
>> initializing the state of the BucketingSink.
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Cannot instantiate file system for URI:
>> hdfs://localhost:12345/
>>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>>     ... 9 more
>>
>> Moving the hadoop-common jar to Flink's lib/ directory also doesn't
>> appear to help.
>>
>> On Thu, Jan 4, 2018 at 10:48 AM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> I think this might be happening because partial Hadoop dependencies are
>>> in the user jar and the rest is only available from the Hadoop deps that
>>> come bundled with Flink. For example, I noticed that you have
>>> hadoop-common as a dependency, which probably ends up in your jar.
>>>
>>> On 4. Jan 2018, at 11:40, Stephan Ewen <se...@apache.org> wrote:
>>>
>>> Hi!
>>>
>>> This looks indeed like a class-loading issue - it looks like "RpcEngine"
>>> and "ProtobufRpcEngine" are loaded via different classloaders.
>>>
>>> Can you try the following:
>>>
>>>   - In your flink-conf.yaml, set classloader.resolve-order: parent-first
>>>
>>> If that fixes the issue, then we can look at a way to make this
>>> seamless...
>>>
>>> On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> After moving to Flink 1.4.0 I'm getting the following error. I can't
>>>> find anything online that addresses it. Is it a Hadoop dependency issue?
>>>> Here are my project dependencies:
>>>>
>>>> libraryDependencies ++= Seq(
>>>>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>>>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>>>>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>>>>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>>>>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>>>>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>>>>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>>>>   "org.apache.avro" % "avro" % "1.7.7",
>>>>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>>>>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>>>>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>>>>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
>>>> )
>>>>
>>>> *Stacktrace:*
>>>> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
>>>> Using address localhost:6123 to connect to JobManager.
>>>> JobManager web interface address http://localhost:8082
>>>> Starting execution of program
>>>> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting
>>>> for job completion.
>>>> Connected to JobManager at
>>>> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259]
>>>> with leader session id 00000000-0000-0000-0000-000000000000.
>>>> 01/03/2018 14:20:52 Job execution switched to status RUNNING.
>>>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
>>>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
>>>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
>>>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
>>>> java.lang.RuntimeException: Error while creating FileSystem when
>>>> initializing the state of the BucketingSink.
>>>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.io.IOException: Cannot instantiate file system for URI:
>>>> hdfs://localhost:12345/
>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>>>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>>>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>>>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>>>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>>>>     ... 9 more
>>>> Caused by: java.lang.ClassCastException:
>>>> org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to
>>>> org.apache.hadoop.ipc.RpcEngine
>>>>     at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
>>>>     at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
>>>>     at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
>>>>     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
>>>>     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>>>>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>>>>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>>>>     ... 13 more
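PS, for anyone who finds this thread later: the one change that made the
ProtobufRpcEngine ClassCastException above go away for me (though not the
BucketingSink FileSystem error) was marking hadoop-common as provided in
build.sbt. A sketch of the relevant change; the rest of the dependency list
is unchanged from above:

libraryDependencies ++= Seq(
  // ... Flink, Kafka, Avro, and Parquet dependencies exactly as listed above ...
  // Compile against Hadoop's FileSystem/Path for the custom bucketer and
  // Parquet writer, but keep hadoop-common out of the fat jar so those
  // classes are loaded from the Hadoop bundled with the Flink runtime:
  "org.apache.hadoop" % "hadoop-common" % "3.0.0" % Provided
)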