Good morning,

Thanks for the hint. Akka Streams and Akka HTTP are already bundled in our user 
jar; without them, our HTTP client would not work. According to the Flink 
documentation here: 
https://flink.apache.org/news/2017/12/12/release-1.4.0.html#changes-to-dynamic-class-loading-of-user-code
child-first class loading is enabled by default, and we did not change any 
settings there.

Cheers
Martin

----------------------------------------------------------------------------------
Martin Gäckler
Swarm Services Development

On behalf of
e.solutions GmbH
Despagstr. 4a
85055 Ingolstadt
Germany

Registered Office:
Despagstr. 4a
85055 Ingolstadt
Germany

Phone  +49 8458 3332 145

e.solutions GmbH
Managing Directors Uwe Reder, Dr. Riclef Schmidt-Clausen
Register Court Ingolstadt HRB 5221

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Monday, 18 June 2018 09:25
To: Gäckler Martin <martin.gaeck...@esolutions.de>
Cc: user <user@flink.apache.org>
Subject: Re: flink and akka HTTP

Hi,

I assume that you have an Akka dependency conflict. By adding the Akka 
dependency version to your user jar and enabling child first class loading you 
should be able to control which Akka version is loaded. The only thing you have 
to check is whether Flink works with a newer version of Akka.
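
One way to check which Akka copy actually gets loaded is to print, from inside 
the job, the class loader and jar location of the two classes named in the 
ClassCastException. The following is only a minimal sketch: ClassOrigin and its 
methods are hypothetical helper names, not part of Flink or Akka, and the code 
assumes no security manager blocks access to the protection domain.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Describes which class loader and code location supplied the given class. */
    public static String describe(Class<?> cls) {
        ClassLoader loader = cls.getClassLoader(); // null means the bootstrap loader
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "<unknown>"
                : source.getLocation().toString();
        return cls.getName()
                + " loaded by " + (loader == null ? "<bootstrap>" : loader.toString())
                + " from " + location;
    }

    /** True when both classes were defined by the same class loader. */
    public static boolean sameLoader(Class<?> a, Class<?> b) {
        return a.getClassLoader() == b.getClassLoader();
    }

    public static void main(String[] args) {
        // Default names are the two classes from the stack trace;
        // pass other class names as arguments to inspect them instead.
        String[] names = args.length > 0 ? args : new String[] {
            "akka.serialization.Serializer",
            "akka.remote.serialization.MiscMessageSerializer"
        };
        ClassLoader ctx = Thread.currentThread().getContextClassLoader();
        for (String name : names) {
            try {
                // initialize=false: only locate the class, do not run static initializers
                System.out.println(describe(Class.forName(name, false, ctx)));
            } catch (ClassNotFoundException e) {
                System.out.println(name + " not found on this class path");
            }
        }
    }
}
```

If the interface and the implementation are reported with different class 
loaders or different jar locations, two Akka copies are visible at the same 
time, which would be consistent with the "is not assignable from" error.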

Cheers,
Till

On Fri, Jun 15, 2018 at 8:15 PM Gäckler Martin <martin.gaeck...@esolutions.de> wrote:
Good evening,

According to Flink's documentation I have excluded the Flink runtime library 
from the runtime dependencies of my project:

dependencies {
    compileOnly     group: 'org.apache.flink',  name: 'flink-core',                      version: '1.4.2'
    compileOnly     group: 'org.apache.flink',  name: 'flink-java',                      version: '1.4.2'
    compileOnly     group: 'org.apache.flink',  name: 'flink-streaming-java_2.11',       version: '1.4.2'
    implementation  group: 'org.apache.flink',  name: 'flink-connector-kafka-0.11_2.11', version: '1.4.2'
    ...
}

Unfortunately I get the following error:

Caused by: java.lang.ClassCastException: interface akka.serialization.Serializer is not assignable from class akka.remote.serialization.MiscMessageSerializer
     at akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:23)
     at akka.actor.ReflectiveDynamicAccess$$anonfun$getClassFor$1.apply(ReflectiveDynamicAccess.scala:20)
     at scala.util.Try$.apply(Try.scala:192)
     at akka.actor.ReflectiveDynamicAccess.getClassFor(ReflectiveDynamicAccess.scala:20)
     at akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:38)
     at akka.serialization.Serialization.serializerOf(Serialization.scala:301)
     at akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
     at akka.serialization.Serialization$$anonfun$6.apply(Serialization.scala:327)
     at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
     at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
     at akka.serialization.Serialization.<init>(Serialization.scala:327)
     at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:15)
     at akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:12)
     at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
     at akka.actor.ActorSystemImpl$$anonfun$loadExtensions$1$1.apply(ActorSystem.scala:946)
     at akka.actor.ActorSystemImpl$$anonfun$loadExtensions$1$1.apply(ActorSystem.scala:944)
     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
     at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
     at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
     at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:833)
     at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:823)
     at akka.actor.ActorSystemImpl._start(ActorSystem.scala:823)
     at akka.actor.ActorSystemImpl.start(ActorSystem.scala:842)
     at akka.actor.ActorSystem$.apply(ActorSystem.scala:246)
     at akka.actor.ActorSystem$.apply(ActorSystem.scala:289)
     at akka.actor.ActorSystem$.apply(ActorSystem.scala:234)
     at akka.actor.ActorSystem$.apply(ActorSystem.scala:225)
     at akka.actor.ActorSystem$.create(ActorSystem.scala:160)
     at akka.actor.ActorSystem.create(ActorSystem.scala)
     at de.eso.swarm.rest.client.akka.AkkaRestClient.<clinit>(AkkaRestClient.java:43)
     ... 12 more

My application needs to initialize the Akka ActorSystem because it uses an HTTP 
client that I have developed using akka-http and akka-stream. Here are the 
dependencies of my HTTP client:

dependencies {
    compile     project(':platform-sdk-java-core')
    testCompile project(':platform-sdk-java-testing')

    implementation group: 'com.typesafe.akka',     name: 'akka-http_2.11',      version: '10.1.2'
    implementation group: 'com.typesafe.akka',     name: 'akka-stream_2.11',    version: '2.5.11'

    implementation group: 'com.google.code.gson',  name: 'gson',                version: '2.8.4'
    implementation group: 'com.google.protobuf',   name: 'protobuf-java',       version: '3.5.1'

    testImplementation group: 'junit', name: 'junit', version: '4.12'
}

On the other hand, when I add the Flink runtime to my runtime dependencies, 
everything works:

dependencies {
    implementation  group: 'org.apache.flink',  name: 'flink-core',                      version: '1.4.2'
    implementation  group: 'org.apache.flink',  name: 'flink-java',                      version: '1.4.2'
    implementation  group: 'org.apache.flink',  name: 'flink-streaming-java_2.11',       version: '1.4.2'
    implementation  group: 'org.apache.flink',  name: 'flink-connector-kafka-0.11_2.11', version: '1.4.2'
}

Relocating Akka did not solve the problem, because Akka could then no longer 
find its serialization classes.
Using an older version of Akka HTTP and Akka Streams is not an option either, 
because the client would not compile in that case.

Are there any other ideas?

Thanks in advance

Martin
