Thanks Ken,

That was my first instinct as well, but to run on the cluster I am building an uber jar, in which I pin the version of the Kafka clients jar (and of Kafka itself), so I do not see where another version could come from.
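For what it's worth, here is the sanity check I plan to run inside the container (the jar path is a placeholder for my build output):

# List every bundled copy of the Kafka serializer classes; more than one
# match, or an extra kafka-clients jar in flink/lib, can produce exactly
# the "X is not an instance of Y" error below.
UBER_JAR=target/app-assembly.jar   # placeholder path, adjust to the build
unzip -l "$UBER_JAR" | grep -E 'serialization/(ByteArraySerializer|Serializer)\.class'

# Also check what the Flink image itself already has on the classpath:
ls "$FLINK_HOME"/lib | grep -i kafka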
Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Feb 19, 2019, at 7:02 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi Boris,
>
> I haven't seen this exact error, but I have seen similar errors caused by multiple versions of jars on the classpath.
>
> When I've run into this particular "XXX is not an instance of YYY" problem, it often seems to be caused by a jar that I should have marked as provided in my pom.
>
> Though I'm typically running on a YARN cluster, not with K8s, so maybe this doesn't apply.
>
> — Ken
>
> PS - I assume you've been reading https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>
>> On Feb 19, 2019, at 4:34 PM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>>
>> Konstantin,
>> After experimenting with this for a while, I got to the root cause of the problem.
>> I am running a version of a taxi-ride travel-time prediction as my sample.
>> It works fine in IntelliJ, but when I put it into Docker (the standard Debian 1.7 image) it fails with the following error:
>>
>> The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>     at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
>>     at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>>     ... 19 more
>> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
>>     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
>>     ... 17 more
>>
>> The closest thing I found is https://stackoverflow.com/questions/37363119/kafka-producer-org-apache-kafka-common-serialization-stringserializer-could-no
>> which talks about the class loader. (I tried their solution, but it did not help.)
>> I looked at the class loading and I see that this pair of classes is loaded from my uber jar, but twice.
>>
>> Have you guys seen this error before?
>> Any suggestions?
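>>
>> One thing I still plan to try, per the classloading debugging page above, is switching Flink from its default child-first class resolution to parent-first, so only a single copy of these classes can ever be loaded. This is just a sketch; I have not yet verified that it fixes this:
>>
>> # In the entrypoint, before the cluster starts: resolve classes
>> # parent-first so duplicates inside the uber jar cannot shadow
>> # copies that are already loaded.
>> echo "classloader.resolve-order: parent-first" >> "$FLINK_HOME/conf/flink-conf.yaml"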
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>>> On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <konstan...@ververica.com> wrote:
>>>
>>> Hi Boris,
>>>
>>> Without looking at the entrypoint in much detail, generally there should not be a race condition there:
>>>
>>> * if the taskmanagers cannot connect to the resourcemanager they will retry (by default the timeout is 5 mins)
>>> * if the JobManager does not get enough resources from the ResourceManager, it will also wait for the resources/slots to be provided. The timeout there is also 5 minutes, I think.
>>>
>>> So, this should actually be pretty robust as long as the Taskmanager containers can reach the Jobmanager eventually.
>>>
>>> Could you provide the Taskmanager/JobManager logs for such a failure case?
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>>> Following https://github.com/apache/flink/tree/release-1.7/flink-container/docker
>>> I have created an entrypoint, which looks as follows:
>>>
>>> #!/bin/sh
>>>
>>> ################################################################################
>>> # from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
>>> # and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
>>> ################################################################################
>>>
>>> # If unspecified, the hostname of the container is taken as the JobManager address
>>> JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
>>>
>>> drop_privs_cmd() {
>>>     if [ $(id -u) != 0 ]; then
>>>         # Don't need to drop privs if EUID != 0
>>>         return
>>>     elif [ -x /sbin/su-exec ]; then
>>>         # Alpine
>>>         echo su-exec flink
>>>     else
>>>         # Others
>>>         echo gosu flink
>>>     fi
>>> }
>>>
>>> JOB_MANAGER="jobmanager"
>>> TASK_MANAGER="taskmanager"
>>>
>>> CMD="$1"
>>> shift
>>>
>>> if [ "${CMD}" = "help" ]; then
>>>     echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
>>>     exit 0
>>> elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
>>>     if [ "${CMD}" = "${TASK_MANAGER}" ]; then
>>>         TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
>>>
>>>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>>>         sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
>>>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>>
>>>         echo "Starting Task Manager"
>>>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>>>         exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
>>>     else
>>>         sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
>>>         echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>>         echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
>>>         echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
>>>
>>>         if [ -z "$1" ]; then
>>>             exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
>>>         else
>>>             exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
>>>         fi
>>>     fi
>>> fi
>>>
>>> exec "$@"
>>>
>>> It works for all the cases except running a standalone job.
>>> The problem, the way I understand it, is a race condition: in Kubernetes it takes several attempts to establish the connection between the Job and Task managers, while standalone-job.sh tries to start the job immediately once the cluster is created (before the connection is established).
>>> Is there a better way to start a job on container startup?
>>>
>>> --
>>> Konstantin Knauf | Solutions Architect
>>> +49 160 91394525
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
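Regarding the standalone-job race described above: if the JobManager's built-in waiting (the roughly 5-minute timeouts Konstantin mentions) is not enough in practice, one blunt workaround is to retry the submission in the entrypoint itself. This is only a sketch; the retry count and backoff are arbitrary, and it assumes standalone-job.sh exits non-zero when the job cannot start:

# Replaces the `exec $FLINK_HOME/bin/standalone-job.sh ...` line in the
# entrypoint above. Note: no exec here, because the shell must survive
# in order to retry.
attempt=0
until "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@"; do
    attempt=$((attempt + 1))
    if [ "$attempt" -ge "${MAX_RETRIES:-10}" ]; then
        echo "Standalone job failed to start after $attempt attempts" >&2
        exit 1
    fi
    echo "Standalone job exited early (attempt $attempt); retrying in 5s..."
    sleep 5
done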