Re: Flink 1.9.1 allocating more containers than needed
Thank you for the quick reply. We will wait for 1.9.2 then. I assume you don't have any estimate of when that might happen? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Flink 1.9.1 allocating more containers than needed
Hello, recently we have upgraded our environment from 1.6.4 to 1.9.1. We started to notice behaviour similar to what we saw in 1.6.2: more containers are allocated on YARN than the job needs. I think it was fixed by https://issues.apache.org/jira/browse/FLINK-10848, but the problem still exists in 1.9.1. Any chance to have that fixed?
Re: JobManager container is running beyond physical memory limits
We don't set it anywhere, so I guess it is the default of 16. Do you think that is too much?
Re: JobManager container is running beyond physical memory limits
Anyone?
JobManager container is running beyond physical memory limits
Hello, after switching from 1.4.2 to 1.5.2 we started to have problems with the JM container. Our use case is as follows: we get a request from a user, run a DataProcessing job, and once it is finished we store the details in a DB. We have ~1000 jobs per day. After the version update our container dies after ~1-2 days; previously it ran for weeks without a problem. We reduced our *web.history* from 100 to 32, but that didn't help much. Do you have any suggestions what we could do? The JM has 4GB of memory assigned. We will test today with 5 or 6GB, but I have a feeling that will only delay the moment of the crash.
Re: The program didn't contain a Flink job
Yes - it seems that the main method returns success, but for some reason that exception is thrown anyway. For now we applied a workaround to catch the exception and just skip it (later on our statusUpdater reads the statuses from the Flink dashboard).
Re: The program didn't contain a Flink job
Yes - we are submitting jobs one by one. How can we change that to work for our needs?
Re: The program didn't contain a Flink job
We are running the same job all the time, and the error happens only from time to time. Here is the job submission code:

private JobSubmissionResult submitProgramToCluster(PackagedProgram packagedProgram)
        throws JobSubmitterException, ProgramMissingJobException, ProgramInvocationException {
    ClusterClient clusterClient = clusterClientUtil.getPrototypeClusterClient();
    int parallelism = Integer.parseInt(serverConfiguration.envParallelism);
    return clusterClient.run(packagedProgram, parallelism);
}

And here is our util for retrieving the ClusterClient:

public class ClusterClientUtil {
    ...
    public ClusterClient getPrototypeClusterClient() throws JobSubmitterException {
        return createClusterClientInstance();
    }

    private synchronized ClusterClient createClusterClientInstance() throws JobSubmitterException {
        try {
            LOG.info("Creating new ClusterClient instance.");
            Configuration configuration = flinkConfigurator.getFlinkConfiguration();
            ApplicationId applicationId = ConverterUtils.toApplicationId(
                    configuration.getString(FlinkConfigurator.PROPERTY_FLINK_APP_ID, ""));
            LOG.debug("Retrieved Flink applicationId: {}", applicationId);
            YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                    configuration, serverConfiguration.getFlinkConfigurationDirectory);
            ApplicationReport applicationReport = yarnClient.getApplicationReport(applicationId);
            final int numberTaskManagers = configuration.getInteger(
                    FlinkConfigurator.PROPERTY_FLINK_NUMBER_TASK_MANAGERS, 0);
            final int slotsPerTaskManager = configuration.getInteger(
                    FlinkConfigurator.PROPERTY_FLINK_SLOTS_PER_TASK_MANAGER, -1);
            return new YarnClusterClient(yarnClusterDescriptor, numberTaskManagers,
                    slotsPerTaskManager, yarnClient, applicationReport, configuration, false);
        } catch (Exception e) {
            throw new JobSubmitterException("Unable to create YarnClusterClient.", e);
        }
    }
}

Which YARN settings do you need?
Re: The program didn't contain a Flink job
No. execute() was called and all calculations succeeded - there was a job on the dashboard with status FINISHED. After execute() our own logs claimed that everything succeeded.
The program didn't contain a Flink job
Hello, we are currently running jobs on Flink 1.4.2. Our use case is as follows: a service gets a request from a customer and we submit a job to Flink using YarnClusterClient. Sometimes we have up to 6 jobs running at the same time. From time to time we get the error below:

The program didn't contain a Flink job.
org.apache.flink.client.program.ProgramMissingJobException: The program didn't contain a Flink job.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:398)

From the logs we can see that the job's main method returns a correct status, but for some reason Flink throws that exception anyway. Do you know what the cause could be here and how to prevent it from happening?
Re: Web history limit in flink 1.5
Thank you, I must have missed that option somehow :)
Web history limit in flink 1.5
Hello, we were playing around with Flink 1.5 - so far so good. The only thing we are missing is the web history setup. In Flink 1.4 and before we used the *web.history* config to keep 100 jobs. With Flink 1.5 we can see that the history is limited to 1 hour only. Is it possible to somehow extend/configure that value? It is crucial for us to have it bigger.
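For anyone finding this thread later: if I read the 1.5 docs correctly, the retention of finished jobs moved to the JobManager's job store settings in flink-conf.yaml. The value below (24 hours, in seconds) is just a guess on my part, not something we have tested:

```yaml
# Keep completed jobs for 24 hours instead of the default 1 hour (3600 s).
jobstore.expiration-time: 86400
```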
Classloading issues after changing to 1.4
Hello, I still have a problem after upgrading from Flink 1.3.1 to 1.4.2. Our scenario looks like this: we have a container running on top of YARN. The machine that starts it has Flink installed and also loads some classpath libraries (e.g. Hadoop) into the container. There is a separate REST service that gets requests to run export jobs - it uses YarnClusterClient and submits a PackagedProgram. The jars with the actual Flink jobs are located in the lib/ directory of the service. On the machine where the Spring service is deployed we don't have Flink installed. For version 1.3 we had some libraries also loaded into the container so that they wouldn't have to be loaded dynamically every time. If I understand it correctly, with the child-first strategy that should not be needed any more, right?

Now our actual problems, linked to class loading, started to come up. After restarting the REST service the first triggered job works fine, but the next ones complain about the versions of the classes that are loaded. We found out that PackagedProgram creates a new class loader on every instantiation, so we overrode that behaviour with a static map holding one class loader per jar file name:

private static final Map<String, ClassLoader> classLoaders = Maps.newHashMap();
...
(constructor) {
    ...
    classLoaders.computeIfAbsent(jarFile.getName(),
            s -> getUserClassLoaderChildFirst(getAllLibraries(), classPaths, getClass().getClassLoader()));
    userCodeClassLoader = classLoaders.get(jarFile.getName());
    this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);
}

I don't know if that is a good direction, but it seems to solve the issue for now. We are just not sure about the stability of this solution - we are still testing it on our internal environment, and for now I'm afraid to proceed to production. Can you suggest any other things we could try to deal with the class loading?
Also, PackagedProgram still seems to use the parent-first strategy - in JobWithJars you have this method:

public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> classpaths, ClassLoader parent) {
    URL[] urls = new URL[jars.size() + classpaths.size()];
    for (int i = 0; i < jars.size(); i++) {
        urls[i] = jars.get(i);
    }
    for (int i = 0; i < classpaths.size(); i++) {
        urls[i + jars.size()] = classpaths.get(i);
    }
    return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
}

Is it correct that this still points to parent-first? In some of the issues I found on the mailing list you suggest setting the container to parent-first as a fix. We would like to find a proper solution on the supported child-first path rather than rely on a workaround.
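Stripped of the Flink specifics, the caching pattern we applied looks roughly like the sketch below. The class and jar names here are made up for illustration; note that we use a ConcurrentHashMap in this sketch, so computeIfAbsent is safe without extra synchronization (unlike the plain HashMap in our actual workaround):

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ClassLoaderCache {
    // One user-code class loader per jar file name, created at most once.
    private static final Map<String, ClassLoader> CLASS_LOADERS = new ConcurrentHashMap<>();

    public static ClassLoader forJar(String jarName, URL[] urls, ClassLoader parent) {
        // computeIfAbsent on a ConcurrentHashMap guarantees the loader
        // for a given jar name is created exactly once.
        return CLASS_LOADERS.computeIfAbsent(jarName,
                name -> new URLClassLoader(urls, parent));
    }

    public static void main(String[] args) {
        ClassLoader parent = ClassLoaderCache.class.getClassLoader();
        ClassLoader first = forJar("job.jar", new URL[0], parent);
        ClassLoader second = forJar("job.jar", new URL[0], parent);
        // Same jar name -> same cached class loader instance.
        System.out.println(first == second); // prints true
    }
}
```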
Re: Deserializing the InputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@50bb10fd) failed: unread block data after upgrade to 1.4.2
Thanks a lot, that seems to work. What is the default class loader order now? To keep this working in the new version, how should I inject the Hadoop dependencies so that they are read properly? The class that is missing (HadoopInputFormat) is from the hadoop-compatibility library; I have upgraded it to version 1.4.2 like everything else.
Re: Deserializing the InputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@50bb10fd) failed: unread block data after upgrade to 1.4.2
We were jumping from version 1.3.1 (where everything worked fine).
Deserializing the InputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@50bb10fd) failed: unread block data after upgrade to 1.4.2
Hello, we have recently upgraded Flink to version 1.4.2. Now our jobs that rely on Parquet/Avro files located on HDFS have stopped working. I get this exception:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'CHAIN DataSource (READING_RECORDS) -> Map (MAPPING_RECORDS)': Deserializing the InputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@50bb10fd) failed: unread block data
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:168)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@50bb10fd) failed: unread block data
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:66)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:164)
    ... 24 common frames omitted
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2740)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:437)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:424)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:412)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:373)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:63)
    ... 25 common frames omitted

In some other topic I read about a problem with primitives, but I don't know whether that is related to the serialization of HadoopInputFormat. Do you have any info on what could be wrong? We have also upgraded the flink-hadoop-compatibility version on the Flink classpath.
Submiting jobs via UI/Rest API
Hi guys, we were trying to use the UI's "Submit new job" functionality (and later the REST endpoints for it). We found a few problems:

1. When we ran a job that did additional work after the env execution (or after any sink), that code was not executed. E.g. our job calculated some data, wrote it to a temp location in a sink and, when everything succeeded, moved the files to the proper location on HDFS. Running the job via the Java YarnClusterClient API worked fine.

2. We wanted to test a job using the "Show Plan" option, but it seems that running this option for a job that has nothing to run (e.g. the calculated list of input paths was empty) results in killing the container on YARN. I didn't find any suspicious logs in the JobManager:

2018-03-09 14:13:53,979 INFO com.my_job.CustomFlinkJob - All job done :)
2018-03-09 14:13:53,996 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web dashboard root cache directory /tmp/flink-web-1fe30b99-9ad1-4531-b14b-143ea6c3d9ed
2018-03-09 14:13:54,004 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:60727
2018-03-09 14:13:54,007 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web dashboard jar upload directory /tmp/flink-web-8d7b68fc-1ef7-4869-91c1-5bebb370b529

We are using Flink 1.3.1; next week we will play with 1.4.1. Any chance of fixing these bugs in the next versions?
Re: hadoopcompatibility not in dist
Bumping this issue, as I have a similar problem now. We are running Flink on YARN and trying to submit a job via the Java API using YarnClusterClient (the run method with a PackagedProgram). The job starts to execute (we can see it on the Dashboard) but fails with this error:

Caused by: java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2143)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1774)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1716)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:953)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1173)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:886)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:966)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:828)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:622)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:188)
    at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)

When I run the same job from the command line on the machine where Flink is installed, it runs fine (we previously had the same error there, but adding the jar to the ./lib/ directory solved it).
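For context, this is roughly what our build declares for the library the error message refers to (a sketch of our pom entry; the Scala suffix and the version property depend on your build). Note that having it on the job's compile classpath is apparently not enough - it also needs to be on the cluster's classpath, which is why copying the jar into ./lib/ helped on the machine with Flink installed:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```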
Additional data read inside dataset transformations
Hello, I will describe my use case in steps for easier understanding:

1) Currently my job loads data from Parquet files using HadoopInputFormat along with AvroParquetInputFormat:

AvroParquetInputFormat inputFormat = new AvroParquetInputFormat();
AvroParquetInputFormat.setAvroReadSchema(job, schema);
AvroParquetInputFormat.setUnboundRecordFilter(job, recordFilterClass);
HadoopInputFormat hadoopInputFormat =
        HadoopInputs.createHadoopInput(inputFormat, Void.class, GenericRecord.class, job);
return environment.createInput(hadoopInputFormat);

2) The data is loaded into a DataSource and, after various transformations, grouped by my "user_id" key.

3) In a GroupReduceFunction I deal with the values for a given user.

4) For each group in the reduce function I extract the key (which was used for the earlier grouping) and would like to read additional data (Parquet files from HDFS for the specific key extracted before) that is required for further processing of the grouped data.

5) After the processing inside the reduce function, I would like to store the results in Parquet files using the AvroParquetWriter class.

My question is how the additional data loading inside the reduce function (or any other transformation) in step 4 can be achieved. In my perfect scenario I would use HadoopInputFormat (just like for loading the initial data in the first step), however I am missing the environment context there (probably?). Is there any way to achieve this, or is this scenario completely wrong and therefore badly designed?