Re: Linkage Error RocksDB and flink-1.6.4
Hi Shu Su,

the first point pinpointed exactly the issue I ran into: I forgot to set that dependency's scope to "provided". Thank you!

On Mon, 24 Jun 2019 at 05:35, Shu Su wrote:

> Hi Andrea,
>
> Actually, it's caused by Flink's class loading. Flink loads the jar
> through the parent classloader first; when you then use it in your user
> code, the user-code classloader loads it again, which raises the error.
> There are two solutions:
>
> 1. Add scope "provided" to the dependency in your Maven pom.xml:
>
>        <dependency>
>          <groupId>org.apache.flink</groupId>
>          <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>          <version>${flink_version}</version>
>          <scope>provided</scope>
>        </dependency>
>
> 2. Set classloader.resolve-order: parent-first in flink-conf.yaml.
>
> Hope this will help you.
>
> Thanks,
> Simon
Re: Linkage Error RocksDB and flink-1.6.4
Hi Andrea,

Actually, it's caused by Flink's class loading. Flink loads the jar through the parent classloader first; when you then use it in your user code, the user-code classloader loads it again, which raises the error. There are two solutions:

1. Add scope "provided" to the dependency in your Maven pom.xml:

       <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
         <version>${flink_version}</version>
         <scope>provided</scope>
       </dependency>

2. Set classloader.resolve-order: parent-first in flink-conf.yaml.

Hope this will help you.

Thanks,
Simon

On 06/24/2019 11:27, Yun Tang wrote:

> Hi Andrea,
>
> Since I have not written Scala for a while, I wonder why you need to
> instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions
> on the JM side. As far as I can see, you could instantiate them on the TM
> side, like this:
>
>     val rocksdbConfig = new OptionsFactory() {
>       override def createDBOptions(currentOptions: DBOptions): DBOptions =
>         currentOptions.setIncreaseParallelism(properties.threadNo)
>
>       override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions =
>         currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize))
>     }
>
> You just need to serialize the properties to the TMs via the closure.
> Hope this helps.
>
> Best,
> Yun Tang
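To make the "loaded twice" point concrete, here is a minimal sketch of the underlying JVM behaviour. It is not Flink code: `Marker`, `LoaderIdentityDemo` and `isolatedLoader` are made-up names, and it assumes the file is compiled to disk (a directory or jar) so that the class has a code source. It only shows that the same class file defined by two different classloaders yields two distinct classes, which is the precondition for a loader constraint violation.

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical placeholder class; it stands in for org.rocksdb.DBOptions.
final class Marker

object LoaderIdentityDemo {
  // Directory or jar that Marker was compiled into.
  // Assumption: the launcher exposes a code source (an in-memory REPL may not).
  val location: URL =
    classOf[Marker].getProtectionDomain.getCodeSource.getLocation

  // A loader with a null parent delegates only to the bootstrap loader,
  // so it has to define Marker itself instead of reusing an existing copy.
  def isolatedLoader(): URLClassLoader =
    new URLClassLoader(Array(location), null)

  // The same class file, defined by two independent loaders.
  val fromFirst: Class[_]  = isolatedLoader().loadClass(classOf[Marker].getName)
  val fromSecond: Class[_] = isolatedLoader().loadClass(classOf[Marker].getName)

  def main(args: Array[String]): Unit = {
    println(s"same name:  ${fromFirst.getName == fromSecond.getName}")
    println(s"same class: ${fromFirst == fromSecond}")
  }
}
```

The two results should share a name but not be the same `Class` object. With the RocksDB state backend bundled in the job jar and also present on Flink's own classpath, the child-first user-code classloader can end up in a comparable situation for `org.rocksdb.DBOptions`; marking the dependency as "provided" keeps the second copy out of the job jar.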
Re: Linkage Error RocksDB and flink-1.6.4
Hi Andrea,

Since I have not written Scala for a while, I wonder why you need to instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions on the JM side. As far as I can see, you could instantiate them on the TM side, like this:

    val rocksdbConfig = new OptionsFactory() {
      override def createDBOptions(currentOptions: DBOptions): DBOptions =
        currentOptions.setIncreaseParallelism(properties.threadNo)

      override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions =
        currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize))
    }

You just need to serialize the properties to the TMs via the closure. Hope this helps.

Best,
Yun Tang

From: Andrea Spina
Sent: Monday, June 24, 2019 2:20
To: user
Subject: Linkage Error RocksDB and flink-1.6.4

Dear community,

I am running a Flink job backed by RocksDB, with Flink 1.6.4 and Scala 2.11. At job startup the following exception happens (it is recorded by the JobManager):

    Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) previously initiated loading for a different type with name "org/rocksdb/DBOptions"
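A slightly fuller sketch of the suggestion above, assuming the `OptionsFactory` interface used in this thread (Flink 1.6) and optional settings like the ones in the original code. `RocksDbSettings` and `SettingsOptionsFactory` are hypothetical names standing in for the thread's `FlinkDeployment`, whose definition is not shown. The factory holds only plain values, and RocksDB option objects are touched solely inside the callbacks.

```scala
import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.OptionsFactory
import org.rocksdb.{ColumnFamilyOptions, DBOptions}

// Hypothetical stand-in for FlinkDeployment: plain, serializable values only.
final case class RocksDbSettings(
    threadNo: Option[Int] = None,
    writeBufferSize: Option[String] = None)

// Only `settings` is captured and serialized with the factory.
// No DBOptions or ColumnFamilyOptions instance is created up front.
final class SettingsOptionsFactory(settings: RocksDbSettings) extends OptionsFactory {

  override def createDBOptions(currentOptions: DBOptions): DBOptions = {
    // Apply the setting only when it is present; otherwise keep the defaults.
    settings.threadNo.foreach(n => currentOptions.setIncreaseParallelism(n))
    currentOptions
  }

  override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = {
    settings.writeBufferSize.foreach { size =>
      currentOptions.setWriteBufferSize(MemorySize.parseBytes(size))
    }
    currentOptions
  }
}
```

It would be passed to the backend with `stateBE.setOptions(new SettingsOptionsFactory(settings))`. Modifying and returning `currentOptions` also keeps whatever the predefined options already configured, rather than replacing them with freshly created objects.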
Linkage Error RocksDB and flink-1.6.4
Dear community,

I am running a Flink job backed by RocksDB, with Flink 1.6.4 and Scala 2.11. At job startup the following exception happens (it is recorded by the JobManager):

    Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) previously initiated loading for a different type with name "org/rocksdb/DBOptions"
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)

For this job, I set some RocksDB options programmatically, using the code appended below. Can anybody help with this?

Thank you so much,
Andrea

    import org.apache.flink.configuration.MemorySize
    import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
    import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

    object ConfigurableRocksDB {

      lazy val columnOptions = new ColumnFamilyOptions() with Serializable
      lazy val tableConfig = new BlockBasedTableConfig() with Serializable
      lazy val dbOptions = new DBOptions() with Serializable

      def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = {
        properties.threadNo.foreach(dbOptions.setIncreaseParallelism)

        properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
        properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
        properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks())
        properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))

        columnOptions.setTableFormatConfig(tableConfig)
        properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm))
        properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc))
        properties.optimizeFilterForHits.foreach(op => if (op) columnOptions.optimizeFiltersForHits())

        val rocksdbConfig = new OptionsFactory() {
          override def createDBOptions(currentOptions: DBOptions): DBOptions = dbOptions
          override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = columnOptions
        }

        val stateBE = new RocksDBStateBackend(properties.checkpointDir.get, properties.checkpointIncremental.getOrElse(false))
        stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
        stateBE.setOptions(rocksdbConfig)

        stateBE
      }
    }

--
Andrea Spina
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT