Re: Reading Data from zip/gzip
Hi Chris,

FileInputFormat automatically takes care of decompression for files with the gzip, xz, bz2 and deflate extensions.

--
Thanks,
Amit

Source:
https://github.com/apache/flink/blob/7b040b915504e59243c642b1f4a84c956d96d134/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L118

    private static void initDefaultInflaterInputStreamFactories() {
        InflaterInputStreamFactory<?>[] defaultFactories = {
            DeflateInflaterInputStreamFactory.getInstance(),
            GzipInflaterInputStreamFactory.getInstance(),
            Bzip2InputStreamFactory.getInstance(),
            XZInputStreamFactory.getInstance(),
        };
        for (InflaterInputStreamFactory<?> inputStreamFactory : defaultFactories) {
            for (String fileExtension : inputStreamFactory.getCommonFileExtensions()) {
                registerInflaterInputStreamFactory(fileExtension, inputStreamFactory);
            }
        }
    }

On Mon, Oct 22, 2018 at 2:03 PM chrisr123 wrote:

> I'm able to read normal txt or csv files using Flink,
> but what would I need to do in order to read them if they
> are given to me in zip or gzip format? Assuming I do not want
> to have to unzip them.
> Thanks!
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
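To make the mechanism in the quoted source easier to picture, here is a minimal sketch of extension-based decompression using only JDK classes. It is not Flink's implementation: the class name ExtensionDecompressor, the StreamFactory interface and the registered extensions are assumptions made for illustration, and only gzip and deflate are covered because the JDK ships no bz2 or xz streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical stand-in for the factory registry shown above; not a Flink class.
final class ExtensionDecompressor {

    // Wraps a raw stream in a decompressing stream.
    interface StreamFactory {
        InputStream create(InputStream in) throws IOException;
    }

    // Maps a file extension (without the dot) to the factory that handles it.
    private static final Map<String, StreamFactory> FACTORIES = new HashMap<>();

    static {
        FACTORIES.put("gz", GZIPInputStream::new);
        FACTORIES.put("gzip", GZIPInputStream::new);
        FACTORIES.put("deflate", InflaterInputStream::new);
    }

    // Picks a factory from the file extension; unknown extensions are read as-is.
    static InputStream open(String fileName, InputStream raw) throws IOException {
        int dot = fileName.lastIndexOf('.');
        String ext = dot < 0 ? "" : fileName.substring(dot + 1).toLowerCase(Locale.ROOT);
        StreamFactory factory = FACTORIES.get(ext);
        return factory == null ? raw : factory.create(raw);
    }

    // Demo helper: gzip-compresses a string in memory.
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return bytes.toByteArray();
    }

    // Reads a stream to the end and decodes it as UTF-8.
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    // Gzips the text, then reads it back through open() under the given file name.
    static String roundTrip(String fileName, String text) {
        try (InputStream in = open(fileName, new ByteArrayInputStream(gzip(text)))) {
            return readAll(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the design is that the caller never names a codec; the file name alone decides which stream wrapper is applied, which is why no extra configuration is needed for the extensions listed in the reply.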
Re: Why am I getting AWS access denied error for request type [DeleteObjectRequest] in S3?
Hi Harshith,

Did you grant delete permission on the S3 bucket to the machines running Flink? Are you using IAM roles, or an access key ID and secret access key pair?

--
Thanks,
Amit

On Mon, Oct 15, 2018 at 3:15 PM Kumar Bolar, Harshith wrote:

> Hi all,
>
> We store Flink checkpoints in Amazon S3. Flink periodically sends out GET,
> PUT, LIST, DELETE requests to S3, to store-clear checkpoints. From the
> logs, we see that GET, PUT and LIST requests are successful but it throws
> an AWS access denied error for DELETE request.
>
> Here’s a snippet of the logs for DELETE request –
>
> 2018-10-15 04:13:22,819 INFO
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency -
> ServiceName=[Amazon S3], AWSErrorCode=[AccessDenied], StatusCode=[403],
> ServiceEndpoint=[https://xxx-xxx-prod.s3.amazonaws.com],
> Exception=[org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> Access Denied (Service: Amazon S3; *Status Code: 403; Error Code:
> AccessDenied;* Request ID: x), S3 Extended Request ID:
> xxx],
> *RequestType=[DeleteObjectRequest]*, AWSRequestID=[XX],
> HttpClientPoolPendingCount=0, RetryCapacityConsumed=0,
> HttpClientPoolAvailableCount=1, RequestCount=1, Exception=1,
> HttpClientPoolLeasedCount=0, ClientExecuteTime=[4.984],
> HttpClientSendRequestTime=[0.029], HttpRequestTime=[4.84],
> RequestSigningTime=[0.038], CredentialsRequestTime=[0.0, 0.0],
> HttpClientReceiveResponseTime=[4.78]
>
> Is there some configuration that we’re forgetting that is preventing Flink
> from sending DELETE requests to S3?
>
> I’d be happy to provide more information if needed.
>
> Thanks,
> Harshith
Re: Flink1.6 Confirm that flink supports the plan function?
Hi,

Regarding 2): you may also want to look at the ParameterTool class [1], which can parse program arguments and read a properties file [2].

--
Thanks,
Amit

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/utils/ParameterTool.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/best_practices.html#getting-your-configuration-values-into-the-parametertool

On Mon, Oct 15, 2018 at 1:28 PM Till Rohrmann wrote:

> Hi,
>
> 1) you currently cannot merge multiple jobs into one after they have been
> submitted. What you can do though, is to combine multiple jobs in your
> Flink program before you submit it.
>
> 2) you can pass program arguments when you submit your job. After it
> has been submitted, it is no longer possible to change the command line
> arguments.
>
> Cheers,
> Till
>
> On Mon, Oct 15, 2018 at 9:11 AM wangziyu <2375900...@qq.com> wrote:
>
>> Dear friends,
>>
>> I have been learning Flink for 20 days and would like to ask for help
>> with two problems:
>>
>> 1. If I have several jobs, how can I merge them into one so that they
>> are easier to manage? I looked at the REST API at
>> "https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html"
>> and saw "/jars/:jarid/plan", which says "Returns the dataflow plan of a
>> job contained in a jar previously uploaded via '/jars/upload'." I don't
>> think that is what I need.
>>
>> 2. When I run a job, I need to pass in several parameters, for example
>> "./flink run -d -c streaming.Kafka010NumCountConsumer
>> /ziyu/flink/kafkaFlink-1.0-SNAPSHOT.jar h1 /ziyu/h1.txt". If I know the
>> JobId, can I retrieve the parameters that were passed to the job from
>> Java? I think there should be some interface for this, but I can't find
>> it.
>>
>> That is all. Any information would help. Thanks so much.
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Flink streaming-job with redis sink: SocketTimeoutException
Hi Marke,

The stack trace suggests this is a Redis connection issue rather than a problem in Flink. Could you share the JedisPool configuration of the Redis sink? Are you writing to Redis continuously or in bulk? It looks like reads on the Redis connections are timing out.

--
Thanks,
Amit

On Mon, Oct 15, 2018 at 12:19 PM Marke Builder wrote:

> Hi,
>
> what can be the reasons for the following exceptions.
> We are using flink with a redis sink, but from time to time the flink job
> failed with the following exceptions.
>
> Thanks, Builder.
>
> 10/13/2018 15:37:48 Flat Map -> (Sink: Unnamed, Sink: Unnamed)(9/10)
> switched to FAILED
> redis.clients.jedis.exceptions.JedisConnectionException:
> java.net.SocketTimeoutException: Read timed out
>     at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:201)
>     at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
>     at redis.clients.jedis.Protocol.process(Protocol.java:141)
>     at redis.clients.jedis.Protocol.read(Protocol.java:205)
>     at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:297)
>     at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:196)
>     at redis.clients.jedis.Jedis.set(Jedis.java:69)
>     at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.set(RedisContainer.java:178)
>     at org.apache.flink.streaming.connectors.redis.RedisSink.invoke(RedisSink.java:143)
>     at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:27)
>     at com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:12)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketTimeoutException: Read timed out
>     at java.net.SocketInputStream.socketRead0(Native Method)
>     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>     at java.net.SocketInputStream.read(SocketInputStream.java:171)
>     at java.net.SocketInputStream.read(SocketInputStream.java:141)
>     at java.net.SocketInputStream.read(SocketInputStream.java:127)
>     at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:195)
>     ... 26 more
Re: DatabaseClient at async example
Hi Nicos,

DatabaseClient is only a placeholder class used to illustrate the async I/O concept. There is no such interface or class in the Flink codebase. You can use any MariaDB client implementation that supports concurrent requests to the database.

--
Cheers,
Amit

On Wed, Oct 3, 2018 at 8:14 PM Nicos Maris wrote:

> Hello,
>
> I have a short question about the following example in your documentation.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/asyncio.html
>
> Which is the package and the maven dependency of the class DatabaseClient?
>
> I am building a Proof of Concept based on the above documentation and
> mariaDB.
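Since the class has to be supplied by the user, the following is a minimal sketch of what such a client could look like, using only the JDK. Everything in it is an assumption for illustration: the class shape, the query method, the pool size of 4 and the simulated lookup are all made up, and a real version would issue a query through a MariaDB driver instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the DatabaseClient named in the docs example; not a Flink class.
final class DatabaseClient implements AutoCloseable {

    // A small thread pool lets several lookups be in flight at the same time.
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Returns immediately; the future completes once the lookup has finished.
    CompletableFuture<String> query(String key) {
        return CompletableFuture.supplyAsync(() -> lookup(key), pool);
    }

    // Placeholder for a blocking call, such as a JDBC query against MariaDB.
    private String lookup(String key) {
        return "value-for-" + key;
    }

    // Stops accepting new lookups and lets the pool threads exit.
    @Override
    public void close() {
        pool.shutdown();
    }
}
```

The property that matters for async I/O is that query() does not block the calling thread. In a Flink async function, a callback registered on the returned future would hand the result to Flink once it arrives.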
Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException
Hi Julio, What's the Flink version for this setup? -- Thanks, Amit On Wed, Oct 3, 2018 at 4:22 PM Andrey Zagrebin wrote: > Hi Julio, > > Looks like some problem with dependencies. > Have you followed the recommended s3 configuration guide [1]? > Is it correct that your job already created checkpoints/savepoints on s3 > before? > > I think if you manually create file system using FileSystem.get(path), it > should be configured the same way as for bucketing sink and checkpoints. > > Best, > Andrey > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/aws.html#s3-simple-storage-service > > On 2 Oct 2018, at 15:21, Julio Biason wrote: > > Hey guys, > > I've setup a BucketingSink as a dead letter queue into our Ceph cluster using > S3, but when I start the job, I get this error: > > > java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2306) > at > org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2271) > at > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367) > at > org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1295) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassNotFoundException: > com.amazonaws.AmazonClientException > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 17 more > > I find it weird 'cause I've already set up checkpoints (and savepoitns) to > use S3 as protocol, and I just assume that, if it works for checkpoints, it > should work here. > > (I suppose I could add the aws client as a dependency of my build but, > again, I assumed that once S3 works for checkpoints, it should work > everywhere.) > > And kinda related, can I assume that using the FileSystem class to create > FSOutputStreams will follow Flink configuration? I have another type of > dead letter queue that won't work with BucketingSink and I was thinking > about using it directly to create files inside that Ceph/S3. > > -- > *Julio Biason*, Sofware Engineer > *AZION* | Deliver. Accelerate. Protect. > Office: +55 51 3083 8101 | Mobile: +55 51 > *99907 0554* > > >
Re: Exception while submitting jobs through Yarn
Hi Garvit,

I think Till is interested in the classpath details printed at the start of the JM and TM logs. For example, the following log lines show the classpath used by the TM in our case.

2018-06-17 19:01:30,656 INFO org.apache.flink.yarn.YarnTaskExecutorRunner -
2018-06-17 19:01:30,658 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - Starting YARN TaskExecutor runner (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
2018-06-17 19:01:30,659 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - OS current user: yarn
2018-06-17 19:01:31,662 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - Current Hadoop/Kerberos user: hadoop
2018-06-17 19:01:31,663 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b10
2018-06-17 19:01:31,663 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - Maximum heap size: 6647 MiBytes
2018-06-17 19:01:31,663 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - JAVA_HOME: /usr/lib/jvm/java-openjdk
2018-06-17 19:01:31,664 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - Hadoop version: 2.8.3
2018-06-17 19:01:31,664 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - JVM Options:
2018-06-17 19:01:31,665 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - -Xms6936m
2018-06-17 19:01:31,665 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - -Xmx6936m
2018-06-17 19:01:31,665 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - -XX:MaxDirectMemorySize=4072m
2018-06-17 19:01:31,665 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - -Dlog.file=/var/log/hadoop-yarn/containers/application_1528342246614_0002/container_1528342246614_0002_01_282649/taskmanager.log
2018-06-17 19:01:31,665 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - -Dlogback.configurationFile=file:./logback.xml
2018-06-17 19:01:31,665 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - -Dlog4j.configuration=file:./log4j.properties
2018-06-17 19:01:31,665 INFO org.apache.flink.yarn.YarnTaskExecutorRunner -
Program Arguments: 2018-06-17 19:01:31,665 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - --configDir 2018-06-17 19:01:31,665 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - . *2018-06-17 19:01:31,666 INFO org.apache.flink.yarn.YarnTaskExecutorRunner - Classpath: lib/flink-dist_2.11-1.5.0.jar:lib/flink-python_2.11-1.5.0.jar:lib/flink-shaded-hadoop2-uber-1.5.0.jar:lib/flink-shaded-include-yarn-0.9.1.jar:lib/guava-18.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.7.jar:log4j.properties:logback.xml:flink.jar:flink-conf.yaml::/etc/hadoop/conf:/usr/lib/hadoop/hadoop-common-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-archive-logs.jar:/usr/lib/hadoop/hadoop-auth.jar:/usr/lib/hadoop/hadoop-archives-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-archive-logs-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-azure-datalake-2.8.3-amzn-0.jar.* -- Thanks, Amit On Mon, Jun 18, 2018 at 2:00 PM, Garvit Sharma wrote: > Hi, > > Please refer to my previous mail for complete logs. > > Thanks, > > On Mon, Jun 18, 2018 at 1:17 PM Till Rohrmann > wrote: > >> Could you also please share the complete log file with us. >> >> Cheers, >> Till >> >> On Sat, Jun 16, 2018 at 5:22 PM Ted Yu wrote: >> >>> The error for core-default.xml is interesting. >>> >>> Flink doesn't have this file. Probably it came with Yarn. Please check >>> the hadoop version Flink was built with versus the hadoop version in your >>> cluster. >>> >>> Thanks >>> >>> Original message >>> From: Garvit Sharma >>> Date: 6/16/18 7:23 AM (GMT-08:00) >>> To: trohrm...@apache.org >>> Cc: Chesnay Schepler , user@flink.apache.org >>> Subject: Re: Exception while submitting jobs through Yarn >>> >>> I am not able to figure out, got stuck badly in this since last 1 week. >>> Any little help would be appreciated. >>> >>> >>> 2018-06-16 19:25:10,523 DEBUG org.apache.flink.streaming.api.graph. >>> StreamingJobGraphGenerator - Parallelism set: 1 for 8 >>> >>> 2018-06-16 19:25:10,578 DEBUG org.apache.flink.streaming.api.graph. 
>>> StreamingJobGraphGenerator - Parallelism set: 1 for 1 >>> >>> 2018-06-16 19:25:10,588 DEBUG org.apache.flink.streaming.api.graph. >>> StreamingJobGraphGenerator - CONNECTED: KeyGroupStreamPartitioner - 1 >>> -> 8 >>> >>> 2018-06-16 19:25:10,591 DEBUG org.apache.flink.streaming.api.graph. >>> StreamingJobGraphGenerator - Parallelism set: 1 for 5 >>> >>> 2018-06-16 19:25:10,597 DEBUG org.apache.flink.streaming.api.graph. >>> StreamingJobGraphGenerator - CONNECTED: KeyGroupStreamPartitioner - 5 >>> -> 8 >>> >>> 2018-06-1
Re: Implementing a “join” between a DataStream and a “set of rules”
Hi Sandybayev,

Flink currently does not provide a built-in solution for this use case. However, there is an open FLIP [1] [2] that was created to address it.

As far as I can see, your current approach cannot update the rule set data. I think you can update it by building a DataStream from a changelog that is stored in a message queue or a distributed file system. Alternatively, you can store the rule set in an external system and query it from Flink for incoming keys.

[1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
[2]: https://issues.apache.org/jira/browse/FLINK-6131

On Tue, Jun 5, 2018 at 5:52 PM, Sandybayev, Turar (CAI - Atlanta) wrote:

> Hi,
>
> What is the best practice recommendation for the following use case? We need
> to match a stream against a set of “rules”, which are essentially a Flink
> DataSet concept. Updates to this “rules set" are possible but not frequent.
> Each stream event must be checked against all the records in “rules set”,
> and each match produces one or more events into a sink. Number of records in
> a rule set are in the 6 digit range.
>
> Currently we're simply loading rules into a local List of rules and using
> flatMap over an incoming DataStream. Inside flatMap, we're just iterating
> over a list comparing each event to each rule.
>
> To speed up the iteration, we can also split the list into several batches,
> essentially creating a list of lists, and creating a separate thread to
> iterate over each sub-list (using Futures in either Java or Scala).
>
> Questions:
>
> 1. Is there a better way to do this kind of a join?
>
> 2. If not, is it safe to add additional parallelism by creating
> new threads inside each flatMap operation, on top of what Flink is already
> doing?
>
> Thanks in advance!
>
> Turar
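The changelog idea from the reply can be sketched without any Flink classes. The following is only an illustration under stated assumptions: UpdatableRuleSet, applyChange and match are hypothetical names, rules are modelled as predicates over String events, and a null predicate is used as the delete marker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory rule set kept up to date from a stream of change records.
final class UpdatableRuleSet {

    // Current rules by id; insertion order is kept so match results are stable.
    private final Map<String, Predicate<String>> rules = new LinkedHashMap<>();

    // Applies one changelog record: a non-null predicate adds or replaces the rule,
    // a null predicate deletes it.
    void applyChange(String ruleId, Predicate<String> predicate) {
        if (predicate == null) {
            rules.remove(ruleId);
        } else {
            rules.put(ruleId, predicate);
        }
    }

    // Checks one event against every current rule and returns the ids that match.
    List<String> match(String event) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Predicate<String>> rule : rules.entrySet()) {
            if (rule.getValue().test(event)) {
                matched.add(rule.getKey());
            }
        }
        return matched;
    }
}
```

In a Flink job, one plausible wiring (an assumption, not something stated in the thread) is to connect the rule changelog stream with the event stream, call applyChange for records from the first input and match for records from the second. Each parallel instance would then need to see every rule change, for example by broadcasting the changelog stream.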
Re: Flink 1.5, failed to instantiate S3 FS
Hi Hao,

Have a look at https://issues.apache.org/jira/browse/HADOOP-13811?focusedCommentId=15703276&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15703276

What version of Hadoop are you using? Could you also provide the classpath used by the Flink JobManager? It is printed in the jobmanager.log file.

--
Cheers,
Amit
Re: Batch job stuck in Canceled state in Flink 1.5
Thanks Till. `taskmanager.network.request-backoff.max` option helped in my case. We tried this on 1.5.0 and jobs are running fine. -- Thanks Amit On Thu 24 May, 2018, 4:58 PM Amit Jain, wrote: > Thanks! Till. I'll give a try on your suggestions and update the thread. > > On Wed, May 23, 2018 at 4:43 AM, Till Rohrmann > wrote: > > Hi Amit, > > > > it looks as if the current cancellation cause is not the same as the > > initially reported cancellation cause. In the current case, it looks as > if > > the deployment of your tasks takes so long that that maximum > > `taskmanager.network.request-backoff.max` value has been reached. When > this > > happens a task gives up asking for the input result partition and fails > with > > the `PartitionNotFoundException`. > > > > More concretely, the `CHAIN Reduce (GroupReduce at > > first(SortedGrouping.java:210)) -> Map (Key Extractor) (2/14)` cannot > > retrieve the result partition of the `CHAIN DataSource (at > > createInput(ExecutionEnvironment.java:548) > > (org.apache.flink.api.java.io.TextInputFormat)) -> Map (Data Source > > org.apache.flink.api.java.io.TextInputFormat > > > [s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/14/, > > > s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/15/0/]) > > -> Map (Key Extractor) -> Combine (GroupReduce at > > first(SortedGrouping.java:210)) (8/14)` task. This tasks is in the state > > deploying when the exception occurs. It seems to me that this task takes > > quite some time to be deployed. > > > > One reason why the deployment could take some time is that an UDF (for > > example the closure) of one of the operators is quite large. If this is > the > > case, then Flink offloads the corresponding data onto the BlobServer from > > where they are retrieved by the TaskManagers. Since you are running in > > non-HA mode, the BlobServer won't store the blobs on HDFS from where they > > could be retrieved. 
Instead all the TaskManagers ask the single > BlobServer > > for the required TDD blobs. Depending on the size of the TDDs, the > > BlobServer might become the bottleneck. > > > > What you can try to do is the following > > 1) Try to reduce the closure object of the UDFs in the above-mentioned > task. > > 2) Increase `taskmanager.network.request-backoff.max` to give the system > > more time to download the blobs > > 3) Run the cluster in HA mode which will store the blobs also under > > `high-availability.storageDir` (usually HDFS or S3). Before downloading > the > > blobs from the BlobServer, Flink will first try to download them from the > > `high-availability-storageDir` > > > > Let me know if this solves your problem. > > > > Cheers, > > Till > > > > On Tue, May 22, 2018 at 1:29 PM, Amit Jain wrote: > >> > >> Hi Nico, > >> > >> Please find the attachment for more logs. > >> > >> -- > >> Thanks, > >> Amit > >> > >> On Tue, May 22, 2018 at 4:09 PM, Nico Kruber > >> wrote: > >> > Hi Amit, > >> > thanks for providing the logs, I'll look into it. We currently have a > >> > suspicion of this being caused by > >> > https://issues.apache.org/jira/browse/FLINK-9406 which we found by > >> > looking over the surrounding code. The RC4 has been cancelled since we > >> > see this as a release blocker. > >> > > >> > To rule out further errors, can you also provide logs for the task > >> > manager producing partitions d6946b39439f10e8189322becf1b8887, > >> > 9aa35dd53561f9220b7b1bad84309d5f and 60909dec9134eb980e18064358dc2c81? > >> > The task manager log you provided covers the task manager asking for > >> > this partition only for which the job manager produces the > >> > PartitionProducerDisposedException that you see. 
> >> > I'm looking for the logs of task managers with the following execution > >> > IDs in their logs: > >> > - 2826f9d430e05e9defaa46f60292fa79 > >> > - 7ef992a067881a07409819e3aea32004 > >> > - ec923ce6d891d89cf6fecb5e3f5b7cc5 > >> > > >> > Regarding the operators being stuck: I'll have a further look into the > >> > logs and state transition and will come back to you. > >> > > >> > > >> > Nico > >> > > >> > > >> > On 21/05/18 09:27, Amit Jain wrote: > >> >> Hi All, >
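For reference, suggestions 2) and 3) from Till's message correspond to entries in flink-conf.yaml. The fragment below is a sketch only: the option names come from the thread or from the Flink configuration documentation, while the timeout value, the ZooKeeper quorum and the storage path are illustrative placeholders and not recommendations.

```yaml
# 2) Give tasks more time to find their input partitions (milliseconds; 30000 is an arbitrary example).
taskmanager.network.request-backoff.max: 30000

# 3) Run in HA mode so blobs are also written to durable storage.
# The quorum address and the directory below are hypothetical placeholders.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host:2181
high-availability.storageDir: hdfs:///flink/ha/
```

According to the reply at the top of this thread, raising the backoff maximum alone was enough for the jobs to run on 1.5.0.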
Re: Batch job stuck in Canceled state in Flink 1.5
Thanks! Till. I'll give a try on your suggestions and update the thread. On Wed, May 23, 2018 at 4:43 AM, Till Rohrmann wrote: > Hi Amit, > > it looks as if the current cancellation cause is not the same as the > initially reported cancellation cause. In the current case, it looks as if > the deployment of your tasks takes so long that that maximum > `taskmanager.network.request-backoff.max` value has been reached. When this > happens a task gives up asking for the input result partition and fails with > the `PartitionNotFoundException`. > > More concretely, the `CHAIN Reduce (GroupReduce at > first(SortedGrouping.java:210)) -> Map (Key Extractor) (2/14)` cannot > retrieve the result partition of the `CHAIN DataSource (at > createInput(ExecutionEnvironment.java:548) > (org.apache.flink.api.java.io.TextInputFormat)) -> Map (Data Source > org.apache.flink.api.java.io.TextInputFormat > [s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/14/, > s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/15/0/]) > -> Map (Key Extractor) -> Combine (GroupReduce at > first(SortedGrouping.java:210)) (8/14)` task. This tasks is in the state > deploying when the exception occurs. It seems to me that this task takes > quite some time to be deployed. > > One reason why the deployment could take some time is that an UDF (for > example the closure) of one of the operators is quite large. If this is the > case, then Flink offloads the corresponding data onto the BlobServer from > where they are retrieved by the TaskManagers. Since you are running in > non-HA mode, the BlobServer won't store the blobs on HDFS from where they > could be retrieved. Instead all the TaskManagers ask the single BlobServer > for the required TDD blobs. Depending on the size of the TDDs, the > BlobServer might become the bottleneck. > > What you can try to do is the following > 1) Try to reduce the closure object of the UDFs in the above-mentioned task. 
> 2) Increase `taskmanager.network.request-backoff.max` to give the system > more time to download the blobs > 3) Run the cluster in HA mode which will store the blobs also under > `high-availability.storageDir` (usually HDFS or S3). Before downloading the > blobs from the BlobServer, Flink will first try to download them from the > `high-availability-storageDir` > > Let me know if this solves your problem. > > Cheers, > Till > > On Tue, May 22, 2018 at 1:29 PM, Amit Jain wrote: >> >> Hi Nico, >> >> Please find the attachment for more logs. >> >> -- >> Thanks, >> Amit >> >> On Tue, May 22, 2018 at 4:09 PM, Nico Kruber >> wrote: >> > Hi Amit, >> > thanks for providing the logs, I'll look into it. We currently have a >> > suspicion of this being caused by >> > https://issues.apache.org/jira/browse/FLINK-9406 which we found by >> > looking over the surrounding code. The RC4 has been cancelled since we >> > see this as a release blocker. >> > >> > To rule out further errors, can you also provide logs for the task >> > manager producing partitions d6946b39439f10e8189322becf1b8887, >> > 9aa35dd53561f9220b7b1bad84309d5f and 60909dec9134eb980e18064358dc2c81? >> > The task manager log you provided covers the task manager asking for >> > this partition only for which the job manager produces the >> > PartitionProducerDisposedException that you see. >> > I'm looking for the logs of task managers with the following execution >> > IDs in their logs: >> > - 2826f9d430e05e9defaa46f60292fa79 >> > - 7ef992a067881a07409819e3aea32004 >> > - ec923ce6d891d89cf6fecb5e3f5b7cc5 >> > >> > Regarding the operators being stuck: I'll have a further look into the >> > logs and state transition and will come back to you. >> > >> > >> > Nico >> > >> > >> > On 21/05/18 09:27, Amit Jain wrote: >> >> Hi All, >> >> >> >> I totally missed this thread. I've encountered same issue in Flink >> >> 1.5.0 RC4. Please look over the attached logs of JM and impacted TM. 
>> >> >> >> Job ID 390a96eaae733f8e2f12fc6c49b26b8b >> >> >> >> -- >> >> Thanks, >> >> Amit >> >> >> >> On Thu, May 3, 2018 at 8:31 PM, Nico Kruber >> >> wrote: >> >>> Also, please have a look at the other TaskManagers' logs, in >> >>> particular >> >>> the one that is running the operator that was mentioned in the >> >>> excep
Re: Strange Behaviour with task manager oom ?
Hi,

Could you share the logs of the job and of the impacted task manager? How much memory have you allocated to the Job Manager?

--
Thanks,
Amit

On Mon, May 21, 2018 at 8:46 PM, sohimankotia wrote:

> Hi,
>
> I am running a Flink batch job.
>
> My job runs fine if I use 4 task managers with 8 slots each = 32 parallelism,
> with 6GB memory per task manager.
>
> As soon as I increase the task managers to 5 with 6 slots per task manager = 30
> parallelism (6GB memory per task manager),
>
> I get an OOM error. I am not able to understand this strange behaviour.
>
> Container: container_e60_1526906661225_0213_01_01 on
> dh-cdh-m1d5.dbp.host1.in_8041
>
> LogType:jobmanager.err
> Log Upload Time:Mon May 21 19:48:12 +0530 2018
> LogLength:3247
> Log Contents:
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/mnt/vol9/yarn/nm/usercache/hdfs/appcache/application_1526906661225_0213/filecache/13/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/mnt/vol7/yarn/nm/usercache/hdfs/appcache/application_1526906661225_0213/filecache/11/process-runners-0.0.2-SNAPSHOT-shaded.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-5] > shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for > ActorSystem[flink] > java.lang.OutOfMemoryError: Java heap space > at java.util.Arrays.copyOf(Arrays.java:3236) > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) > at > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) > at > akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) > at > akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) > at > akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) > at > akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875) > at > akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874) > at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769) > at > akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744) > at akka.actor.Actor$class.aroundReceive(Actor.scala:467) > at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437) > at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > LogType:jobmanager.log > > > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Checkpointing when reading from files?
Hi Alex, StreamExecutionEnvironment#readFile is a helper function that creates a file-reading data stream source. It uses ContinuousFileMonitoringFunction and ContinuousFileReaderOperator internally. Since both the monitoring function and the reader operator take part in checkpointing, so does readFile [1], and you can go with the first approach. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readFile-org.apache.flink.api.common.io.FileInputFormat-java.lang.String-org.apache.flink.streaming.api.functions.source.FileProcessingMode-long-org.apache.flink.api.common.typeinfo.TypeInformation- -- Thanks, Amit On Mon, May 21, 2018 at 9:39 PM, NEKRASSOV, ALEXEI wrote: > I want to add checkpointing to my program that reads from a set of files in > a directory. Without checkpointing I use readFile(): > > > > DataStream text = env.readFile( > >new TextInputFormat(new Path(inputPath)), > >inputPath, > > inputProcessingMode, > > 1000); > > > > Should I use ContinuousFileMonitoringFunction / ContinuousFileReaderOperator > to add checkpointing? Or is there an easier way? > > > > How do I go from splits (that ContinuousFileMonitoringFunction provides) to > actual strings? I’m not clear how ContinuousFileReaderOperator can be used. > > > > DataStreamSource split = > env.addSource( > >new ContinuousFileMonitoringFunction( > > new TextInputFormat(new > Path(inputPath)), > > inputProcessingMode, > > 1, > > 1000) > > ); > > > > Thanks, > Alex
Re: Message guarantees with S3 Sink
Thanks, Gary! Agreed, there are issues with updates in S3. You may want to look at the EMRFS consistent view guarantees [1]. I'm not sure whether that is available outside EMR. I'm creating a JIRA issue about the possibility of data loss with S3. IMHO, the Flink docs should mention that data loss is possible with S3. [1] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html -- Thanks, Amit On Fri, May 18, 2018 at 2:48 AM, Gary Yao wrote: > Hi Amit, > > The BucketingSink doesn't have well defined semantics when used with S3. > Data > loss is possible but I am not sure whether it is the only problem. There are > plans to rewrite the BucketingSink in Flink 1.6 to enable eventually > consistent > file systems [1][2]. > > Best, > Gary > > > [1] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sink-with-BucketingSink-to-S3-files-override-td18433.html > [2] https://issues.apache.org/jira/browse/FLINK-6306 > > On Thu, May 17, 2018 at 11:57 AM, Amit Jain wrote: >> >> Hi, >> >> We are using Flink to process click stream data from Kafka and pushing >> the same in 128MB file in S3. >> >> What is the message processing guarantees with S3 sink? In my >> understanding, S3A client buffers the data on memory/disk. In failure >> scenario on particular node, TM would not trigger Writer#close hence >> buffered data can lose entirely assuming this buffer contains data of >> last successful checkpointing. >> >> -- >> Thanks, >> Amit > >
Re: Message guarantees with S3 Sink
Hi Rong, We are using BucketingSink only. I'm asking about the case where the TM does not get the chance to call Writer#flush, for example when YARN kills the TM because of an OOM. We have set fs.s3.impl to com.amazon.ws.emr.hadoop.fs.EmrFileSystem in core-site.xml, so BucketingSink uses the S3 client internally. When we write data with the S3A client, it buffers the data in memory or on disk until it reaches the multipart size or close() is called on the OutputStream. Now suppose the S3A client has buffered 40MB of data on the TM's local disk when a checkpoint barrier arrives at the sink and the checkpoint completes successfully. The sink resumes writing, the buffer grows to 60MB, and then YARN kills the TM. What happens to the original 40MB of data? -- Thanks, Amit On Thu, May 17, 2018 at 10:28 PM, Rong Rong wrote: > Hi Amit, > > Can you elaborate how you write using "S3 sink" and which version of Flink > you are using? > > If you are using BucketingSink[1], you can checkout the API doc and > configure to flush before closing your sink. > This way your sink is "integrated with the checkpointing mechanism to > provide exactly once semantics"[2] > > Thanks, > Rong > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html > [2] > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html > > On Thu, May 17, 2018 at 2:57 AM, Amit Jain wrote: >> >> Hi, >> >> We are using Flink to process click stream data from Kafka and pushing >> the same in 128MB file in S3. >> >> What is the message processing guarantees with S3 sink? In my >> understanding, S3A client buffers the data on memory/disk. In failure >> scenario on particular node, TM would not trigger Writer#close hence >> buffered data can lose entirely assuming this buffer contains data of >> last successful checkpointing. >> >> -- >> Thanks, >> Amit > >
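The 40MB/60MB scenario in the reply above can be played through with a toy model. This is only a sketch of the question being asked, not a description of how BucketingSink, EMRFS or the S3A client actually behave: the class BufferedUploadSketch, its methods, and the 128MB upload threshold are all hypothetical, and the "not replayed" figure assumes the source resumes from the position recorded in the last completed checkpoint.

```java
class BufferedUploadSketch {

    // Hypothetical size at which buffered data is pushed to the remote store.
    private final int uploadThresholdMb;
    private int bufferedMb = 0; // data held only in the TM's local memory/disk
    private int durableMb = 0;  // data that has reached the remote store

    BufferedUploadSketch(int uploadThresholdMb) {
        this.uploadThresholdMb = uploadThresholdMb;
    }

    // Appends data to the local buffer and uploads once the threshold is reached.
    void write(int mb) {
        bufferedMb += mb;
        if (bufferedMb >= uploadThresholdMb) {
            upload();
        }
    }

    // A clean close uploads whatever is still buffered.
    void close() {
        upload();
    }

    // Simulates the process being killed: the local buffer is gone.
    // Returns how many MB were lost.
    int kill() {
        int lost = bufferedMb;
        bufferedMb = 0;
        return lost;
    }

    int durableMb() {
        return durableMb;
    }

    private void upload() {
        durableMb += bufferedMb;
        bufferedMb = 0;
    }

    public static void main(String[] args) {
        BufferedUploadSketch sink = new BufferedUploadSketch(128);
        sink.write(40);
        int checkpointedMb = 40; // a checkpoint completes while 40MB sit in the buffer
        sink.write(20);          // writing resumes, buffer grows to 60MB
        int lost = sink.kill();  // the TM is killed before close()
        // Data written before the checkpoint is not re-read after recovery (assumption).
        int notReplayed = Math.min(lost, checkpointedMb);
        System.out.println("durable=" + sink.durableMb() + "MB lost=" + lost
                + "MB notReplayed=" + notReplayed + "MB");
    }
}
```

In this model the checkpoint says nothing about whether the buffered bytes are durable, which is the gap the question points at; a sink would have to make the buffered data durable before the checkpoint is acknowledged to close it.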
Message guarantees with S3 Sink
Hi, We are using Flink to process clickstream data from Kafka and write it to S3 in 128MB files. What are the message processing guarantees with an S3 sink? In my understanding, the S3A client buffers the data in memory or on disk. If a node fails, the TM would not trigger Writer#close, so the buffered data can be lost entirely, assuming the buffer contains data from the last successful checkpoint. -- Thanks, Amit
Re: [Flink 1.5.0] BlobServer data for a job is not getting cleaned up at JM
Thanks Chesnay for the quick reply. I raised the ticket https://issues.apache.org/jira/browse/FLINK-9381. On Wed, May 16, 2018 at 5:33 PM, Chesnay Schepler wrote: > Please open a JIRA. > > > On 16.05.2018 13:58, Amit Jain wrote: >> >> Hi, >> >> We are running Flink 1.5.0 rc3 with YARN as cluster manager and found >> Job Manager is getting killed due to out of disk error. >> >> Upon further analysis, we found blob server data for a job is not >> getting cleaned up. Right now, we wrote directory cleanup script based >> on directory creation time of job_. >> >> Please suggest good way to deal with this problem. Can I raise a JIRA >> ticket for same? >> >> -- >> Thanks, >> Amit >> >
[Flink 1.5.0] BlobServer data for a job is not getting cleaned up at JM
Hi, We are running Flink 1.5.0 rc3 with YARN as the cluster manager and found that the Job Manager is getting killed due to an out-of-disk error. Upon further analysis, we found that the blob server data for a job is not getting cleaned up. For now, we have written a directory cleanup script based on the creation time of the job_ directories. Please suggest a good way to deal with this problem. Can I raise a JIRA ticket for it? -- Thanks, Amit
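The cleanup script mentioned above was not posted, so the following is only a guess at its shape: a minimal sketch that deletes directories whose names start with job_ once their creation time is older than a given age. The class BlobDirCleanupSketch, the method names, and the idea of passing the current time as a parameter are all hypothetical, and on some platforms the reported creation time falls back to the last-modified time.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BlobDirCleanupSketch {

    // Deletes every "job_*" directory directly under blobRoot whose creation
    // time is more than maxAge before 'now'. Returns the deleted directories.
    static List<Path> deleteStaleJobDirs(Path blobRoot, Duration maxAge, Instant now)
            throws IOException {
        List<Path> deleted = new ArrayList<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(blobRoot, "job_*")) {
            for (Path dir : dirs) {
                if (!Files.isDirectory(dir)) {
                    continue;
                }
                Instant created = Files.readAttributes(dir, BasicFileAttributes.class)
                        .creationTime().toInstant();
                if (Duration.between(created, now).compareTo(maxAge) > 0) {
                    deleteRecursively(dir);
                    deleted.add(dir);
                }
            }
        }
        return deleted;
    }

    // Removes a directory tree, children before parents.
    static void deleteRecursively(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo on a throwaway directory; a real blob directory path is an assumption
        // the caller would have to supply.
        Path root = Files.createTempDirectory("blob-demo");
        Files.createDirectory(root.resolve("job_example"));
        // Pretend two days have passed so the fresh directory counts as stale.
        Instant later = Instant.now().plus(Duration.ofDays(2));
        System.out.println(deleteStaleJobDirs(root, Duration.ofDays(1), later));
    }
}
```

An age-based rule like this cannot tell a finished job from a long-running one, so anything along these lines would need an age limit longer than the longest job, or a check against the list of running jobs.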
Re: Question about datasource replication
Hi Flavio, Which version of Flink are you using? -- Thanks, Amit On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier wrote: > Hi all, > I've a Flink batch job that reads a parquet dataset and then applies 2 > flatMap to it (see pseudocode below). > The problem is that this dataset is quite big and Flink duplicates it before > sending the data to these 2 operators (I've guessed this from the doubling > amount of sent bytes) . > Is there a way to avoid this behaviour? > > --- > Here's the pseudo code of my job: > > DataSet X = readParquetDir(); > X1 = X.flatMap(...); > X2 = X.flatMap(...); > > Best, > Flavio
Re: Batch job stuck in Canceled state in Flink 1.5
Hi Stephan, The JM log file is 122 MB. Could you suggest another way to share it? We can use Google Drive if that's fine with you. -- Thanks, Amit On Thu, May 3, 2018 at 12:58 PM, Stephan Ewen wrote: > Hi Amit! > > Thanks for sharing this, this looks like a regression with the network stack > changes. > > The log you shared from the TaskManager gives some hint, but that exception > alone should not be a problem. That exception can occur under a race between > deployment of some tasks while the whole job is entering a recovery phase > (maybe we should not print it so prominently to not confuse users). There > must be something else happening on the JobManager. Can you share the JM > logs as well? > > Thanks a lot, > Stephan > > > On Wed, May 2, 2018 at 12:21 PM, Amit Jain wrote: >> >> Thanks! Fabian >> >> I will try using the current release-1.5 branch and update this thread. >> >> -- >> Thanks, >> Amit >> >> On Wed, May 2, 2018 at 3:42 PM, Fabian Hueske wrote: >> > Hi Amit, >> > >> > We recently fixed a bug in the network stack that affected batch jobs >> > (FLINK-9144). >> > The fix was added after your commit. >> > >> > Do you have a chance to build the current release-1.5 branch and check >> > if >> > the fix also resolves your problem? >> > >> > Otherwise it would be great if you could open a blocker issue for the >> > 1.5 >> > release to ensure that this is fixed. >> > >> > Thanks, >> > Fabian >> > >> > 2018-04-29 18:30 GMT+02:00 Amit Jain : >> >> >> >> Cluster is running on commit 2af481a >> >> >> >> On Sun, Apr 29, 2018 at 9:59 PM, Amit Jain wrote: >> >> > Hi, >> >> > >> >> > We are running numbers of batch jobs in Flink 1.5 cluster and few of >> >> > those >> >> > are getting stuck at random. These jobs having the following failure >> >> > after >> >> > which operator status changes to CANCELED and stuck to same.
>> >> > >> >> > Please find complete TM's log at >> >> > https://gist.github.com/imamitjain/066d0e0ee24f2da1ddc83eba2012 >> >> > >> >> > >> >> > 2018-04-29 14:57:24,437 INFO >> >> > org.apache.flink.runtime.taskmanager.Task >> >> > - Producer 98f5976716234236dc69fb0e82a0cc34 of partition >> >> > 5b8dd5ac2df14266080a8e049defbe38 disposed. Cancelling execution. >> >> > >> >> > org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException: >> >> > Execution 98f5976716234236dc69fb0e82a0cc34 producing partition >> >> > 5b8dd5ac2df14266080a8e049defbe38 has already been disposed. >> >> > at >> >> > >> >> > >> >> > org.apache.flink.runtime.jobmaster.JobMaster.requestPartitionState(JobMaster.java:610) >> >> > at sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source) >> >> > at >> >> > >> >> > >> >> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >> > at java.lang.reflect.Method.invoke(Method.java:498) >> >> > at >> >> > >> >> > >> >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) >> >> > at >> >> > >> >> > >> >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) >> >> > at >> >> > >> >> > >> >> > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69) >> >> > at >> >> > >> >> > >> >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) >> >> > at >> >> > >> >> > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) >> >> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) >> >> > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) >> >> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) >> >> > at akka.actor.ActorCell.invoke(ActorCell.scala:495) >> >> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) >> >> > at akka.dispatch.Mailbox.run(Mailbox.scala:224) >> >> > at 
akka.dispatch.Mailbox.exec(Mailbox.scala:234) >> >> > at >> >> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> >> > at >> >> > >> >> > >> >> > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> >> > at >> >> > >> >> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> >> > at >> >> > >> >> > >> >> > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> >> > >> >> > >> >> > Thanks >> >> > Amit >> > >> > > >
Re: Batch job stuck in Canceled state in Flink 1.5
Thanks, Fabian! I will try using the current release-1.5 branch and update this thread. -- Thanks, Amit On Wed, May 2, 2018 at 3:42 PM, Fabian Hueske wrote: > Hi Amit, > > We recently fixed a bug in the network stack that affected batch jobs > (FLINK-9144). > The fix was added after your commit. > > Do you have a chance to build the current release-1.5 branch and check if > the fix also resolves your problem? > > Otherwise it would be great if you could open a blocker issue for the 1.5 > release to ensure that this is fixed. > > Thanks, > Fabian > > 2018-04-29 18:30 GMT+02:00 Amit Jain : >> >> Cluster is running on commit 2af481a >> >> On Sun, Apr 29, 2018 at 9:59 PM, Amit Jain wrote: >> > Hi, >> > >> > We are running numbers of batch jobs in Flink 1.5 cluster and few of >> > those >> > are getting stuck at random. These jobs having the following failure >> > after >> > which operator status changes to CANCELED and stuck to same. >> > >> > Please find complete TM's log at >> > https://gist.github.com/imamitjain/066d0e0ee24f2da1ddc83eba2012 >> > >> > >> > 2018-04-29 14:57:24,437 INFO org.apache.flink.runtime.taskmanager.Task >> > - Producer 98f5976716234236dc69fb0e82a0cc34 of partition >> > 5b8dd5ac2df14266080a8e049defbe38 disposed. Cancelling execution. >> > org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException: >> > Execution 98f5976716234236dc69fb0e82a0cc34 producing partition >> > 5b8dd5ac2df14266080a8e049defbe38 has already been disposed.
>> > at >> > >> > org.apache.flink.runtime.jobmaster.JobMaster.requestPartitionState(JobMaster.java:610) >> > at sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source) >> > at >> > >> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> > at java.lang.reflect.Method.invoke(Method.java:498) >> > at >> > >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) >> > at >> > >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) >> > at >> > >> > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69) >> > at >> > >> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) >> > at >> > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) >> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) >> > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) >> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) >> > at akka.actor.ActorCell.invoke(ActorCell.scala:495) >> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) >> > at akka.dispatch.Mailbox.run(Mailbox.scala:224) >> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) >> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> > at >> > >> > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> > at >> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> > at >> > >> > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> > >> > >> > Thanks >> > Amit > >
Re: Batch job stuck in Canceled state in Flink 1.5
Cluster is running on commit 2af481a On Sun, Apr 29, 2018 at 9:59 PM, Amit Jain wrote: > Hi, > > We are running numbers of batch jobs in Flink 1.5 cluster and few of those > are getting stuck at random. These jobs having the following failure after > which operator status changes to CANCELED and stuck to same. > > Please find complete TM's log at > https://gist.github.com/imamitjain/066d0e0ee24f2da1ddc83eba2012 > > > 2018-04-29 14:57:24,437 INFO org.apache.flink.runtime.taskmanager.Task > - Producer 98f5976716234236dc69fb0e82a0cc34 of partition > 5b8dd5ac2df14266080a8e049defbe38 disposed. Cancelling execution. > org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException: > Execution 98f5976716234236dc69fb0e82a0cc34 producing partition > 5b8dd5ac2df14266080a8e049defbe38 has already been disposed. > at > org.apache.flink.runtime.jobmaster.JobMaster.requestPartitionState(JobMaster.java:610) > at sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > Thanks > Amit
Batch job stuck in Canceled state in Flink 1.5
Hi, We are running a number of batch jobs on a Flink 1.5 cluster and a few of them get stuck at random. These jobs hit the following failure, after which the operator status changes to CANCELED and stays there. Please find the complete TM log at https://gist.github.com/imamitjain/066d0e0ee24f2da1ddc83eba2012
2018-04-29 14:57:24,437 INFO org.apache.flink.runtime.taskmanager.Task - Producer 98f5976716234236dc69fb0e82a0cc34 of partition 5b8dd5ac2df14266080a8e049defbe38 disposed. Cancelling execution.
org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException: Execution 98f5976716234236dc69fb0e82a0cc34 producing partition 5b8dd5ac2df14266080a8e049defbe38 has already been disposed.
    at org.apache.flink.runtime.jobmaster.JobMaster.requestPartitionState(JobMaster.java:610)
    at sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
    at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Thanks
Amit
Re: Testing on Flink 1.5
Hi Gary, This setting has resolved the issue. Does it increase the timeout for all RPCs or only for specific components? We had the following settings in Flink 1.3.2 and they did the job for us:
akka.watch.heartbeat.pause: 600 s
akka.client.timeout: 5 min
akka.ask.timeout: 120 s
-- Thanks, Amit
Re: Testing on Flink 1.5
Hi Gary, We found the underlying cause of the following problem. A few of our jobs are stuck with logs [1]; these jobs are only able to allocate a JM and cannot get any TM, even though there are ample resources on our cluster. We are running an ETL merge job here. In this job, we first look for new deltas, and if none is detected we exit without actually executing the job. I think this is why we see no TM allocation. I believe that in this case (non-detached mode) the submitted application should be marked as completed rather than running. Please share your thoughts on this. Should I log this improvement in JIRA? Could you also recommend the best practice for FLIP-6: should we use a YARN session or submit jobs in non-detached mode? -- Thanks, Amit
Re: KeyedStream question
Hi, The keyBy operation partitions the data on the given key and makes sure that the same slot receives all future data belonging to the same key. In the default implementation, it can also map a subset of the keys in your DataStream to the same slot. Assuming the number of keys equals the number of running slots, you can specify a custom keyBy operation to achieve that. Could you describe your use case? -- Thanks, Amit
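To illustrate the two properties described in the reply above (a key always lands in the same place, and several keys may share one place), here is a deliberately simplified sketch. It hashes the key modulo the parallelism, which is an assumption made for illustration; Flink's own assignment goes through key groups and is not reproduced here. The class KeyToSubtaskSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class KeyToSubtaskSketch {

    // Simplified stand-in for key routing: the same key always yields the
    // same index in the range [0, parallelism).
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups the given keys by the subtask index they are routed to.
    static Map<Integer, List<String>> assign(List<String> keys, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String key : keys) {
            bySubtask.computeIfAbsent(subtaskFor(key, parallelism), i -> new ArrayList<>())
                    .add(key);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("user-a", "user-b", "user-c", "user-d", "user-e");
        // With more keys than subtasks, at least one subtask receives several keys.
        assign(keys, 3).forEach((subtask, assigned) ->
                System.out.println("subtask " + subtask + " <- " + assigned));
    }
}
```

Even when the number of keys equals the parallelism, a hash-based scheme like this gives no guarantee of one key per subtask, which is why a custom assignment is needed for that.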
Re: TaskManager deadlock on NetworkBufferPool
+user@flink.apache.org On Wed, Apr 4, 2018 at 11:33 AM, Amit Jain wrote: > Hi, > > We are hitting TaskManager deadlock on NetworkBufferPool bug in Flink 1.3.2. > We have set of ETL's merge jobs for a number of tables and stuck with above > issue randomly daily. > > I'm attaching the thread dump of JobManager and one of the Task Manager (T1) > running stuck job. > We also observed, sometimes new job scheduled on T1 progresses even another > job is stuck there. > > "CHAIN DataSource (at createInput(ExecutionEnvironment.java:553) > (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (Map > at main(MergeTableSecond.java:175)) -> Map (Key Extractor) (6/9)" #1501 > daemon prio=5 os_prio=0 tid=0x7f9ea84d2fb0 nid=0x22fe in Object.wait() > [0x7f9ebf102000] >java.lang.Thread.State: TIMED_WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:224) > - locked <0x0005e28fe218> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:193) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132) > - locked <0x0005e29125f0> (a > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79) > at > 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:168) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:748) > > -- > Thanks, > Amit