+Aljoscha Krettek <aljos...@apache.org> I setup my project using the
template you suggested and I'm able to bucket and write files locally. I
also want to test writing to s3 but I don't know how to configure the `sbt
run` command to tell the FlinkMiniCluster to use the
flink-s3-fs-hadoop-1.4.0.jar and a flink-conf.yaml?

When I try running my jar via the `flink run` command I get the same:
"java.lang.RuntimeException: Error while creating FileSystem when
initializing the state of the BucketingSink" error. How do I overcome this
issues while being able to use the `flink run` command, so I'm able to the
flink-conf.yaml and flink-s3-fs-hadoop-1.4.0.jar?


On Fri, Jan 5, 2018 at 7:50 PM Kyle Hamlin <hamlin...@gmail.com> wrote:

> Also, I'm not using hdfs I'm trying to sink to s3.
>
> On Fri, Jan 5, 2018 at 6:18 PM Kyle Hamlin <hamlin...@gmail.com> wrote:
>
>> I have the hadoop-common.jar in my build.sbt because I was having issues
>> compiling my jar after moving from 1.3.2 to 1.4.0 because
>> org.apache.hadoop.fs.{FileSystem, Path} were no longer in Flink and I use
>> them in my custom bucketer and to writer to write Avro out to Parquet.
>>
>> I tried adding classloader.resolve-order: parent-first to my
>> flink-conf.yaml but that didn't seem to work. I greped my jar for "hadoop"
>> and found the following:
>>
>> org/apache/hadoop/*
>> org/apache/parquet/hadoop/*
>>
>> after designating hadoop-common.jar dependency as "provided" only 
>> org/apache/parquet/hadoop/*
>> files show up. Additionally, the "RpcEngine" and "ProtobufRpcEngine" error
>> doesn't show up anymore just the following:
>>
>> java.lang.RuntimeException: Error while creating FileSystem when
>> initializing the state of the BucketingSink.
>> at
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Cannot instantiate file system for URI:
>> hdfs://localhost:12345/
>> at
>> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>> at
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>> at
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>> at
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>> at
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>> ... 9 more
>>
>> Moving the hadoop-common.jar to flinks lib/ directory also doesn't appear
>> to help.
>>
>>
>> On Thu, Jan 4, 2018 at 10:48 AM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> I think this might be happening because partial Hadoop dependencies are
>>> in the user jar and the rest is only available from the Hadoop deps that
>>> come bundled with Flink. For example, I noticed that you have Hadoop-common
>>> as a dependency which probably ends up in your Jar.
>>>
>>>
>>> On 4. Jan 2018, at 11:40, Stephan Ewen <se...@apache.org> wrote:
>>>
>>> Hi!
>>>
>>> This looks indeed like a class-loading issue - it looks like "RpcEngine"
>>> and "ProtobufRpcEngine" are loaded via different classloaders.
>>>
>>> Can you try the following:
>>>
>>>   - In your flink-conf.yml, set classloader.resolve-order: parent-first
>>>
>>> If that fixes the issue, then we can look at a way to make this
>>> seamless...
>>>
>>> On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> After moving to Flink 1.4.0 I'm getting the following error. I can't
>>>> find anything online that addresses it. Is it a Hadoop dependency issue?
>>>> Here are my project dependencies:
>>>>
>>>> libraryDependencies ++= Seq(
>>>>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>>>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>>>>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>>>>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>>>>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>>>>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>>>>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>>>>   "org.apache.avro" % "avro" % "1.7.7",
>>>>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>>>>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>>>>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>>>>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
>>>> )
>>>>
>>>> *Stacktrace:*
>>>> Cluster configuration: Standalone cluster with JobManager at localhost/
>>>> 127.0.0.1:6123
>>>> Using address localhost:6123 to connect to JobManager.
>>>> JobManager web interface address http://localhost:8082
>>>> Starting execution of program
>>>> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting
>>>> for job completion.
>>>> Connected to JobManager at 
>>>> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259]
>>>> with leader session id 00000000-0000-0000-0000-000000000000.
>>>> 01/03/2018 14:20:52 Job execution switched to status RUNNING.
>>>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to
>>>> SCHEDULED
>>>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to
>>>> DEPLOYING
>>>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
>>>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
>>>> java.lang.RuntimeException: Error while creating FileSystem when
>>>> initializing the state of the BucketingSink.
>>>> at
>>>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>>>> at
>>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>>> at
>>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.io.IOException: Cannot instantiate file system for URI:
>>>> hdfs://localhost:12345/
>>>> at
>>>> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>>>> at
>>>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>>>> at
>>>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>>>> at
>>>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>>>> at
>>>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>>>> ... 9 more
>>>> Caused by: java.lang.ClassCastException:
>>>> org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to
>>>> org.apache.hadoop.ipc.RpcEngine
>>>> at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
>>>> at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
>>>> at
>>>> org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
>>>> at
>>>> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
>>>> at
>>>> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>>>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>>>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>>>> at
>>>> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>>>> at
>>>> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>>>> ... 13 more
>>>>
>>>
>>>
>>>

Reply via email to