Hi Tobias,

That certainly explains the mysterious errors you were seeing :)

Thanks for letting us know!
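
For anyone who finds this thread with the same NoSuchMethodError, here is a minimal sketch of such an exclusion in the pom.xml. It assumes the stale artifact is the upstream `org.rocksdb:rocksdbjni` jar. The coordinates of the internal package are hypothetical placeholders, so check the `mvn dependency:tree` output (for example `mvn dependency:tree -Dincludes=org.rocksdb`) for the real ones. The stack trace shows the Flink 1.8 RocksDB backend calling into the `org.rocksdb` package, which is presumably why an older jar with the same package can shadow its classes.

```xml
<!-- Hypothetical coordinates: replace with the internal package that
     pulls in the outdated RocksDB -->
<dependency>
    <groupId>com.example.internal</groupId>
    <artifactId>internal-commons</artifactId>
    <version>1.0.0</version>
    <exclusions>
        <!-- Assumption: the stale transitive jar is upstream rocksdbjni.
             Excluding it keeps only the RocksDB classes that
             flink-statebackend-rocksdb depends on. -->
        <exclusion>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

An exclusion only applies to the dependency it is declared on; if another dependency also pulls in an old RocksDB, it needs its own exclusion.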

Cheers,
Max

On 14.08.19 14:24, Kaymak, Tobias wrote:
> It had nothing to do with Flink or Beam: the dependency was introduced
> by a company-internal package that referred to an older version of
> RocksDB.
> 
> I spotted it by running
> 
> mvn dependency:tree
> 
> while investigating with a colleague. Excluding that dependency fixed the issue for us.
> 
> On Tue, Aug 13, 2019 at 3:13 PM Kaymak, Tobias <tobias.kay...@ricardo.ch
> <mailto:tobias.kay...@ricardo.ch>> wrote:
> 
>     OK, I think I understand what happens, at least partly.
>     Flink switched their RocksDB fork in the 1.8 release; this is why
>     the dependency must now be explicitly added to a project. [0]
>     I actually did both: I added this dependency to my project's pom
>     (resulting in beam_pipelines.jar) and to the lib directory of the
>     Flink Docker image that executes the pipeline [1]:
> 
>     FROM flink:1.8.0-scala_2.11
>     ADD --chown=flink:flink http://central.maven.org/maven2/org/apache/flink/flink-statebackend-rocksdb_2.11/1.8.0/flink-statebackend-rocksdb_2.11-1.8.0.jar /opt/flink/lib/flink-statebackend-rocksdb_2.11-1.8.0.jar
>     ADD --chown=flink:flink target/di-beam-bundled.jar /opt/flink/lib/beam_pipelines.jar
> 
>     Now everything works up to the point when I hit the "Stop" button in
>     the Flink web interface. I think the dependency that the Beam Flink
>     Runner has is wrong, as Flink switched to FRocksDB in 1.8 [2]. I
>     guess that's why the runner then hits:
>     java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
> 
>     But I might also be wrong; I am still investigating.
> 
>     Best,
>     Tobi 
> 
>     [0] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#setting-the-per-job-state-backend
>     [1] https://hub.docker.com/_/flink
>     [2] https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.8.html#rocksdb-version-bump-and-switch-to-frocksdb-flink-10471
> 
>     On Tue, Aug 13, 2019 at 2:50 PM Kaymak, Tobias
>     <tobias.kay...@ricardo.ch <mailto:tobias.kay...@ricardo.ch>> wrote:
> 
>         This is a major issue for us, as we are currently no longer able
>         to shut the pipelines down cleanly; we can only cancel them hard.
> 
>         On Tue, Aug 13, 2019 at 2:46 PM Kaymak, Tobias
>         <tobias.kay...@ricardo.ch <mailto:tobias.kay...@ricardo.ch>> wrote:
> 
>             I just rolled out the upgraded and working 1.8.0/2.14.0
>             combination to production and noticed that when I try to
>             cleanly shut down a pipeline via the Stop button in the
>             Flink 1.8.0 web interface, I get exactly the same error:
> 
>             java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>                 at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
>                 at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
>                 at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
>                 at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.tryDisposeAllOperators(StreamTask.java:454)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:337)
>                 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>                 at java.lang.Thread.run(Thread.java:748)
> 
> 
>             The pipeline then restores from the last snapshot and
>             continues to run; it does not shut down as expected.
> 
>             Any idea why this could happen?
> 
>             On Mon, Aug 12, 2019 at 9:49 PM Kaymak, Tobias
>             <tobias.kay...@ricardo.ch <mailto:tobias.kay...@ricardo.ch>>
>             wrote:
> 
>                 * each time :)
> 
>                 On Mon, Aug 12, 2019 at 9:48 PM Kaymak, Tobias
>                 <tobias.kay...@ricardo.ch
>                 <mailto:tobias.kay...@ricardo.ch>> wrote:
> 
>                     I've checked multiple times now, and it breaks just as
>                     it does with the 1.8.1 image. I've completely rebuilt the
>                     Docker image and torn down the testing cluster.
> 
>                     Best,
>                     Tobi
> 
>                     On Mon, Aug 12, 2019 at 1:45 PM Maximilian Michels
>                     <m...@apache.org <mailto:m...@apache.org>> wrote:
> 
>                         Hi Tobias!
> 
>                         I've checked if there were any relevant changes
>                         to the RocksDB state backend in 1.8.1, but I
>                         couldn't spot anything. Could it be that an old
>                         version of RocksDB is still on the Flink cluster's
>                         classpath?
> 
>                         Cheers,
>                         Max
> 
>                         On 06.08.19 16:43, Kaymak, Tobias wrote:
>                         > And of course, the moment I click "send" I find that: 😂
>                         >
>                         > If you use Scala 2.11 and dependency version 1.8.0 in your Beam
>                         > project's pom.xml, it *does* work:
>                         >
>                         >         <dependency>
>                         >             <groupId>org.apache.flink</groupId>
>                         >             <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>                         >             <version>1.8.0</version>
>                         >         </dependency>
>                         >
>                         > However, if you want to use 1.8.1, it *does not*.
>                         >
>                         > I still found it confusing, as I am using the official Flink Docker
>                         > images, which are currently at version 1.8.1. It would have helped me
>                         > if Beam bundled the state backend dependency (as already mentioned,
>                         > Beam allows the user to set a state backend via parameters of the
>                         > Flink Runner).
>                         >
>                         > On Tue, Aug 6, 2019 at 4:35 PM Kaymak, Tobias <tobias.kay...@ricardo.ch
>                         > <mailto:tobias.kay...@ricardo.ch>> wrote:
>                         >
>                         >     Hello,
>                         >
>                         >     In version 1.8, Flink requires that, if one wants to use RocksDB
>                         >     as a state backend, the dependency has to be added to the
>                         >     pom.xml file. [0]
>                         >
>                         >     My cluster stopped working with RocksDB, so I added this
>                         >     dependency to the pom.xml of my Beam project (I've tried 1.8.1
>                         >     and 1.8.0):
>                         >
>                         >         <dependency>
>                         >             <groupId>org.apache.flink</groupId>
>                         >             <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>                         >             <version>1.8.0</version>
>                         >         </dependency>
>                         >
>                         >     I also tried adding flink-statebackend-rocksdb_2.11-1.8.0.jar to
>                         >     the lib directory of the Flink cluster (TaskManagers and
>                         >     JobManager) instead. In all cases I get this error:
>                         >
>                         >     2019-08-06 14:14:15,670 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator
>                         >     java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>                         >       at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
>                         >       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
>                         >       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
>                         >       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
>                         >       at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>                         >       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>                         >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>                         >       at java.lang.Thread.run(Thread.java:748)
>                         >
>                         >     This looks like a version mismatch to me, but I don't know how to
>                         >     solve it. Could Beam maybe include the dependency for the RocksDB
>                         >     backend for Flink 1.8 or higher, since it allows setting this
>                         >     value via parameters of the Flink Runner? [1]
>                         >
>                         >
>                         >     [0] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#setting-the-per-job-state-backend
>                         >     [1] https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner
>                         >
> 
