Dear community,
I am running a Flink job backed by RocksDB, on Flink 1.6.4 with Scala
2.11. At job startup the following exception is thrown (it is recorded
by the Job Manager):

Caused by: java.lang.LinkageError: loader constraint violation: loader
(instance of
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
previously initiated loading for a different type with name
"org/rocksdb/DBOptions"
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)

For this job I programmatically set some RocksDB options using the code
appended below. Can anybody help with this? Thank you so much,
Andrea


import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

object ConfigurableRocksDB {

  // Shared RocksDB option holders; Serializable is mixed in so they can be
  // shipped to the cluster together with the (serializable) OptionsFactory.
  lazy val columnOptions = new ColumnFamilyOptions() with Serializable
  lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
  lazy val dbOptions     = new DBOptions() with Serializable

  // FlinkDeployment is our own configuration class; its fields are Options,
  // so each setting is applied only when present.
  def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = {
    properties.threadNo.foreach(dbOptions.setIncreaseParallelism)

    properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
    properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
    properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks())
    properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))

    columnOptions.setTableFormatConfig(tableConfig)
    properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm))
    properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc))
    properties.optimizeFilterForHits.foreach(op => if (op) columnOptions.optimizeFiltersForHits())

    val rocksdbConfig = new OptionsFactory() {
      override def createDBOptions(currentOptions: DBOptions): DBOptions                         = dbOptions
      override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = columnOptions
    }

    val stateBE =
      new RocksDBStateBackend(properties.checkpointDir.get, properties.checkpointIncremental.getOrElse(false))
    stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    stateBE.setOptions(rocksdbConfig)

    stateBE
  }

}
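
For completeness, the backend is then installed on the execution
environment roughly as follows. This is just a sketch: the "deployment"
value stands in for the FlinkDeployment configuration our job actually
loads.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// deployment: FlinkDeployment, assumed to be populated from the job configuration
env.setStateBackend(ConfigurableRocksDB.configureStateBackendRocksDB(deployment))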

-- 
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
