Re: Reading Data from zip/gzip

2018-10-22 Thread Amit Jain
Hi Chris,

FileInputFormat automatically takes care of decompressing files with the
gzip, xz, bz2 and deflate extensions.

--
Thanks,
Amit

Source:
https://github.com/apache/flink/blob/7b040b915504e59243c642b1f4a84c956d96d134/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L118

private static void initDefaultInflaterInputStreamFactories() {
    InflaterInputStreamFactory[] defaultFactories = {
        DeflateInflaterInputStreamFactory.getInstance(),
        GzipInflaterInputStreamFactory.getInstance(),
        Bzip2InputStreamFactory.getInstance(),
        XZInputStreamFactory.getInstance(),
    };
    for (InflaterInputStreamFactory inputStreamFactory : defaultFactories) {
        for (String fileExtension : inputStreamFactory.getCommonFileExtensions()) {
            registerInflaterInputStreamFactory(fileExtension, inputStreamFactory);
        }
    }
}
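The loop above registers each factory under every file extension it reports, and FileInputFormat later picks the decompressor from the input file's extension. The sketch below is a rough, stdlib-only illustration of that lookup, not Flink's code. The class `ExtensionInflaterRegistry` is hypothetical. It assumes the extension lists `gz`/`gzip` and `deflate`, and it leaves out bz2 and xz because the JDK has no built-in decompressor for them. Note that `.zip` is not among the default factories above. A zip file is an archive that can hold several entries, so it would need custom handling rather than a plain stream wrapper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.InflaterInputStream;

public class ExtensionInflaterRegistry {

    /** Wraps a raw (compressed) stream in a decompressing stream. */
    @FunctionalInterface
    public interface InflaterFactory {
        InputStream create(InputStream in) throws IOException;
    }

    private final Map<String, InflaterFactory> factories = new HashMap<>();

    public ExtensionInflaterRegistry() {
        // One factory can be registered under several extensions, as in the Flink loop above.
        register(GZIPInputStream::new, "gz", "gzip");
        register(InflaterInputStream::new, "deflate");
        // bz2 and xz are omitted: the JDK has no built-in decompressor for them.
    }

    public void register(InflaterFactory factory, String... extensions) {
        for (String extension : extensions) {
            factories.put(extension, factory);
        }
    }

    /** Returns a decompressing stream for known extensions, otherwise the raw stream unchanged. */
    public InputStream open(String fileName, InputStream raw) throws IOException {
        int dot = fileName.lastIndexOf('.');
        // Lower-cased here for convenience; this is a choice of the sketch.
        String extension = dot < 0 ? "" : fileName.substring(dot + 1).toLowerCase(Locale.ROOT);
        InflaterFactory factory = factories.get(extension);
        return factory == null ? raw : factory.create(raw);
    }

    /** Demo helper: gzip-compresses a UTF-8 string. */
    public static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (OutputStream out = new GZIPOutputStream(buffer)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return buffer.toByteArray();
    }

    /** Reads a stream fully into a UTF-8 string (Java 8 compatible). */
    public static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        while ((n = in.read(chunk)) != -1) {
            buffer.write(chunk, 0, n);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        ExtensionInflaterRegistry registry = new ExtensionInflaterRegistry();
        byte[] compressed = gzip("a,b,c\n1,2,3\n");
        // "data.csv.gz" resolves to the "gz" factory, so the caller sees plain CSV text.
        try (InputStream in = registry.open("data.csv.gz", new ByteArrayInputStream(compressed))) {
            System.out.print(readAll(in));
        }
    }
}
```

Under the behaviour described above, this is why pointing a text or CSV source at a `.gz` path should yield plain records without unzipping first.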


On Mon, Oct 22, 2018 at 2:03 PM chrisr123  wrote:

> I'm able to read normal txt or csv files using Flink,
> but what would I need to do in order to read them if they
> are given to me in zip or gzip format? Assuming I do not want
> to have to unzip them.
> Thanks!
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Why am I getting AWS access denied error for request type [DeleteObjectRequest] in S3?

2018-10-15 Thread Amit Jain
Hi Harshith,

Did you grant delete permission on S3 to the machines running Flink? Are you
using IAM roles or an access key ID and secret access key pair?

--
Thanks,
Amit

On Mon, Oct 15, 2018 at 3:15 PM Kumar Bolar, Harshith 
wrote:

> Hi all,
>
>
>
> We store Flink checkpoints in Amazon S3. Flink periodically sends out GET,
> PUT, LIST, DELETE requests to S3 to store and clear checkpoints. From the
> logs, we see that GET, PUT and LIST requests are successful but it throws
> an AWS access denied error for DELETE request.
>
>
>
> Here’s a snippet of the logs for DELETE request –
>
>
>
> 2018-10-15 04:13:22,819 INFO
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency -
> ServiceName=[Amazon S3], AWSErrorCode=[AccessDenied], StatusCode=[403],
> ServiceEndpoint=[https://xxx-xxx-prod.s3.amazonaws.com],
> Exception=[org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> Access Denied (Service: Amazon S3; *Status Code: 403; Error Code:
> AccessDenied;* Request ID: x), S3 Extended Request ID:
> xxx],
> *RequestType=[DeleteObjectRequest]*, AWSRequestID=[XX],
> HttpClientPoolPendingCount=0, RetryCapacityConsumed=0,
> HttpClientPoolAvailableCount=1, RequestCount=1, Exception=1,
> HttpClientPoolLeasedCount=0, ClientExecuteTime=[4.984],
> HttpClientSendRequestTime=[0.029], HttpRequestTime=[4.84],
> RequestSigningTime=[0.038], CredentialsRequestTime=[0.0, 0.0],
> HttpClientReceiveResponseTime=[4.78]
>
>
>
> Is there some configuration that we’re forgetting that is preventing Flink
> from sending DELETE requests to S3?
>
>
>
> I’d be happy to provide more information if needed.
>
>
>
> Thanks,
>
> Harshith
>
>
>
>
>


Re: Flink1.6 Confirm that flink supports the plan function?

2018-10-15 Thread Amit Jain
Hi,

2) You may also want to look into the ParameterTool [1] class to parse and
read a properties file passed to the job [2].
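ParameterTool.fromPropertiesFile(...) reads an ordinary Java properties file into key/value parameters. The stdlib-only sketch below shows the idea without a Flink dependency, using `java.util.Properties` directly. The class `JobParamsSketch` and the keys `name`, `input` and `parallelism` are hypothetical, chosen to mirror the `h1 /ziyu/h1.txt` arguments in the question.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical stand-in for ParameterTool.fromPropertiesFile(...); not Flink code. */
public class JobParamsSketch {
    private final Properties props = new Properties();

    public JobParamsSketch(Reader source) {
        try {
            props.load(source); // standard key=value properties syntax
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Value for the key, or the given default when it is absent. */
    public String get(String key, String defaultValue) {
        return props.getProperty(key, defaultValue);
    }

    /** Value for the key; fails fast when a mandatory setting is missing. */
    public String getRequired(String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("No value for required key '" + key + "'");
        }
        return value;
    }

    public static void main(String[] args) {
        // In a real job this would be a FileReader over the path given on the command line.
        String fileContents = "name=h1\ninput=/ziyu/h1.txt\n";
        JobParamsSketch params = new JobParamsSketch(new StringReader(fileContents));
        System.out.println(params.getRequired("name"));     // h1
        System.out.println(params.getRequired("input"));    // /ziyu/h1.txt
        System.out.println(params.get("parallelism", "1")); // 1 (default)
    }
}
```

In a job you would use ParameterTool itself, e.g. `ParameterTool.fromPropertiesFile(path)` followed by `get(...)` or `getRequired(...)`. The best-practices page [2] also describes registering the parameters with `env.getConfig().setGlobalJobParameters(params)`, after which they are visible in the JobManager web interface. That may be the closest thing to reading a running job's parameters.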

--
Thanks,
Amit

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/utils/ParameterTool.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/best_practices.html#getting-your-configuration-values-into-the-parametertool

On Mon, Oct 15, 2018 at 1:28 PM Till Rohrmann  wrote:

> Hi,
>
> 1) you currently cannot merge multiple jobs into one after they have been
> submitted. What you can do though, is to combine multiple jobs in your
> Flink program before you submit it.
>
> 2) you can pass program arguments when you submit your job. After it
> has been submitted, it is no longer possible to change the command line
> arguments.
>
> Cheers,
> Till
>
> On Mon, Oct 15, 2018 at 9:11 AM wangziyu <2375900...@qq.com> wrote:
>
>> Dear friends,
>>   I have been learning Flink for 20 days, and I would like to trouble
>> you for help with two problems.
>> The questions are as follows:
>>   1. If I have several jobs, how can I merge them into one job
>> so that they are convenient to manage?
>> I have looked through the REST API in
>> "https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html".
>> I see "/jars/:jarid/plan", which says "Returns the dataflow plan of a job
>> contained in a jar previously uploaded via '/jars/upload'." I think that is
>> not what I need.
>>   2. When I run a job, I need to pass in several parameters, for
>> example "./flink run -d -c streaming.Kafka010NumCountConsumer
>> /ziyu/flink/kafkaFlink-1.0-SNAPSHOT.jar h1 /ziyu/h1.txt". If I know the
>> JobId, can I get the parameters that were passed to the job from Java? I
>> think there is some interface for this, but I can't find it.
>>   That is all. Could you give me some
>> information? Thanks so much.
>>
>>
>>
>>
>>
>>
>>
>


Re: Flink streaming-job with redis sink: SocketTimeoutException

2018-10-15 Thread Amit Jain
Hi Marke,

The stack trace suggests this is a Redis connection issue rather than a
Flink problem. Could you share the JedisPool configuration of the Redis
sink? Are you writing into Redis continuously or with some bulk logic? It
looks like the Redis connections are timing out here.
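For context, `SocketTimeoutException: Read timed out` means the client gave up waiting for a reply after its configured socket read timeout. The stdlib-only sketch below reproduces the same exception against a local server that accepts the connection but never answers. The 200 ms value and the class name are arbitrary examples. As far as I know, Jedis applies the `timeout` given to its pool as this kind of socket read timeout. So a slow or overloaded Redis server, or a timeout that is too small for the write load, would produce the quoted trace.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class ReadTimeoutDemo {

    /**
     * Connects to a local server that accepts the connection but never writes a reply,
     * and reports whether the client's blocking read ended with SocketTimeoutException.
     */
    public static boolean readTimesOut(int soTimeoutMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket silentPeer = server.accept()) {
            // Comparable to the read timeout a Redis client sets on its socket.
            client.setSoTimeout(soTimeoutMillis);
            InputStream in = client.getInputStream();
            try {
                in.read(); // blocks: silentPeer never sends anything
                return false;
            } catch (SocketTimeoutException e) {
                return true; // the same exception type as in the quoted trace
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read timed out: " + readTimesOut(200));
    }
}
```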

--
Thanks,
Amit

On Mon, Oct 15, 2018 at 12:19 PM Marke Builder 
wrote:

> Hi,
>
> What could be the reasons for the following exceptions?
> We are using Flink with a Redis sink, but from time to time the Flink job
> fails with the following exception.
>
> Thanks, Builder.
>
>
> 10/13/2018 15:37:48 Flat Map -> (Sink: Unnamed, Sink: Unnamed)(9/10)
> switched to FAILED
> redis.clients.jedis.exceptions.JedisConnectionException:
> java.net.SocketTimeoutException: Read timed out
> at
> redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:201)
> at
> redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
> at redis.clients.jedis.Protocol.process(Protocol.java:141)
> at redis.clients.jedis.Protocol.read(Protocol.java:205)
> at
> redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:297)
> at
> redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:196)
> at redis.clients.jedis.Jedis.set(Jedis.java:69)
> at
> org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.set(RedisContainer.java:178)
> at
> org.apache.flink.streaming.connectors.redis.RedisSink.invoke(RedisSink.java:143)
> at
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:27)
> at
> com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:12)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.net.SocketTimeoutException: Read timed out
> at java.net.SocketInputStream.socketRead0(Native Method)
> at
> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> at java.net.SocketInputStream.read(SocketInputStream.java:171)
> at java.net.SocketInputStream.read(SocketInputStream.java:141)
> at java.net.SocketInputStream.read(SocketInputStream.java:127)
> at
> redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:195)
> ... 26 more
>
>


Re: DatabaseClient at async example

2018-10-03 Thread Amit Jain
Hi Nicos,

DatabaseClient is a placeholder class used to illustrate the async I/O
concept; there is no such interface or class in the Flink codebase. You can
use any MariaDB client implementation that supports concurrent requests to
the database.
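To make "supports concurrent requests" concrete, here is a stdlib-only sketch of what such a client could look like: a hypothetical `DatabaseClient` that runs a blocking lookup on a bounded thread pool and hands back a `CompletableFuture`. The in-memory map stands in for MariaDB, so this is only an assumption about the shape of a real client (for example, a JDBC connection pool behind an executor). Inside an `AsyncFunction`, `asyncInvoke` would call `query(...)` and complete the `ResultFuture` from `thenAccept`, similar to the documentation example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical client; the name mirrors the docs placeholder and is not a Flink class. */
public class DatabaseClient implements AutoCloseable {
    private final Map<String, String> table; // stand-in for the real MariaDB table
    private final ExecutorService pool;      // bounds the number of in-flight requests

    public DatabaseClient(Map<String, String> table, int maxConcurrentRequests) {
        this.table = new HashMap<>(table);
        this.pool = Executors.newFixedThreadPool(maxConcurrentRequests);
    }

    // Blocking lookup; a real client would run a SELECT over a pooled JDBC connection here.
    private String blockingLookup(String key) {
        return table.getOrDefault(key, "<not found>");
    }

    /** Returns immediately; the future completes on a pool thread when the lookup finishes. */
    public CompletableFuture<String> query(String key) {
        return CompletableFuture.supplyAsync(() -> blockingLookup(key), pool);
    }

    @Override
    public void close() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("1", "alice");
        users.put("2", "bob");
        try (DatabaseClient client = new DatabaseClient(users, 4)) {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (String id : new String[] {"1", "2", "3"}) {
                pending.add(client.query(id)); // all three requests are in flight together
            }
            for (CompletableFuture<String> result : pending) {
                System.out.println(result.join());
            }
        }
    }
}
```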

--
Cheers,
Amit

On Wed, Oct 3, 2018 at 8:14 PM Nicos Maris  wrote:

> Hello,
>
> I have a short question about the following example in your documentation.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/asyncio.html
>
>
> Which is the package and the maven dependency of the class DatabaseClient?
>
> I am building a Proof of Concept based on the above documentation and
> mariaDB.
>


Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException

2018-10-03 Thread Amit Jain
Hi Julio,

What's the Flink version for this setup?

--
Thanks,
Amit

On Wed, Oct 3, 2018 at 4:22 PM Andrey Zagrebin 
wrote:

> Hi Julio,
>
> Looks like some problem with dependencies.
> Have you followed the recommended s3 configuration guide [1]?
> Is it correct that your job already created checkpoints/savepoints on s3
> before?
>
> I think if you manually create file system using FileSystem.get(path), it
> should be configured the same way as for bucketing sink and checkpoints.
>
> Best,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/aws.html#s3-simple-storage-service
>
> On 2 Oct 2018, at 15:21, Julio Biason  wrote:
>
> Hey guys,
>
> I've set up a BucketingSink as a dead letter queue into our Ceph cluster using
> S3, but when I start the job, I get this error:
>
>
> java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at 
> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2306)
>   at 
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2271)
>   at 
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
>   at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1295)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: 
> com.amazonaws.AmazonClientException
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 17 more
>
> I find it weird 'cause I've already set up checkpoints (and savepoints) to
> use S3 as protocol, and I just assume that, if it works for checkpoints, it
> should work here.
>
> (I suppose I could add the aws client as a dependency of my build but,
> again, I assumed that once S3 works for checkpoints, it should work
> everywhere.)
>
> And kinda related, can I assume that using the FileSystem class to create
> FSOutputStreams will follow Flink configuration? I have another type of
> dead letter queue that won't work with BucketingSink and I was thinking
> about using it directly to create files inside that Ceph/S3.
>
> --
> *Julio Biason*, Sofware Engineer
> *AZION*  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51
> *99907 0554*
>
>
>


Re: Exception while submitting jobs through Yarn

2018-06-18 Thread Amit Jain
Hi Garvit,

I think Till is interested in the classpath details printed at the start of
the JM and TM logs. For example, the following log lines show the classpath
used by the TM in our case.

2018-06-17 19:01:30,656 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  -
2018-06-17 19:01:30,658 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Starting YARN TaskExecutor runner (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
2018-06-17 19:01:30,659 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  -  OS current user: yarn
2018-06-17 19:01:31,662 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Current Hadoop/Kerberos user: hadoop
2018-06-17 19:01:31,663 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b10
2018-06-17 19:01:31,663 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Maximum heap size: 6647 MiBytes
2018-06-17 19:01:31,663 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  -  JAVA_HOME: /usr/lib/jvm/java-openjdk
2018-06-17 19:01:31,664 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Hadoop version: 2.8.3
2018-06-17 19:01:31,664 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  -  JVM Options:
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  - -Xms6936m
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  - -Xmx6936m
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  - -XX:MaxDirectMemorySize=4072m
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  - -Dlog.file=/var/log/hadoop-yarn/containers/application_1528342246614_0002/container_1528342246614_0002_01_282649/taskmanager.log
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  - -Dlogback.configurationFile=file:./logback.xml
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  - -Dlog4j.configuration=file:./log4j.properties
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Program Arguments:
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  - --configDir
2018-06-17 19:01:31,665 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  - .
*2018-06-17 19:01:31,666 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  -  Classpath: lib/flink-dist_2.11-1.5.0.jar:lib/flink-python_2.11-1.5.0.jar:lib/flink-shaded-hadoop2-uber-1.5.0.jar:lib/flink-shaded-include-yarn-0.9.1.jar:lib/guava-18.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.7.jar:log4j.properties:logback.xml:flink.jar:flink-conf.yaml::/etc/hadoop/conf:/usr/lib/hadoop/hadoop-common-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-archive-logs.jar:/usr/lib/hadoop/hadoop-auth.jar:/usr/lib/hadoop/hadoop-archives-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-archive-logs-2.8.3-amzn-0.jar:/usr/lib/hadoop/hadoop-azure-datalake-2.8.3-amzn-0.jar.*
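When comparing classpaths like this, it can help to split the `Classpath:` entry and list the Hadoop-related jars. As Ted suggests below, a mismatch between the Hadoop version Flink was built with and the cluster's Hadoop is a likely suspect. In the sample above, both Flink's `flink-shaded-hadoop2-uber` jar and the cluster's `/usr/lib/hadoop` jars are present. The stdlib-only sketch below does this for one log line. The class name is hypothetical, and matching on the substring "hadoop" is a simplistic heuristic, not a real conflict check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class ClasspathInspector {

    /** Extracts the entries after "Classpath:" from a single Flink startup log line. */
    public static List<String> entries(String logLine) {
        List<String> result = new ArrayList<>();
        int idx = logLine.indexOf("Classpath:");
        if (idx < 0) {
            return result;
        }
        String classpath = logLine.substring(idx + "Classpath:".length()).trim();
        for (String entry : classpath.split(":")) {
            if (!entry.isEmpty()) { // "::" in the log yields empty entries
                result.add(entry);
            }
        }
        return result;
    }

    /** Keeps only the entries whose file name mentions Hadoop. */
    public static List<String> hadoopEntries(List<String> entries) {
        List<String> result = new ArrayList<>();
        for (String entry : entries) {
            String fileName = entry.substring(entry.lastIndexOf('/') + 1);
            if (fileName.toLowerCase(Locale.ROOT).contains("hadoop")) {
                result.add(entry);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String line = "Classpath: lib/flink-dist_2.11-1.5.0.jar:lib/flink-shaded-hadoop2-uber-1.5.0.jar"
                + ":flink.jar::/etc/hadoop/conf:/usr/lib/hadoop/hadoop-common-2.8.3-amzn-0.jar";
        for (String jar : hadoopEntries(entries(line))) {
            System.out.println(jar);
        }
    }
}
```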

--
Thanks,
Amit

On Mon, Jun 18, 2018 at 2:00 PM, Garvit Sharma  wrote:

> Hi,
>
> Please refer to my previous mail for complete logs.
>
> Thanks,
>
> On Mon, Jun 18, 2018 at 1:17 PM Till Rohrmann 
> wrote:
>
>> Could you also please share the complete log file with us.
>>
>> Cheers,
>> Till
>>
>> On Sat, Jun 16, 2018 at 5:22 PM Ted Yu  wrote:
>>
>>> The error for core-default.xml is interesting.
>>>
>>> Flink doesn't have this file. Probably it came with Yarn. Please check
>>> the hadoop version Flink was built with versus the hadoop version in your
>>> cluster.
>>>
>>> Thanks
>>>
>>>  Original message 
>>> From: Garvit Sharma 
>>> Date: 6/16/18 7:23 AM (GMT-08:00)
>>> To: trohrm...@apache.org
>>> Cc: Chesnay Schepler , user@flink.apache.org
>>> Subject: Re: Exception while submitting jobs through Yarn
>>>
>>> I am not able to figure this out and have been badly stuck on it for the last week.
>>> Any little help would be appreciated.
>>>
>>>
>>> 2018-06-16 19:25:10,523 DEBUG org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  - Parallelism set: 1 for 8
>>>
>>> 2018-06-16 19:25:10,578 DEBUG org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  - Parallelism set: 1 for 1
>>>
>>> 2018-06-16 19:25:10,588 DEBUG org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  - CONNECTED: KeyGroupStreamPartitioner - 1 -> 8
>>>
>>> 2018-06-16 19:25:10,591 DEBUG org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  - Parallelism set: 1 for 5
>>>
>>> 2018-06-16 19:25:10,597 DEBUG org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator  - CONNECTED: KeyGroupStreamPartitioner - 5 -> 8
>>> 

Re: Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Amit Jain
Hi Sandybayev,

Currently, Flink does not provide a built-in solution for this use case.
However, there is an open FLIP [1] [2] that was created to address it.

I can see that with your current approach you are not able to update the
rule set data. You can make the rule set updatable by building a
DataStream from its changelog, stored in a message queue or a distributed
file system, and connecting it to your event stream.
Or, you can store the rule set data in an external system and query it
from Flink for each incoming key.
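The changelog approach can be sketched as a matcher that keeps the current rules in memory, applies upserts and deletes from the rule changelog, and checks every incoming event against all current rules. In Flink, the two entry points would roughly correspond to `flatMap1`/`flatMap2` of a `CoFlatMapFunction` on `events.connect(ruleUpdates)`. The stdlib-only sketch below leaves Flink out, and the `RuleUpdate` record format and the prefix-matching rule are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RuleMatcher {

    /** Hypothetical changelog record: a null pattern means "delete this rule". */
    public static final class RuleUpdate {
        final String ruleId;
        final String pattern;

        public RuleUpdate(String ruleId, String pattern) {
            this.ruleId = ruleId;
            this.pattern = pattern;
        }
    }

    private final Map<String, String> rules = new LinkedHashMap<>();

    /** Rule-changelog side (flatMap2 in a CoFlatMapFunction). */
    public void onRuleUpdate(RuleUpdate update) {
        if (update.pattern == null) {
            rules.remove(update.ruleId);
        } else {
            rules.put(update.ruleId, update.pattern);
        }
    }

    /** Event side (flatMap1): one output per rule whose pattern is a prefix of the event. */
    public List<String> onEvent(String event) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, String> rule : rules.entrySet()) {
            if (event.startsWith(rule.getValue())) {
                matches.add(rule.getKey() + ":" + event);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        RuleMatcher matcher = new RuleMatcher();
        matcher.onRuleUpdate(new RuleUpdate("r1", "error"));
        matcher.onRuleUpdate(new RuleUpdate("r2", "warn"));
        System.out.println(matcher.onEvent("error: disk full")); // [r1:error: disk full]
        matcher.onRuleUpdate(new RuleUpdate("r1", null));        // rule r1 deleted
        System.out.println(matcher.onEvent("error: disk full")); // []
    }
}
```

With parallelism above one, every parallel instance needs every rule update, which is what calling `broadcast()` on the rule stream before connecting is for. The in-memory map would also have to be checkpointed, or rebuilt from the changelog, to survive failures.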

[1]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
[2]: https://issues.apache.org/jira/browse/FLINK-6131

On Tue, Jun 5, 2018 at 5:52 PM, Sandybayev, Turar (CAI - Atlanta)
 wrote:
> Hi,
>
>
>
> What is the best practice recommendation for the following use case? We need
> to match a stream against a set of “rules”, which are essentially a Flink
> DataSet concept. Updates to this “rules set” are possible but not frequent.
> Each stream event must be checked against all the records in the “rules set”,
> and each match produces one or more events into a sink. The number of records
> in a rule set is in the 6-digit range.
>
>
>
> Currently we're simply loading rules into a local List of rules and using
> flatMap over an incoming DataStream. Inside flatMap, we're just iterating
> over a list comparing each event to each rule.
>
>
>
> To speed up the iteration, we can also split the list into several batches,
> essentially creating a list of lists, and creating a separate thread to
> iterate over each sub-list (using Futures in either Java or Scala).
>
>
>
> Questions:
>
> 1. Is there a better way to do this kind of a join?
>
> 2. If not, is it safe to add additional parallelism by creating
> new threads inside each flatMap operation, on top of what Flink is already
> doing?
>
>
>
> Thanks in advance!
>
> Turar
>
>


Re: Flink 1.5, failed to instantiate S3 FS

2018-06-02 Thread Amit Jain
Hi Hao,

Have a look at
https://issues.apache.org/jira/browse/HADOOP-13811?focusedCommentId=15703276=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15703276

What version of Hadoop are you using? Could you provide the classpath used
by the Flink JobManager? It is printed in the jobmanager.log file.

--
Cheers,
Amit


Re: Batch job stuck in Canceled state in Flink 1.5

2018-05-29 Thread Amit Jain
Thanks, Till. Increasing `taskmanager.network.request-backoff.max` helped in
my case. We tried this on 1.5.0 and the jobs are running fine.


--
Thanks
Amit


Re: Batch job stuck in Canceled state in Flink 1.5

2018-05-24 Thread Amit Jain
Thanks, Till! I'll try your suggestions and update the thread.

On Wed, May 23, 2018 at 4:43 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Amit,
>
> it looks as if the current cancellation cause is not the same as the
> initially reported cancellation cause. In the current case, it looks as if
> the deployment of your tasks takes so long that the maximum
> `taskmanager.network.request-backoff.max` value has been reached. When this
> happens a task gives up asking for the input result partition and fails with
> the `PartitionNotFoundException`.
>
> More concretely, the `CHAIN Reduce (GroupReduce at
> first(SortedGrouping.java:210)) -> Map (Key Extractor) (2/14)` cannot
> retrieve the result partition of the `CHAIN DataSource (at
> createInput(ExecutionEnvironment.java:548)
> (org.apache.flink.api.java.io.TextInputFormat)) -> Map (Data Source
> org.apache.flink.api.java.io.TextInputFormat
> [s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/14/,
> s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/15/0/])
> -> Map (Key Extractor) -> Combine (GroupReduce at
> first(SortedGrouping.java:210)) (8/14)` task. This task is in the DEPLOYING
> state when the exception occurs. It seems to me that this task takes
> quite some time to be deployed.
>
> One reason why the deployment could take some time is that an UDF (for
> example the closure) of one of the operators is quite large. If this is the
> case, then Flink offloads the corresponding data onto the BlobServer from
> where they are retrieved by the TaskManagers. Since you are running in
> non-HA mode, the BlobServer won't store the blobs on HDFS from where they
> could be retrieved. Instead all the TaskManagers ask the single BlobServer
> for the required TDD blobs. Depending on the size of the TDDs, the
> BlobServer might become the bottleneck.
>
> What you can try to do is the following
> 1) Try to reduce the closure object of the UDFs in the above-mentioned task.
> 2) Increase `taskmanager.network.request-backoff.max` to give the system
> more time to download the blobs
> 3) Run the cluster in HA mode which will store the blobs also under
> `high-availability.storageDir` (usually HDFS or S3). Before downloading the
> blobs from the BlobServer, Flink will first try to download them from the
> `high-availability.storageDir`.
>
> Let me know if this solves your problem.
>
> Cheers,
> Till
>
> On Tue, May 22, 2018 at 1:29 PM, Amit Jain <aj201...@gmail.com> wrote:
>>
>> Hi Nico,
>>
>> Please find more logs attached.
>>
>> --
>> Thanks,
>> Amit
>>
>> On Tue, May 22, 2018 at 4:09 PM, Nico Kruber <n...@data-artisans.com>
>> wrote:
>> > Hi Amit,
>> > thanks for providing the logs, I'll look into it. We currently have a
>> > suspicion of this being caused by
>> > https://issues.apache.org/jira/browse/FLINK-9406 which we found by
>> > looking over the surrounding code. The RC4 has been cancelled since we
>> > see this as a release blocker.
>> >
>> > To rule out further errors, can you also provide logs for the task
>> > manager producing partitions d6946b39439f10e8189322becf1b8887,
>> > 9aa35dd53561f9220b7b1bad84309d5f and 60909dec9134eb980e18064358dc2c81?
>> > The task manager log you provided only covers the task manager asking for
>> > this partition, for which the job manager produces the
>> > PartitionProducerDisposedException that you see.
>> > I'm looking for the logs of task managers with the following execution
>> > IDs in their logs:
>> > - 2826f9d430e05e9defaa46f60292fa79
>> > - 7ef992a067881a07409819e3aea32004
>> > - ec923ce6d891d89cf6fecb5e3f5b7cc5
>> >
>> > Regarding the operators being stuck: I'll have a further look into the
>> > logs and state transition and will come back to you.
>> >
>> >
>> > Nico
>> >
>> >
>> > On 21/05/18 09:27, Amit Jain wrote:
>> >> Hi All,
>> >>
>> >> I totally missed this thread. I've encountered the same issue in Flink
>> >> 1.5.0 RC4. Please look over the attached logs of the JM and the impacted TM.
>> >>
>> >> Job ID 390a96eaae733f8e2f12fc6c49b26b8b
>> >>
>> >> --
>> >> Thanks,
>> >> Amit
>> >>
>> >> On Thu, May 3, 2018 at 8:31 PM, Nico Kruber <n...@data-artisans.com>
>> >> wrote:
>> >>> Also, please have a look at the other TaskManagers' logs, in
>> >>> particular
>> >>> the one th

Re: Strange Behaviour with task manager oom ?

2018-05-21 Thread Amit Jain
Hi,
Could you share the logs of the job and the impacted task manager? How much
memory have you allocated to the JobManager?

--
Thanks,
Amit

On Mon, May 21, 2018 at 8:46 PM, sohimankotia  wrote:
> Hi,
>
> I am running a Flink batch job.
>
> My job runs fine if I use 4 task managers with 8 slots each = 32 parallelism,
> with 6GB memory per task manager.
>
> As soon as I increase to 5 task managers with 6 slots each = 30
> parallelism (6GB memory per task manager),
>
> I get an OOM error. I am not able to understand this strange behaviour.
>
> Container: container_e60_1526906661225_0213_01_01 on
> dh-cdh-m1d5.dbp.host1.in_8041
> 
> LogType:jobmanager.err
> Log Upload Time:Mon May 21 19:48:12 +0530 2018
> LogLength:3247
> Log Contents:
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/mnt/vol9/yarn/nm/usercache/hdfs/appcache/application_1526906661225_0213/filecache/13/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/mnt/vol7/yarn/nm/usercache/hdfs/appcache/application_1526906661225_0213/filecache/11/process-runners-0.0.2-SNAPSHOT-shaded.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/cloudera/parcels/CDH-5.5.1-1.cdh5.5.1.p0.11/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-5]
> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for
> ActorSystem[flink]
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
> at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
> at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
> at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
> at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
> at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
> at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> LogType:jobmanager.log


Re: Checkpointing when reading from files?

2018-05-21 Thread Amit Jain
Hi Alex,

StreamExecutionEnvironment#readFile is a helper function that creates a
file-reading streaming source. Internally, it uses
ContinuousFileMonitoringFunction and ContinuousFileReaderOperator.

Since both the file reader operator and the monitoring function support
checkpointing, readFile does too [1], so you can go with the first approach.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readFile-org.apache.flink.api.common.io.FileInputFormat-java.lang.String-org.apache.flink.streaming.api.functions.source.FileProcessingMode-long-org.apache.flink.api.common.typeinfo.TypeInformation-
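
As we understand it, readFile's checkpointing rests on two pieces of state:
the monitoring function remembers the largest file modification time it has
already forwarded, and the reader operator snapshots the splits it has not
finished reading. The stdlib-only toy model below sketches just the first
piece; FileMonitorModel and its methods are hypothetical names, not Flink's
API. After restoring from a checkpoint, files whose modification time is at
or before the remembered timestamp are skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Toy model (hypothetical, not Flink's API) of a continuous file monitor whose
 * only checkpointed state is the largest modification time it has forwarded.
 */
final class FileMonitorModel {
    private long globalModificationTime = Long.MIN_VALUE;

    /** Returns files (name -> modification time) newer than the stored timestamp, oldest first. */
    List<String> scan(Map<String, Long> directory) {
        List<Map.Entry<String, Long>> fresh = new ArrayList<>();
        for (Map.Entry<String, Long> e : directory.entrySet()) {
            // Strict comparison: anything at or before the stored time counts as already seen.
            if (e.getValue() > globalModificationTime) {
                fresh.add(e);
            }
        }
        fresh.sort(Map.Entry.comparingByValue());
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Long> e : fresh) {
            names.add(e.getKey());
            globalModificationTime = Math.max(globalModificationTime, e.getValue());
        }
        return names;
    }

    /** The value that would be written into a checkpoint. */
    long snapshotState() {
        return globalModificationTime;
    }

    /** Restores the timestamp from a checkpoint. */
    void restoreState(long checkpointed) {
        globalModificationTime = checkpointed;
    }
}
```

A consequence of the strict comparison in this model is that a new file with
exactly the checkpointed modification time would also be skipped.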


--
Thanks,
Amit


On Mon, May 21, 2018 at 9:39 PM, NEKRASSOV, ALEXEI  wrote:
> I want to add checkpointing to my program that reads from a set of files in
> a directory. Without checkpointing I use readFile():
>
>   DataStream<String> text = env.readFile(
>       new TextInputFormat(new Path(inputPath)),
>       inputPath,
>       inputProcessingMode,
>       1000);
>
> Should I use ContinuousFileMonitoringFunction / ContinuousFileReaderOperator
> to add checkpointing? Or is there an easier way?
>
> How do I go from splits (which ContinuousFileMonitoringFunction provides) to
> actual strings? I'm not clear on how ContinuousFileReaderOperator can be used.
>
>   DataStreamSource<TimestampedFileInputSplit> split = env.addSource(
>       new ContinuousFileMonitoringFunction<>(
>           new TextInputFormat(new Path(inputPath)),
>           inputProcessingMode,
>           1,
>           1000));
>
> Thanks,
> Alex


Re: Message guarantees with S3 Sink

2018-05-21 Thread Amit Jain
Thanks, Gary!

Sure, there are issues with updates in S3. You may want to look over the
EMRFS consistent view guarantees [1]. I'm not sure whether it is available
on non-EMR AWS systems.

I'm creating a JIRA issue about the possibility of data loss in S3. IMHO,
the Flink docs should mention the possible data loss with S3.

[1] 
https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html

--
Thanks,
Amit

On Fri, May 18, 2018 at 2:48 AM, Gary Yao <g...@data-artisans.com> wrote:
> Hi Amit,
>
> The BucketingSink doesn't have well-defined semantics when used with S3.
> Data loss is possible, but I am not sure whether it is the only problem.
> There are plans to rewrite the BucketingSink in Flink 1.6 to enable
> eventually consistent file systems [1][2].
>
> Best,
> Gary
>
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sink-with-BucketingSink-to-S3-files-override-td18433.html
> [2] https://issues.apache.org/jira/browse/FLINK-6306
>
> On Thu, May 17, 2018 at 11:57 AM, Amit Jain <aj201...@gmail.com> wrote:
>>
>> Hi,
>>
>> We are using Flink to process clickstream data from Kafka and write it
>> to S3 in 128 MB files.
>>
>> What are the message-processing guarantees of the S3 sink? My
>> understanding is that the S3A client buffers the data in memory or on
>> disk. If a particular node fails, the TM would not call Writer#close, so
>> the buffered data could be lost entirely, assuming this buffer contains
>> data covered by the last successful checkpoint.
>>
>> --
>> Thanks,
>> Amit
>
>


Re: Message guarantees with S3 Sink

2018-05-17 Thread Amit Jain
Hi Rong,

We are using BucketingSink only. I'm looking at the case where the TM
does not get the chance to call Writer#flush, for example when YARN kills
the TM because of an OOM. We have set fs.s3.impl to
com.amazon.ws.emr.hadoop.fs.EmrFileSystem in core-site.xml, so
BucketingSink uses the EMRFS S3 client internally.

When we write data using the S3A client, it buffers the data in memory
or on disk until it reaches the multipart part size or the OutputStream
is closed. Now suppose the S3A client has buffered 40 MB of data on the
TM's local disk when a checkpoint barrier arrives at the sink, and that
checkpoint completes successfully. The sink then resumes writing, the
buffer grows to 60 MB, and YARN kills the TM. What would happen to the
original 40 MB of data?
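
To make the question concrete, here is a toy model of the scenario, assuming
(as stated above) that the client only persists data when a part fills up or
when the stream is closed. BufferedUploadModel and lostAfterCrash are
hypothetical names, not S3A, EMRFS, or Flink APIs, and the model leaves out
BucketingSink's own recovery logic entirely. Units are arbitrary; read them
as MB.

```java
/**
 * Toy model (hypothetical, not an S3A/EMRFS/Flink API) of an upload stream that
 * only makes bytes durable when a full part is uploaded or the stream is closed.
 */
final class BufferedUploadModel {
    private final long partSize;
    private long buffered;   // bytes held only in the TM's local memory or disk
    private long persisted;  // bytes already uploaded to the object store

    BufferedUploadModel(long partSize) {
        this.partSize = partSize;
    }

    void write(long bytes) {
        buffered += bytes;
        while (buffered >= partSize) {  // each full part is uploaded
            buffered -= partSize;
            persisted += partSize;
        }
    }

    void close() {   // a normal close uploads the remaining tail
        persisted += buffered;
        buffered = 0;
    }

    void crash() {   // the TM is killed: the local buffer is gone
        buffered = 0;
    }

    long persisted() {
        return persisted;
    }

    /** Bytes the checkpoint counted as written that are not durable after a crash. */
    static long lostAfterCrash(long partSize, long beforeCheckpoint, long afterCheckpoint) {
        BufferedUploadModel out = new BufferedUploadModel(partSize);
        out.write(beforeCheckpoint);
        long acknowledged = beforeCheckpoint;  // checkpoint completes here without a flush
        out.write(afterCheckpoint);
        out.crash();
        return Math.max(0, acknowledged - out.persisted());
    }
}
```

With an assumed part size of 128, writing 40 before the checkpoint and 20
after it, lostAfterCrash(128, 40, 20) returns 40: the whole pre-checkpoint
buffer is gone. With a part size of 50, the first part is uploaded before the
crash and it returns 0.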

--
Thanks,
Amit




On Thu, May 17, 2018 at 10:28 PM, Rong Rong <walter...@gmail.com> wrote:
> Hi Amit,
>
> Can you elaborate on how you write using "S3 sink" and which version of Flink
> you are using?
>
> If you are using BucketingSink[1], you can check out the API doc and
> configure it to flush before closing your sink.
> This way your sink is "integrated with the checkpointing mechanism to
> provide exactly once semantics"[2]
>
> Thanks,
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>
> On Thu, May 17, 2018 at 2:57 AM, Amit Jain <aj201...@gmail.com> wrote:
>>
>> Hi,
>>
>> We are using Flink to process clickstream data from Kafka and write it
>> to S3 in 128 MB files.
>>
>> What are the message-processing guarantees of the S3 sink? My
>> understanding is that the S3A client buffers the data in memory or on
>> disk. If a particular node fails, the TM would not call Writer#close, so
>> the buffered data could be lost entirely, assuming this buffer contains
>> data covered by the last successful checkpoint.
>>
>> --
>> Thanks,
>> Amit
>
>


Message guarantees with S3 Sink

2018-05-17 Thread Amit Jain
Hi,

We are using Flink to process clickstream data from Kafka and write it
to S3 in 128 MB files.

What are the message-processing guarantees of the S3 sink? My
understanding is that the S3A client buffers the data in memory or on
disk. If a particular node fails, the TM would not call Writer#close, so
the buffered data could be lost entirely, assuming this buffer contains
data covered by the last successful checkpoint.

--
Thanks,
Amit


Re: [Flink 1.5.0] BlobServer data for a job is not getting cleaned up at JM

2018-05-16 Thread Amit Jain
Thanks, Chesnay, for the quick reply. I have raised the ticket
https://issues.apache.org/jira/browse/FLINK-9381.

On Wed, May 16, 2018 at 5:33 PM, Chesnay Schepler <ches...@apache.org> wrote:
> Please open a JIRA.
>
>
> On 16.05.2018 13:58, Amit Jain wrote:
>>
>> Hi,
>>
>> We are running Flink 1.5.0 rc3 with YARN as the cluster manager and found
>> that the JobManager is getting killed due to an out-of-disk error.
>>
>> Upon further analysis, we found that the blob server data for a job is not
>> getting cleaned up. For now, we have written a directory cleanup script
>> based on the directory creation time of job_.
>>
>> Please suggest a good way to deal with this problem. Can I raise a JIRA
>> ticket for this?
>>
>> --
>> Thanks,
>> Amit
>>
>


[Flink 1.5.0] BlobServer data for a job is not getting cleaned up at JM

2018-05-16 Thread Amit Jain
Hi,

We are running Flink 1.5.0 rc3 with YARN as the cluster manager and found
that the JobManager is getting killed due to an out-of-disk error.

Upon further analysis, we found that the blob server data for a job is not
getting cleaned up. For now, we have written a directory cleanup script
based on the directory creation time of job_.
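
A cleanup script like the one described could look roughly as follows. This
is a sketch under assumptions: the blob storage root is passed in by the
caller, job directories are matched with a job_* glob (the exact naming is
cut off in the message), and the creation time comes from
BasicFileAttributes, which returns an implementation-specific default (such
as the last-modified time) on file systems that do not record creation
times. BlobDirCleanup is a hypothetical helper. Deleting the blobs of a job
that is still running would break it, so the cutoff should be older than any
live job.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class BlobDirCleanup {
    /** Deletes job_* subdirectories of blobRoot created before cutoff; returns what was deleted. */
    static List<Path> deleteStaleJobDirs(Path blobRoot, Instant cutoff) throws IOException {
        List<Path> deleted = new ArrayList<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(blobRoot, "job_*")) {
            for (Path dir : dirs) {
                if (!Files.isDirectory(dir)) {
                    continue;
                }
                Instant created = Files.readAttributes(dir, BasicFileAttributes.class)
                        .creationTime()
                        .toInstant();
                if (created.isBefore(cutoff)) {
                    deleteRecursively(dir);
                    deleted.add(dir);
                }
            }
        }
        return deleted;
    }

    private static void deleteRecursively(Path root) throws IOException {
        // Reverse path order visits children before their parent directories.
        try (Stream<Path> walk = Files.walk(root)) {
            List<Path> paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path p : paths) {
                Files.delete(p);
            }
        }
    }
}
```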

Please suggest a good way to deal with this problem. Can I raise a JIRA
ticket for this?

--
Thanks,
Amit


Re: Question about datasource replication

2018-05-04 Thread Amit Jain
Hi Flavio,

Which version of Flink are you using?

--
Thanks,
Amit

On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier  wrote:
> Hi all,
> I have a Flink batch job that reads a parquet dataset and then applies 2
> flatMaps to it (see pseudocode below).
> The problem is that this dataset is quite big and Flink duplicates it before
> sending the data to these 2 operators (I guessed this from the doubled
> amount of sent bytes).
> Is there a way to avoid this behaviour?
>
> ---
> Here's the pseudo code of my job:
>
> DataSet X = readParquetDir();
> X1 = X.flatMap(...);
> X2 = X.flatMap(...);
>
> Best,
> Flavio


Re: Batch job stuck in Canceled state in Flink 1.5

2018-05-03 Thread Amit Jain
Hi Stephan,

The JM log file is 122 MB. Could you suggest another way for me to
share it? We can use Google Drive if that's fine with you.

--
Thanks,
Amit

On Thu, May 3, 2018 at 12:58 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi Amit!
>
> Thanks for sharing this, this looks like a regression with the network stack
> changes.
>
> The log you shared from the TaskManager gives some hint, but that exception
> alone should not be a problem. That exception can occur under a race between
> deployment of some tasks while the whole job is entering a recovery phase
> (maybe we should not print it so prominently to not confuse users). There
> must be something else happening on the JobManager. Can you share the JM
> logs as well?
>
> Thanks a lot,
> Stephan
>
>
> On Wed, May 2, 2018 at 12:21 PM, Amit Jain <aj201...@gmail.com> wrote:
>>
>> Thanks, Fabian!
>>
>> I will try using the current release-1.5 branch and update this thread.
>>
>> --
>> Thanks,
>> Amit
>>
>> On Wed, May 2, 2018 at 3:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>> > Hi Amit,
>> >
>> > We recently fixed a bug in the network stack that affected batch jobs
>> > (FLINK-9144).
>> > The fix was added after your commit.
>> >
>> > Do you have a chance to build the current release-1.5 branch and check
>> > if the fix also resolves your problem?
>> >
>> > Otherwise it would be great if you could open a blocker issue for the
>> > 1.5 release to ensure that this is fixed.
>> >
>> > Thanks,
>> > Fabian
>> >
>> > 2018-04-29 18:30 GMT+02:00 Amit Jain <aj201...@gmail.com>:
>> >>
>> >> The cluster is running on commit 2af481a.
>> >>
>> >> On Sun, Apr 29, 2018 at 9:59 PM, Amit Jain <aj201...@gmail.com> wrote:
>> >> > Hi,
>> >> >
>> >> > We are running a number of batch jobs on a Flink 1.5 cluster, and a few
>> >> > of them get stuck at random. These jobs hit the following failure,
>> >> > after which the operator status changes to CANCELED and stays there.
>> >> >
>> >> > Please find the complete TM log at
>> >> > https://gist.github.com/imamitjain/066d0e0ee24f2da1ddc83eba2012
>> >> >
>> >> >
>> >> > 2018-04-29 14:57:24,437 INFO  org.apache.flink.runtime.taskmanager.Task
>> >> > - Producer 98f5976716234236dc69fb0e82a0cc34 of partition
>> >> > 5b8dd5ac2df14266080a8e049defbe38 disposed. Cancelling execution.
>> >> > org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException:
>> >> > Execution 98f5976716234236dc69fb0e82a0cc34 producing partition
>> >> > 5b8dd5ac2df14266080a8e049defbe38 has already been disposed.
>> >> > at org.apache.flink.runtime.jobmaster.JobMaster.requestPartitionState(JobMaster.java:610)
>> >> > at sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source)
>> >> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >> > at java.lang.reflect.Method.invoke(Method.java:498)
>> >> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>> >> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>> >> > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69)
>> >> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>> >> > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>> >> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> >> > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> >> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> >> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> >> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> >> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> >> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> >> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >> > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >> > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >> > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >> >
>> >> >
>> >> > Thanks
>> >> > Amit
>> >
>> >
>
>


Re: Batch job stuck in Canceled state in Flink 1.5

2018-05-02 Thread Amit Jain
Thanks, Fabian!

I will try using the current release-1.5 branch and update this thread.

--
Thanks,
Amit

On Wed, May 2, 2018 at 3:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Amit,
>
> We recently fixed a bug in the network stack that affected batch jobs
> (FLINK-9144).
> The fix was added after your commit.
>
> Do you have a chance to build the current release-1.5 branch and check if
> the fix also resolves your problem?
>
> Otherwise it would be great if you could open a blocker issue for the 1.5
> release to ensure that this is fixed.
>
> Thanks,
> Fabian
>
> 2018-04-29 18:30 GMT+02:00 Amit Jain <aj201...@gmail.com>:
>>
>> The cluster is running on commit 2af481a.
>>
>> On Sun, Apr 29, 2018 at 9:59 PM, Amit Jain <aj201...@gmail.com> wrote:
>> > Hi,
>> >
>> > We are running a number of batch jobs on a Flink 1.5 cluster, and a few
>> > of them get stuck at random. These jobs hit the following failure,
>> > after which the operator status changes to CANCELED and stays there.
>> >
>> > Please find the complete TM log at
>> > https://gist.github.com/imamitjain/066d0e0ee24f2da1ddc83eba2012
>> >
>> >
>> > 2018-04-29 14:57:24,437 INFO  org.apache.flink.runtime.taskmanager.Task
>> > - Producer 98f5976716234236dc69fb0e82a0cc34 of partition
>> > 5b8dd5ac2df14266080a8e049defbe38 disposed. Cancelling execution.
>> > org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException:
>> > Execution 98f5976716234236dc69fb0e82a0cc34 producing partition
>> > 5b8dd5ac2df14266080a8e049defbe38 has already been disposed.
>> > at org.apache.flink.runtime.jobmaster.JobMaster.requestPartitionState(JobMaster.java:610)
>> > at sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source)
>> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > at java.lang.reflect.Method.invoke(Method.java:498)
>> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>> > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69)
>> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>> > at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >
>> >
>> > Thanks
>> > Amit
>
>


Re: Batch job stuck in Canceled state in Flink 1.5

2018-04-29 Thread Amit Jain
The cluster is running on commit 2af481a.

On Sun, Apr 29, 2018 at 9:59 PM, Amit Jain <aj201...@gmail.com> wrote:
> Hi,
>
> We are running a number of batch jobs on a Flink 1.5 cluster, and a few of
> them get stuck at random. These jobs hit the following failure, after which
> the operator status changes to CANCELED and stays there.
>
> Please find the complete TM log at
> https://gist.github.com/imamitjain/066d0e0ee24f2da1ddc83eba2012
>
>
> 2018-04-29 14:57:24,437 INFO  org.apache.flink.runtime.taskmanager.Task
> - Producer 98f5976716234236dc69fb0e82a0cc34 of partition
> 5b8dd5ac2df14266080a8e049defbe38 disposed. Cancelling execution.
> org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException:
> Execution 98f5976716234236dc69fb0e82a0cc34 producing partition
> 5b8dd5ac2df14266080a8e049defbe38 has already been disposed.
> at org.apache.flink.runtime.jobmaster.JobMaster.requestPartitionState(JobMaster.java:610)
> at sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Thanks
> Amit


Batch job stuck in Canceled state in Flink 1.5

2018-04-29 Thread Amit Jain
Hi,

We are running a number of batch jobs on a Flink 1.5 cluster, and a few of
them get stuck at random. These jobs hit the following failure, after which
the operator status changes to CANCELED and stays there.

Please find the complete TM log at
https://gist.github.com/imamitjain/066d0e0ee24f2da1ddc83eba2012


2018-04-29 14:57:24,437 INFO  org.apache.flink.runtime.taskmanager.Task
 - Producer 98f5976716234236dc69fb0e82a0cc34 of partition
5b8dd5ac2df14266080a8e049defbe38 disposed. Cancelling execution.
org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException:
Execution 98f5976716234236dc69fb0e82a0cc34 producing partition
5b8dd5ac2df14266080a8e049defbe38 has already been disposed.
at org.apache.flink.runtime.jobmaster.JobMaster.requestPartitionState(JobMaster.java:610)
at sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Thanks
Amit


Re: Testing on Flink 1.5

2018-04-20 Thread Amit Jain
Hi Gary,

This setting has resolved the issue. Does it increase the timeout for all
RPCs or only for specific components?

We had the following settings in Flink 1.3.2, and they did the job for us.

akka.watch.heartbeat.pause: 600 s
akka.client.timeout: 5 min
akka.ask.timeout: 120 s


--
Thanks,
Amit


Re: Testing on Flink 1.5

2018-04-19 Thread Amit Jain
Hi Gary,

We found the underlying cause of the following problem.
A few of our jobs are stuck with the logs in [1]: these jobs are only able
to allocate a JM and never get any TM, even though there are ample
resources on our cluster.

We are running an ETL merge job here. In this job, we first look for new
deltas, and if no delta is detected, we exit without actually executing
the job. I think this is why we see no TM allocation
happening.
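
The control flow described above can be sketched as follows.
MergeDriver, findNewDeltas, and submitMergeJob are hypothetical stand-ins for
the real delta detection and for building the Flink plan and calling
env.execute(); none of them is a Flink API. The point is that on the early
exit no job is ever submitted, so no TaskManager is requested, even though
(as described above) the YARN application and its JM are already running.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical shape of the ETL merge driver; not Flink code. */
final class MergeDriver {
    /** Returns true only if a job was actually submitted to the cluster. */
    static boolean run(Supplier<List<String>> findNewDeltas, Runnable submitMergeJob) {
        List<String> deltas = findNewDeltas.get();
        if (deltas.isEmpty()) {
            // Early exit: nothing is submitted, so no TaskManager is ever requested.
            return false;
        }
        // In the real job this is where the plan is built and env.execute() is called.
        submitMergeJob.run();
        return true;
    }
}
```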

I believe that in the above case (non-detached mode), the submitted
application should be marked as completed rather than running. Please
share your thoughts on this.
Should I log this improvement in JIRA?

Could you also recommend the best practice with FLIP-6: should we use a YARN
session or submit jobs in non-detached mode?

--
Thanks,
Amit


Re: KeyedStream question

2018-04-04 Thread Amit Jain
Hi,

The keyBy operation partitions the data on the given key and makes sure that
the same slot gets all future data belonging to the same key. With the
default implementation, it can also map several different keys of your
DataStream to the same slot.

If you have as many keys as running slots, you may specify your own custom
keyBy operation to achieve one key per slot.
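
Roughly, Flink's default keyBy first hashes a key into one of maxParallelism
key groups and then assigns contiguous ranges of key groups to parallel
subtasks. The sketch below mirrors that two-step mapping, but substitutes
plain hashCode() for the murmur hash Flink applies, so its concrete slot
numbers will differ from Flink's. KeyToSlot and explicitIndex are
hypothetical helpers; explicitIndex shows what a one-key-per-slot mapping
could look like when the key set is known in advance.

```java
import java.util.List;

/** Hypothetical helpers modelling key -> key group -> subtask; not Flink's classes. */
final class KeyToSlot {
    /** Stand-in for Flink's key-group assignment (Flink murmur-hashes key.hashCode()). */
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Range-based mapping: key groups [0, max) are split into `parallelism` contiguous ranges. */
    static int subtaskIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    /** Explicit mapping for a known key set: the i-th key goes to subtask i (mod parallelism). */
    static int explicitIndex(Object key, List<?> knownKeys, int parallelism) {
        int i = knownKeys.indexOf(key);
        if (i < 0) {
            throw new IllegalArgumentException("unknown key: " + key);
        }
        return i % parallelism;
    }
}
```

With this stand-in hash, the keys "a" to "d" (hash codes 97 to 100) fall
into key groups 97 to 100, which all lie in the last range for parallelism 4
and maxParallelism 128, so all four keys share subtask 3. explicitIndex
spreads the same four keys over subtasks 0 to 3.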


Could you describe your use case?

--
Thanks 
Amit  





Re: TaskManager deadlock on NetworkBufferPool

2018-04-04 Thread Amit Jain
+user@flink.apache.org

On Wed, Apr 4, 2018 at 11:33 AM, Amit Jain <aj201...@gmail.com> wrote:
> Hi,
>
> We are hitting the TaskManager deadlock on NetworkBufferPool bug in Flink 1.3.2.
> We have a set of ETL merge jobs for a number of tables, and they randomly get
> stuck with the above issue every day.
>
> I'm attaching the thread dumps of the JobManager and of one of the TaskManagers
> (T1) running a stuck job.
> We also observed that sometimes a new job scheduled on T1 makes progress even
> while another job is stuck there.
>
> "CHAIN DataSource (at createInput(ExecutionEnvironment.java:553)
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (Map
> at main(MergeTableSecond.java:175)) -> Map (Key Extractor) (6/9)" #1501
> daemon prio=5 os_prio=0 tid=0x7f9ea84d2fb0 nid=0x22fe in Object.wait()
> [0x7f9ebf102000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:224)
> - locked <0x0005e28fe218> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:193)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
> - locked <0x0005e29125f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
> at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:168)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
> --
> Thanks,
> Amit
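
The thread dump shows the source thread in TIMED_WAITING inside
LocalBufferPool.requestBuffer, holding the monitor of an ArrayDeque: it is
waiting for a free network buffer. Below is a minimal stdlib model of that
kind of blocking pool; BlockingBufferPool is hypothetical, not Flink's class.
If every buffer is held by data that is only released once some other stuck
task makes progress, recycle() is never called and the requester keeps
waiting, which is one way such a stall can arise. Unlike a pool that waits
indefinitely, this sketch takes a deadline and returns null on timeout so it
can be exercised in a test.

```java
import java.util.ArrayDeque;

/** Hypothetical fixed-size buffer pool whose requesters block until a buffer is recycled. */
final class BlockingBufferPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();

    BlockingBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    /** Blocks until a buffer is free or the timeout passes; returns null on timeout. */
    byte[] requestBufferBlocking(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (available) {
            while (available.isEmpty()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null;
                }
                // A thread parked here shows up as TIMED_WAITING (on object monitor).
                available.wait(remaining);
            }
            return available.poll();
        }
    }

    /** Returns a buffer to the pool and wakes one waiting requester. */
    void recycle(byte[] buffer) {
        synchronized (available) {
            available.add(buffer);
            available.notify();
        }
    }
}
```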