Hi Tobias,

That certainly explains the mysterious errors you were seeing :)
Thanks for letting us know!

Cheers,
Max

On 14.08.19 14:24, Kaymak, Tobias wrote:
> It had nothing to do with Flink or Beam; the dependency was introduced by a company-internal package that referred to an older version of RocksDB.
>
> I spotted it by running
>
>     mvn dependency:tree
>
> while investigating with a colleague. Excluding it fixed the issue for us.
>
> On Tue, Aug 13, 2019 at 3:13 PM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>
> > OK, I think I have some understanding of what happens. Flink switched to its own RocksDB fork in the 1.8 release, which is why the dependency must now be added to a project explicitly [0].
> > I actually did both: I added this dependency to my project's pom (resulting in beam_pipelines.jar) and to the lib directory of the Flink Docker image used to execute the pipeline [1]:
> >
> >     FROM flink:1.8.0-scala_2.11
> >     ADD --chown=flink:flink \
> >         http://central.maven.org/maven2/org/apache/flink/flink-statebackend-rocksdb_2.11/1.8.0/flink-statebackend-rocksdb_2.11-1.8.0.jar \
> >         /opt/flink/lib/flink-statebackend-rocksdb_2.11-1.8.0.jar
> >     ADD --chown=flink:flink target/di-beam-bundled.jar /opt/flink/lib/beam_pipelines.jar
> >
> > Now everything works up to the point when I hit the "Stop" button in the Flink web interface. I think the dependency that the Beam Flink Runner has is wrong, as Flink switched to FRocksDB in 1.8 [2]. I guess that's why the runner then hits:
> >
> >     java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
> >
> > But I might also be wrong; I am still investigating.
> >
> > Best,
> > Tobi
> >
> > [0] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#setting-the-per-job-state-backend
> > [1] https://hub.docker.com/_/flink
> > [2] https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.8.html#rocksdb-version-bump-and-switch-to-frocksdb-flink-10471
> >
> > On Tue, Aug 13, 2019 at 2:50 PM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
> >
> > > This is a major issue for us, as we are currently unable to shut down the pipelines cleanly; we can only cancel them hard.
> > >
> > > On Tue, Aug 13, 2019 at 2:46 PM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
> > >
> > > > I just rolled out the upgraded and working 1.8.0/2.14.0 combination to production. When I try to shut down a pipeline cleanly via the Stop button in the Flink 1.8.0 web interface, I get exactly the same error:
> > > >
> > > >     java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
> > > >         at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
> > > >         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
> > > >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
> > > >         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
> > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.tryDisposeAllOperators(StreamTask.java:454)
> > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:337)
> > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > >         at java.lang.Thread.run(Thread.java:748)
> > > >
> > > > The pipeline then restores from the last snapshot and continues to run; it does not shut down as expected.
> > > >
> > > > Any idea why this could happen?
> > > >
> > > > On Mon, Aug 12, 2019 at 9:49 PM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
> > > >
> > > > > * each time :)
> > > > >
> > > > > On Mon, Aug 12, 2019 at 9:48 PM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
> > > > >
> > > > > > I've checked multiple times now, and it breaks just as it does with the 1.8.1 image. I've completely rebuilt the Docker image and torn down the testing cluster.
> > > > > >
> > > > > > Best,
> > > > > > Tobi
> > > > > >
> > > > > > On Mon, Aug 12, 2019 at 1:45 PM Maximilian Michels <m...@apache.org> wrote:
> > > > > >
> > > > > > > Hi Tobias!
> > > > > > >
> > > > > > > I've checked whether there were any relevant changes to the RocksDB state backend in 1.8.1, but I couldn't spot anything. Could it be that an old version of RocksDB is still on the Flink cluster's classpath?
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Max
> > > > > > >
> > > > > > > On 06.08.19 16:43, Kaymak, Tobias wrote:
> > > > > > > > And of course the moment I click "send" I find that: 😂
> > > > > > > >
> > > > > > > > If you use Scala 2.11 and dependency version 1.8.0 in your Beam project's pom.xml, it *does* work:
> > > > > > > >
> > > > > > > >     <dependency>
> > > > > > > >       <groupId>org.apache.flink</groupId>
> > > > > > > >       <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
> > > > > > > >       <version>1.8.0</version>
> > > > > > > >     </dependency>
> > > > > > > >
> > > > > > > > However, if you want to use 1.8.1, it *does not*.
> > > > > > > >
> > > > > > > > I still found it confusing, as I am using the official Flink Docker images, which are currently at version 1.8.1. It would have helped me if Beam bundled the state backend dependency (as already mentioned, Beam allows the user to set a state backend via parameters of the Flink Runner).
> > > > > > > >
> > > > > > > > On Tue, Aug 6, 2019 at 4:35 PM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
> > > > > > > >
> > > > > > > > > Hello,
> > > > > > > > >
> > > > > > > > > As of version 1.8, Flink requires that, if one wants to use RocksDB as a state backend, the dependency be added to the pom.xml file [0].
> > > > > > > > >
> > > > > > > > > My cluster stopped working with RocksDB, so I added this dependency to the pom.xml of my Beam project (I've tried 1.8.1 and 1.8.0):
> > > > > > > > >
> > > > > > > > >     <dependency>
> > > > > > > > >       <groupId>org.apache.flink</groupId>
> > > > > > > > >       <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
> > > > > > > > >       <version>1.8.0</version>
> > > > > > > > >     </dependency>
> > > > > > > > >
> > > > > > > > > I also tried adding flink-statebackend-rocksdb_2.11-1.8.0.jar to the lib directory of the Flink cluster (TaskManagers and JobManager) instead. In all cases I get this error:
> > > > > > > > >
> > > > > > > > >     2019-08-06 14:14:15,670 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator
> > > > > > > > >     java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
> > > > > > > > >         at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
> > > > > > > > >         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
> > > > > > > > >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
> > > > > > > > >         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
> > > > > > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
> > > > > > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
> > > > > > > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > > > > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > > > > > >
> > > > > > > > > This looks like a version mismatch to me, but I don't know how to solve it. Could Beam perhaps include the dependency for the RocksDB backend for Flink 1.8 or higher, since it allows setting this value via parameters for the Flink Runner [1]?
> > > > > > > > >
> > > > > > > > > [0] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#setting-the-per-job-state-backend
> > > > > > > > > [1] https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner