Konstantin,
After experimenting with this for a while, I got to the root cause of the 
problem.
I am running a version of the Taxi ride travel time prediction as my sample.
It works fine in IntelliJ,
but when I try to run it in Docker (the standard Debian-based Flink 1.7 image),
it fails with the following error:


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
        at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
        at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
        ... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
        at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
        ... 17 more

The closest match I found is
https://stackoverflow.com/questions/37363119/kafka-producer-org-apache-kafka-common-serialization-stringserializer-could-no
which talks about class loaders. (I tried their solution, but it did not help.)
I looked at the class loading and see that both of these classes are loaded 
from my uber jar, but twice.
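
A minimal sketch of one way to cross-check for duplicate copies from the shell, assuming the uber jar and the jars under Flink's lib directory can be listed with `unzip -l` or `jar tf`. The helper name `count_class_entries` is hypothetical (it is not part of Flink or Kafka); it only counts how many entries in a jar listing match a given class name.

```shell
#!/bin/sh
# Hypothetical helper (not part of Flink or Kafka): reads a jar listing on stdin,
# e.g. the output of `unzip -l app.jar` or `jar tf app.jar`, and prints how many
# entries match the given fully qualified class name.
count_class_entries() {
    # Turn org.example.Foo into org/example/Foo.class
    class_file="$(printf '%s' "$1" | tr '.' '/').class"
    # grep -c prints the count but exits non-zero on zero matches; ignore that status
    grep -c -F -- "$class_file" || true
}
```

For example, `unzip -l my-uber.jar | count_class_entries org.apache.kafka.common.serialization.Serializer` (the jar name is a placeholder). Running the same check against each jar in `$FLINK_HOME/lib` may show whether a second copy of the Kafka classes is also visible outside the uber jar.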

Have you guys seen this error before?
Any suggestions?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <konstan...@ververica.com> wrote:
> 
> Hi Boris, 
> 
> Without looking at the entrypoint in much detail, generally there should not 
> be a race condition there: 
> 
> * If the TaskManagers cannot connect to the ResourceManager, they will retry 
> (by default the timeout is 5 minutes).
> * If the JobManager does not get enough resources from the ResourceManager, it 
> will also wait for the resources/slots to be provided. The timeout there is also 
> 5 minutes, I think. 
> 
> So, this should actually be pretty robust as long as the TaskManager 
> containers can reach the JobManager eventually.
> 
> Could you provide the TaskManager/JobManager logs for such a failure case?
> 
> Cheers, 
> 
> Konstantin
> 
> 
> On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
> Following 
> https://github.com/apache/flink/tree/release-1.7/flink-container/docker 
> I have created an entrypoint script, which looks as follows:
> #!/bin/sh
> 
> ################################################################################
> #   from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
> #   and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
> ################################################################################
> 
> # If unspecified, the hostname of the container is taken as the JobManager address
> JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
> 
> drop_privs_cmd() {
>     if [ $(id -u) != 0 ]; then
>         # Don't need to drop privs if EUID != 0
>         return
>     elif [ -x /sbin/su-exec ]; then
>         # Alpine
>         echo su-exec flink
>     else
>         # Others
>         echo gosu flink
>     fi
> }
> 
> JOB_MANAGER="jobmanager"
> TASK_MANAGER="taskmanager"
> 
> CMD="$1"
> shift
> 
> if [ "${CMD}" = "help" ]; then
>     echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
>     exit 0
> elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
>     if [ "${CMD}" = "${TASK_MANAGER}" ]; then
>         TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
> 
>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>         sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
> 
>         echo "Starting Task Manager"
>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>         exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
>     else
>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
> 
>         if [ -z "$1" ]; then
>             exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
>         else
>             exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
>         fi
>     fi
> fi
> 
> exec "$@"
> It works in all cases except when running a standalone job.
> The problem, as I understand it, is a race condition.
> In Kubernetes it takes several attempts to establish a connection between the Job 
> and Task managers, while standalone-job.sh
> tries to start the job immediately once the cluster is created (before the 
> connection is established).
> Is there a better way to start a job on container startup?
>  
> 
> 
> -- 
> Konstantin Knauf | Solutions Architect
> +49 160 91394525
> 
