Hi Avi

For an application running in your IDE, please set the checkpoint path scheme 
to "file://"; you can refer to the source code of the ITCases that use 
RocksDBStateBackend.
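
As a minimal sketch of the IDE case: the only change compared with the snippet 
in the original question is the URI scheme. The directory 
/tmp/flink-checkpoints, the object name and the small pipeline are placeholders 
made up for illustration, and the sketch assumes flink-streaming-scala and 
flink-statebackend-rocksdb are on the project classpath.

```scala
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

object LocalCheckpointJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical local directory. The "file://" scheme is handled by
    // Flink's built-in local file system, so no Hadoop classes are required.
    val stateUri = "file:///tmp/flink-checkpoints"

    // Same constructor as in the original question:
    // (checkpoint URI, enable incremental checkpoints)
    val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
    env.setStateBackend(backend)
    env.enableCheckpointing(10000L) // trigger a checkpoint every 10 seconds

    // Placeholder pipeline so that the job has something to execute
    env.fromElements(1, 2, 3).map(_ * 2).print()

    env.execute("local-checkpoint-sketch")
  }
}
```

Reading the URI from configuration instead of hard-coding it would let the same 
job use "file://" in the IDE and "hdfs://" on the cluster.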

For an application running in your cluster, please download a Flink 
distribution bundled with Hadoop, or download Flink without Hadoop and export 
your HADOOP_CLASSPATH [1].

[1] https://flink.apache.org/downloads.html#latest-stable-release-v171
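
To see whether Hadoop is visible to the JVM at all, a small diagnostic sketch 
like the following can be run with the same classpath as the job. The object 
name and the list of class names are my own choice for illustration; this is 
not the exact check Flink performs internally.

```scala
object HadoopClasspathCheck {
  // Returns true if the named class can be loaded by this object's class loader.
  // The class is only loaded, not initialized.
  def isOnClasspath(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException | _: NoClassDefFoundError => false
    }

  def main(args: Array[String]): Unit = {
    // Classes that an "hdfs://" path typically depends on (assumed list).
    val candidates = Seq(
      "org.apache.hadoop.conf.Configuration",
      "org.apache.hadoop.fs.FileSystem",
      "org.apache.hadoop.hdfs.DistributedFileSystem"
    )
    candidates.foreach { name =>
      val status = if (isOnClasspath(name)) "found" else "MISSING"
      println(s"$status  $name")
    }
  }
}
```

If any of these are reported as MISSING, the "Hadoop is not in the 
classpath/dependencies" error below is the expected outcome for an "hdfs://" 
checkpoint path.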


Best
Yun Tang
________________________________
From: Avi Levi <avi.l...@bluevoyant.com>
Sent: Thursday, December 20, 2018 2:11
To: Steven Nelson
Cc: Chesnay Schepler; user@flink.apache.org
Subject: Re: getting an error when configuring state backend to hdfs

When I try running from my IDE (IntelliJ), I get this exception:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.bluevoyant.StreamingJob$.delayedEndpoint$com$bluevoyant$StreamingJob$1(StreamingJob.scala:41)
at com.bluevoyant.StreamingJob$delayedInit$body.apply(StreamingJob.scala:15)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.bluevoyant.StreamingJob$.main(StreamingJob.scala:15)
at com.bluevoyant.StreamingJob.main(StreamingJob.scala)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.RuntimeException: Failed to start checkpoint ID counter: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:255)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1166)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1146)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more


On Wed, Dec 19, 2018 at 6:50 PM Steven Nelson <snel...@sourceallies.com> wrote:
What image are you using?

On Dec 19, 2018, at 9:44 AM, Avi Levi <avi.l...@bluevoyant.com> wrote:

Hi Chesnay,
What do you mean? I am creating a fat jar with all dependencies (using sbt 
assembly). Which jar should I place in the /lib directory?

On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler <ches...@apache.org> wrote:
Are you including the filesystems in your jar? Filesystem jars must be placed 
in the /lib directory of the flink distribution.

On 19.12.2018 15:03, Avi Levi wrote:
Hi,
I am trying to set the state backend to HDFS:
val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
env.setStateBackend(backend)

I am running Flink 1.7.0 with the following dependencies (I tried them in 
different combinations):
"org.apache.flink"    %% "flink-connector-filesystem"         % flinkV
"org.apache.flink"    % "flink-hadoop-fs"                     % flinkV
"org.apache.hadoop"   % "hadoop-hdfs"                         % hadoopVersion
"org.apache.hadoop"   % "hadoop-common"                       % hadoopVersion

However, when running the jar I get this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more

Any help will be greatly appreciated.

