Konstantin,

After experimenting with this for a while, I got to the root cause of the problem. I am running a version of a taxi-ride travel time prediction as my sample. It works fine in IntelliJ, but when I try to run it in Docker (the standard Debian-based 1.7 image), it fails with the following error:
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
	at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
	at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
	at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
	... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
	at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
	... 17 more

The closest thing I found is https://stackoverflow.com/questions/37363119/kafka-producer-org-apache-kafka-common-serialization-stringserializer-could-no, which talks about the class loader. (I tried their solution, but it did not help.) I looked at the class loading and I can see that this pair of classes is loaded from my uber jar, but twice. Have you guys seen this error before? Any suggestions?
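In case it helps, the next thing I plan to try (untested, and my diagnosis may be wrong) is forcing the Kafka classes to be resolved parent-first, so that Flink's child-first user-code classloader never loads a second copy from the uber jar. classloader.parent-first-patterns.additional is a standard Flink config option; using org.apache.kafka as the pattern is my guess. Appended in the entrypoint, before the job starts:

# Sketch only: resolve Kafka classes parent-first so they are loaded once,
# from the system classpath, instead of again from the uber jar.
echo "classloader.parent-first-patterns.additional: org.apache.kafka" >> "$FLINK_HOME/conf/flink-conf.yaml"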
Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <konstan...@ververica.com> wrote:
>
> Hi Boris,
>
> without looking at the entrypoint in much detail, generally there should not be a race condition there:
>
> * if the taskmanagers cannot connect to the resourcemanager, they will retry (per default the timeout is 5 mins)
> * if the JobManager does not get enough resources from the ResourceManager, it will also wait for the resources/slots to be provided. The timeout there is also 5 minutes, I think.
>
> So, this should actually be pretty robust as long as the Taskmanager containers can reach the Jobmanager eventually.
>
> Could you provide the Taskmanager/JobManager logs for such a failure case?
>
> Cheers,
>
> Konstantin
>
> On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
> Following https://github.com/apache/flink/tree/release-1.7/flink-container/docker
> I have created an entry point, which looks as follows:
>
> #!/bin/sh
>
> ################################################################################
> # from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
> # and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
> ################################################################################
>
> # If unspecified, the hostname of the container is taken as the JobManager address
> JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
>
> drop_privs_cmd() {
>     if [ $(id -u) != 0 ]; then
>         # Don't need to drop privs if EUID != 0
>         return
>     elif [ -x /sbin/su-exec ]; then
>         # Alpine
>         echo su-exec flink
>     else
>         # Others
>         echo gosu flink
>     fi
> }
>
> JOB_MANAGER="jobmanager"
> TASK_MANAGER="taskmanager"
>
> CMD="$1"
> shift
>
> if [ "${CMD}" = "help" ]; then
>     echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
>     exit 0
> elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
>     if [ "${CMD}" = "${TASK_MANAGER}" ]; then
>         TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
>
>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>         sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>
>         echo "Starting Task Manager"
>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>         exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
>     else
>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>
>         if [ -z "$1" ]; then
>             exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
>         else
>             exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
>         fi
>     fi
> fi
>
> exec "$@"
>
> It does work for all the cases, except running a standalone job.
> The problem, the way I understand it, is a race condition: in Kubernetes it takes several attempts to establish the connection between the Job and Task managers, while standalone-job.sh tries to start the job immediately once the cluster is created (before the connection is established).
> Is there a better option for starting a job on container startup? One idea I have is sketched below.
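> One idea, which I have not tried yet (purely a sketch; slot.request.timeout and the restart-strategy keys are standard Flink settings, but the values here are guesses), is to give the job more headroom to wait for the TaskManagers to register, and to let it restart a few times if it still comes up too early. In the jobmanager branch of the entrypoint, before the exec:
>
> # Untested: wait up to 10 minutes for slots instead of the 5-minute default,
> # and restart the job a few times if it fails before TaskManagers register.
> echo "slot.request.timeout: 600000" >> "$FLINK_HOME/conf/flink-conf.yaml"
> echo "restart-strategy: fixed-delay" >> "$FLINK_HOME/conf/flink-conf.yaml"
> echo "restart-strategy.fixed-delay.attempts: 10" >> "$FLINK_HOME/conf/flink-conf.yaml"
> echo "restart-strategy.fixed-delay.delay: 10 s" >> "$FLINK_HOME/conf/flink-conf.yaml"
>
> This would not remove the race, but it should let the job survive it.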
>
> --
> Konstantin Knauf | Solutions Architect
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> https://www.ververica.com/