Re: Linkage Error RocksDB and flink-1.6.4

2019-06-24 Thread Andrea Spina
Hi Shu Su,
the first point exactly pinpointed the issue I bumped into. I forgot to put
that dependency to "provided". Thank you!

Il giorno lun 24 giu 2019 alle ore 05:35 Shu Su  ha
scritto:

> Hi Andrea
>
> Actually It’s caused by Flink’s ClassLoader. It’s because flink use
> parent Classloader to load jar first and then you use it in your user’s
> code, then user-code classloader will load it again so it raised the error.
> There are two solutions.
> 1.  Add scope “provided” to maven pom.xml
> 
> org.apache.flink
> flink-statebackend-rocksdb_2.11
> ${flink_version}
> *provided*
> 
> 2. Set this classloader.resolve-order: parent-first in flink-conf.yml
>
> Hope this will help you.
>
> Thanks,
> Simon
> On 06/24/2019 11:27,Yun Tang  wrote:
>
> Hi Andrea
>
> Since I have not written Scala for a while, I wonder why you need to
> instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions
> on JM side. As far as I can see, you could instantiate your on your TM side
> like code:
>
> val rocksdbConfig = new OptionsFactory() {
>   override def createDBOptions(currentOptions: DBOptions): DBOptions =
>  currentOptions.setIncreaseParallelism(properties.threadNo)
>
>   override def createColumnOptions(currentOptions: ColumnFamilyOptions): 
> ColumnFamilyOptions =
>  
> currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize))
> }
>
> You just need to serialize the properties via closure to TMs. Hope this could 
> help you.
>
> Best
> Yun Tang
> ------
> *From:* Andrea Spina 
> *Sent:* Monday, June 24, 2019 2:20
> *To:* user
> *Subject:* Linkage Error RocksDB and flink-1.6.4
>
> Dear community,
> I am running a Flink Job backed by RocksDB, version 1.6.4 and scala 2.11.
> At the job Startp the following exception happens (it's recorded by the Job
> Manager).
>
>
>
>
>
>
>
>
>
>
>
> *Caused by: java.lang.LinkageError: loader constraint violation: loader
> (instance of
> org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
> previously initiated loading for a different type with name
> "org/rocksdb/DBOptions" at
> java.lang.ClassLoader.defineClass1(Native Method) at
> java.lang.ClassLoader.defineClass(ClassLoader.java:763) at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74) at
> java.net.URLClassLoader$1.run(URLClassLoader.java:369) at
> java.net.URLClassLoader$1.run(URLClassLoader.java:363) at
> java.security.AccessController.doPrivileged(Native Method) at
> java.net.URLClassLoader.findClass(URLClassLoader.java:362) at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)*
>
> For this job, I programmatically set some RocksDB options by using the
> code appended below. Anybody can help with this? Thank you so much,
> Andrea
>
>
> import org.apache.flink.configuration.MemorySize
> import org.apache.flink.contrib.streaming.state.{OptionsFactory, 
> PredefinedOptions, RocksDBStateBackend}
> import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}
>
> object ConfigurableRocksDB {
>
>   lazy val columnOptions = new ColumnFamilyOptions() with Serializable
>   lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
>   lazy val dbOptions = new DBOptions() with Serializable
>
>   def configureStateBackendRocksDB(properties: FlinkDeployment): 
> RocksDBStateBackend = {
> properties.threadNo.foreach(dbOptions.setIncreaseParallelism)
>
> properties.blockSize.foreach(bs => 
> tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
> properties.cacheSize.foreach(cs => 
> tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
> properties.cacheIndicesAndFilters.foreach(cif => if (cif) 
> tableConfig.cacheIndexAndFilterBlocks())
> properties.writeBufferSize.foreach(wbs => 
> columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))
>
> columnOptions.setTableFormatConfig(tableConfig)
> properties.writeBufferToMerge.foreach(bm => 
> columnOptions.setMinWriteBufferNumberToMerge(bm))
> properties.writeBufferCount.foreach(bc => 
> columnOptions.setMaxWriteBufferNumber(bc))
> properties.optimizeFilterForHits.foreach(op => if (op) 

Re: Linkage Error RocksDB and flink-1.6.4

2019-06-23 Thread Shu Su
Hi Andrea


Actually It’s caused by Flink’s ClassLoader. It’s because flink use parent 
Classloader to load jar first and then you use it in your user’s code, then 
user-code classloader will load it again so it raised the error. There are two 
solutions.
1.  Add scope “provided” to maven pom.xml

org.apache.flink
flink-statebackend-rocksdb_2.11
${flink_version}
provided

2. Set this classloader.resolve-order: parent-first in flink-conf.yml


Hope this will help you.


Thanks,
Simon
On 06/24/2019 11:27,Yun Tang wrote:
Hi Andrea


Since I have not written Scala for a while, I wonder why you need to 
instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions on JM 
side. As far as I can see, you could instantiate your on your TM side like code:


val rocksdbConfig = new OptionsFactory() {
override def createDBOptions(currentOptions: DBOptions): DBOptions = 
 currentOptions.setIncreaseParallelism(properties.threadNo)

  override def createColumnOptions(currentOptions: ColumnFamilyOptions): 
ColumnFamilyOptions = 
 
currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize))
}

You just need to serialize theproperties via closure to TMs. Hope this could 
help you.

Best
Yun Tang

From: Andrea Spina 
Sent: Monday, June 24, 2019 2:20
To: user
Subject: Linkage Error RocksDB and flink-1.6.4
 
Dear community,
I am running a Flink Job backed by RocksDB, version 1.6.4 and scala 2.11. At 
the job Startp the following exception happens (it's recorded by the Job 
Manager).

Caused by: java.lang.LinkageError: loader constraint violation: loader 
(instance of 
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
 previously initiated loading for a different type with name 
"org/rocksdb/DBOptions"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)



For this job, I programmatically set some RocksDB options by using the code 
appended below. Anybody can help with this? Thank you so much,
Andrea


import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory, 
PredefinedOptions, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}
object ConfigurableRocksDB {

lazy val columnOptions = new ColumnFamilyOptions() with Serializable
lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
lazy val dbOptions = new DBOptions() with Serializable

def configureStateBackendRocksDB(properties: FlinkDeployment): 
RocksDBStateBackend = {
properties.threadNo.foreach(dbOptions.setIncreaseParallelism)

properties.blockSize.foreach(bs => 
tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
properties.cacheSize.foreach(cs => 
tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
properties.cacheIndicesAndFilters.foreach(cif => if (cif) 
tableConfig.cacheIndexAndFilterBlocks())
properties.writeBufferSize.foreach(wbs => 
columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))

columnOptions.setTableFormatConfig(tableConfig)
properties.writeBufferToMerge.foreach(bm => 
columnOptions.setMinWriteBufferNumberToMerge(bm))
properties.writeBufferCount.foreach(bc => 
columnOptions.setMaxWriteBufferNumber(bc))
properties.optimizeFilterForHits.foreach(op => if (op) 
columnOptions.optimizeFiltersForHits())

val rocksdbConfig = new OptionsFactory() {
override def createDBOptions(currentOptions: DBOptions): DBOptions  
   = dbOptions
override def createColumnOptions(currentOptions: ColumnFamilyOptions): 
ColumnFamilyOptions = columnOptions
}

val stateBE =
new RocksDBStateBackend(properties.checkpointDir.get, 
properties.checkpointIncremental.getOrElse(false))
stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
stateBE.setOptions(rocksdbConfig)

stateBE
  }

}
--

Andrea Spina
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT

Re: Linkage Error RocksDB and flink-1.6.4

2019-06-23 Thread Yun Tang
Hi Andrea

Since I have not written Scala for a while, I wonder why you need to 
instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions on JM 
side. As far as I can see, you could instantiate your on your TM side like code:


val rocksdbConfig = new OptionsFactory() {
  override def createDBOptions(currentOptions: DBOptions): DBOptions =
 currentOptions.setIncreaseParallelism(properties.threadNo)

  override def createColumnOptions(currentOptions: ColumnFamilyOptions): 
ColumnFamilyOptions =
 
currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize))
}

You just need to serialize the properties via closure to TMs. Hope this could 
help you.

Best
Yun Tang

From: Andrea Spina 
Sent: Monday, June 24, 2019 2:20
To: user
Subject: Linkage Error RocksDB and flink-1.6.4

Dear community,
I am running a Flink Job backed by RocksDB, version 1.6.4 and scala 2.11. At 
the job Startp the following exception happens (it's recorded by the Job 
Manager).

Caused by: java.lang.LinkageError: loader constraint violation: loader 
(instance of 
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
 previously initiated loading for a different type with name 
"org/rocksdb/DBOptions"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)

For this job, I programmatically set some RocksDB options by using the code 
appended below. Anybody can help with this? Thank you so much,
Andrea

import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory, 
PredefinedOptions, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

object ConfigurableRocksDB {

  lazy val columnOptions = new ColumnFamilyOptions() with Serializable
  lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
  lazy val dbOptions = new DBOptions() with Serializable

  def configureStateBackendRocksDB(properties: FlinkDeployment): 
RocksDBStateBackend = {
properties.threadNo.foreach(dbOptions.setIncreaseParallelism)

properties.blockSize.foreach(bs => 
tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
properties.cacheSize.foreach(cs => 
tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
properties.cacheIndicesAndFilters.foreach(cif => if (cif) 
tableConfig.cacheIndexAndFilterBlocks())
properties.writeBufferSize.foreach(wbs => 
columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))

columnOptions.setTableFormatConfig(tableConfig)
properties.writeBufferToMerge.foreach(bm => 
columnOptions.setMinWriteBufferNumberToMerge(bm))
properties.writeBufferCount.foreach(bc => 
columnOptions.setMaxWriteBufferNumber(bc))
properties.optimizeFilterForHits.foreach(op => if (op) 
columnOptions.optimizeFiltersForHits())

val rocksdbConfig = new OptionsFactory() {
  override def createDBOptions(currentOptions: DBOptions): DBOptions
 = dbOptions
  override def createColumnOptions(currentOptions: ColumnFamilyOptions): 
ColumnFamilyOptions = columnOptions
}

val stateBE =
  new RocksDBStateBackend(properties.checkpointDir.get, 
properties.checkpointIncremental.getOrElse(false))
stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
stateBE.setOptions(rocksdbConfig)

stateBE
  }

}

--
Andrea Spina
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Linkage Error RocksDB and flink-1.6.4

2019-06-23 Thread Andrea Spina
Dear community,
I am running a Flink Job backed by RocksDB, version 1.6.4 and scala 2.11.
At the job Startp the following exception happens (it's recorded by the Job
Manager).











*Caused by: java.lang.LinkageError: loader constraint violation: loader
(instance of
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
previously initiated loading for a different type with name
"org/rocksdb/DBOptions"at java.lang.ClassLoader.defineClass1(Native
Method)at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
  at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)at
java.net.URLClassLoader$1.run(URLClassLoader.java:369)at
java.net.URLClassLoader$1.run(URLClassLoader.java:363)at
java.security.AccessController.doPrivileged(Native Method)at
java.net.URLClassLoader.findClass(URLClassLoader.java:362)at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)*

For this job, I programmatically set some RocksDB options by using the code
appended below. Anybody can help with this? Thank you so much,
Andrea


import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory,
PredefinedOptions, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

object ConfigurableRocksDB {

  lazy val columnOptions = new ColumnFamilyOptions() with Serializable
  lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
  lazy val dbOptions = new DBOptions() with Serializable

  def configureStateBackendRocksDB(properties: FlinkDeployment):
RocksDBStateBackend = {
properties.threadNo.foreach(dbOptions.setIncreaseParallelism)

properties.blockSize.foreach(bs =>
tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
properties.cacheSize.foreach(cs =>
tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
properties.cacheIndicesAndFilters.foreach(cif => if (cif)
tableConfig.cacheIndexAndFilterBlocks())
properties.writeBufferSize.foreach(wbs =>
columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))

columnOptions.setTableFormatConfig(tableConfig)
properties.writeBufferToMerge.foreach(bm =>
columnOptions.setMinWriteBufferNumberToMerge(bm))
properties.writeBufferCount.foreach(bc =>
columnOptions.setMaxWriteBufferNumber(bc))
properties.optimizeFilterForHits.foreach(op => if (op)
columnOptions.optimizeFiltersForHits())

val rocksdbConfig = new OptionsFactory() {
  override def createDBOptions(currentOptions: DBOptions):
DBOptions = dbOptions
  override def createColumnOptions(currentOptions:
ColumnFamilyOptions): ColumnFamilyOptions = columnOptions
}

val stateBE =
  new RocksDBStateBackend(properties.checkpointDir.get,
properties.checkpointIncremental.getOrElse(false))
stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
stateBE.setOptions(rocksdbConfig)

stateBE
  }

}

-- 
*Andrea Spina*
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT