Re: Flink - excessive amount of yarn container requests for versions > 1.4.0

2018-12-09 Thread Shuyi Chen
We've seen a similar issue in our production; you can refer to this JIRA
(https://issues.apache.org/jira/browse/FLINK-10848) for more details.

Shuyi

On Sun, Dec 9, 2018 at 11:27 PM sohimankotia  wrote:

> Hi ,
>
> While running a Flink streaming job, it requests more resources from YARN than
> specified. I am giving it 17 TMs, but it is requesting more than 35
> containers from YARN.
>
> This is happening for all versions greater than 1.4.0.
>
>
> Attaching JM logs.
>
> logs.zip
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/logs.zip>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
"So you have to trust that the dots will somehow connect in your future."


Flink job execution graph hints

2018-12-09 Thread Taher Koitawala
Hi All,
  Is there a way to send hints to the job graph builder, such as
specifically disabling or enabling operator chaining?
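
For reference, the DataStream API does expose such hints directly on operators; a minimal sketch (the pipeline below is illustrative only, not taken from the thread):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ChainingHints {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Global hint: disable operator chaining for the whole job.
            // env.disableOperatorChaining();

            env.fromElements("a", "b", "c")
                    .map(new MapFunction<String, String>() {
                        @Override
                        public String map(String value) {
                            return value.toUpperCase();
                        }
                    })
                    .startNewChain()      // per-operator hint: begin a new chain at this operator
                    .filter(value -> !value.isEmpty())
                    .disableChaining()    // per-operator hint: never chain this operator with others
                    .print();

            env.execute("chaining-hints-sketch");
        }
    }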


Flink - excessive amount of yarn container requests for versions > 1.4.0

2018-12-09 Thread sohimankotia
Hi,

While running a Flink streaming job, it requests more resources from YARN than
specified. I am giving it 17 TMs, but it is requesting more than 35
containers from YARN.

This is happening for all versions greater than 1.4.0.


Attaching JM logs.

logs.zip
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/logs.zip>



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink 1.7.0 and 1.5.5 - Returning excess container issue

2018-12-09 Thread sohimankotia
Hi,

Flink is requesting more containers from YARN than specified. I am using 17
TMs with 3 slots each, but at startup it acquires more than 35 TMs and then releases
them after some time.
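
For context, such a job is presumably submitted with something along these lines; only -yn (number of TaskManagers) and -ys (slots per TM) come from the description above, the memory, name and jar path are placeholders:

    # Placeholders throughout; -yn requests 17 TaskManagers, -ys sets 3 slots per TM.
    ./bin/flink run -m yarn-cluster -yn 17 -ys 3 -ytm 4096 -ynm my-streaming-job path/to/job.jar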

I have attached JM debug logs. Not sure what the issue could be.


logs.zip




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink issue while setting up in Kubernetes

2018-12-09 Thread Thakur, Abhi
We are trying to set up a single-node Kubernetes cluster with
1 JobManager and 1 TaskManager.
Previously we were getting an error, and we followed this thread:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-4-issues-w-TaskManager-connecting-to-ResourceManager-td23298.html
After following the above-mentioned thread, we used the following
commands to start up the Flink services:

${FLINK_HOME}/bin/jobmanager.sh start-foreground
${FLINK_HOME}/bin/taskmanager.sh start-foreground

Previously the jobmanager was started as:
${FLINK_HOME}/bin/jobmanager.sh start-foreground  cluster
${FLINK_HOME}/bin/taskmanager.sh start-foreground

That removed the original error, but now we are getting the error shown below.
We searched all the archives and hit a dead end.
We have set up all ports correctly. The Flink version used is 1.6.2.
Thanks in advance.


2018-12-08 06:52:38,959 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed, address 
is now gated for [50] ms. Reason: [Disassociated]
  2018-12-08 06:52:41,863 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
Registering TaskManager with ResourceID 037d1c33ec0406598f2ce30472f97e65 
(akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122/user/taskmanager_0) 
at ResourceManager
  2018-12-08 06:53:23,619 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - The 
heartbeat of TaskManager with id e0383ee248832f639659082c70a2f4e9 timed out.
  2018-12-08 06:53:23,619 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Closing 
TaskExecutor connection e0383ee248832f639659082c70a2f4e9 because: The heartbeat 
of TaskManager with id e0383ee248832f639659082c70a2f4e9 timed out.
  2018-12-08 06:53:48,961 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed, address 
is now gated for [50] ms. Reason: [Disassociated]
  2018-12-08 06:53:53,615 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed, address 
is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by: 
[flink-taskmanager-6cf55db87b-5x9sd: Name or service not known]
  2018-12-08 06:54:03,601 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed, address 
is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by: 
[flink-taskmanager-6cf55db87b-5x9sd]
  2018-12-08 06:54:13,605 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed, address 
is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by: 
[flink-taskmanager-6cf55db87b-5x9sd: Name or service not known]
  2018-12-08 06:54:23,613 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed, address 
is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by: 
[flink-taskmanager-6cf55db87b-5x9sd: Name or service not known]
  2018-12-08 06:54:33,601 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed, address 
is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by: 
[flink-taskmanager-6cf55db87b-5x9sd]
  2018-12-08 06:54:33,619 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - The 
heartbeat of TaskManager with id 037d1c33ec0406598f2ce30472f97e65 timed out.
  2018-12-08 06:54:33,619 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Closing 
TaskExecutor connection 037d1c33ec0406598f2ce30472f97e65 because: The heartbeat 
of TaskManager with id 037d1c33ec0406598f2ce30472f97e65 timed out.
  2018-12-08 06:59:04,160 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - 
Registering TaskManager with ResourceID 0c12c8035663c0b7bc94d520b3c6ff0a 
(akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122/user/taskmanager_0) 
at ResourceManager
  2018-12-08 07:00:08,983 WARN akka.remote.ReliableDeliverySupervisor - 
Association with remote system 
[akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed, address 
is now gated for [50] ms. Reason: [Disassociated]
  2018-12-08 07:00:12,040 INFO 

Re: S3A AWSS3IOException from Flink's BucketingSink to S3

2018-12-09 Thread Ken Krugler
Hi Generic Flink Developer,

Normally when you get an internal error from AWS, you also get a 500 status 
code - the 200 seems odd to me.

One thing I do know is that if you’re hitting S3 hard, you have to expect and 
recover from errors.

E.g. distcp jobs in Hadoop-land will auto-retry a failed request, because
Things Go Wrong in AWS-land.

So it surprises me a bit that BucketingSink is using a raw S3AFileSystem. In the
absence of Hadoop 3.1 support for S3A retry policies, it seems like Flink would
want to wrap the S3AFileSystem with something that would retry requests which
get an internal error.

But I haven’t walked that code, so maybe there is retry support somewhere else…
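
For illustration only, a minimal retry helper along the lines described above; the class name, retry counts and backoff are assumptions, not Flink or Hadoop APIs:

    import java.io.IOException;
    import java.util.concurrent.Callable;

    // Hypothetical sketch: retry an S3 operation with a simple backoff before giving up,
    // standing in for the S3A retry policies that only arrive with Hadoop 3.1.
    public final class RetryingS3Call {

        public static <T> T callWithRetries(Callable<T> operation, int maxAttempts, long backoffMillis)
                throws Exception {
            Exception last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return operation.call();
                } catch (IOException e) {                    // e.g. an AWSS3IOException from a copy/rename
                    last = e;
                    Thread.sleep(backoffMillis * attempt);   // linear backoff between attempts
                }
            }
            throw last;
        }
    }

To actually apply this to the rename that BucketingSink performs, the retry would have to live inside a wrapping FileSystem implementation; the snippet only shows the retry loop itself.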

— Ken



> On Dec 9, 2018, at 1:37 PM, Flink Developer  
> wrote:
> 
> Hi, is there any idea on what causes this and how it can be resolved? Thanks.
> 
> ‐‐‐ Original Message ‐‐‐
> On Wednesday, December 5, 2018 12:44 AM, Flink Developer 
>  wrote:
> 
>> I have a Flink app with high parallelism (400) running in AWS EMR. It uses 
>> Flink v1.5.2. It sources Kafka and sinks to S3 using BucketingSink (using 
>> RocksDb backend for checkpointing). The destination is defined using 
>> "s3a://" prefix. The Flink job is a streaming app which runs continuously. 
>> At any given time, it's possible that each worker will write to a part file 
>> in S3. This means all workers combined could potentially generate/write to 
>> 400 files (due to 400 parallelism). 
>> 
>> After a few days, one of the workers will fail with the exception:
>> 
>> org.apache.hadoop.fs.s3a.AWSS3IOException: 
>> copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, 
>> bucket/2018-09-01/05/_file-10-1.gz.pending): 
>> com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an 
>> internal error. Pelase try again. (Service: Amazon S3; Status Code: 200 
>> InternalError; Request ID: xx; S3 Extended Request ID: 
>> yyy
>> at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java: 
>> 178)
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java: 
>> 1803)
>> at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
>> at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
>> at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
>> at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
>> 
>> This seems to randomly occur when a new part file is created by the 
>> BucketingSink. The odd thing is that this happens randomly but consistently 
>> on separate job executions. When it occurs, it happens to 1 of the parallel 
>> flink workers (not all). Also, when this occurs, the Flink job transitions 
>> into a FAILING state, but the Flink job does not restart and resume/recover 
>> from the last successful checkpoint.  
>> 
>> What is the cause for this and how can it be resolved? Also, how can the job 
>> be configured to restart/recover from the last successful checkpoint instead 
>> of staying in the FAILING state?
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: S3A AWSS3IOException from Flink's BucketingSink to S3

2018-12-09 Thread Flink Developer
Hi, is there any idea on what causes this and how it can be resolved? Thanks.

‐‐‐ Original Message ‐‐‐
On Wednesday, December 5, 2018 12:44 AM, Flink Developer 
 wrote:

> I have a Flink app with high parallelism (400) running in AWS EMR. It uses 
> Flink v1.5.2. It sources Kafka and sinks to S3 using BucketingSink (using 
> RocksDb backend for checkpointing). The destination is defined using "s3a://" 
> prefix. The Flink job is a streaming app which runs continuously. At any 
> given time, it's possible that each worker will write to a part file in S3. 
> This means all workers combined could potentially generate/write to 400 files 
> (due to 400 parallelism).
>
> After a few days, one of the workers will fail with the exception:
>
> org.apache.hadoop.fs.s3a.AWSS3IOException: 
> copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, 
> bucket/2018-09-01/05/_file-10-1.gz.pending): 
> com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal 
> error. Pelase try again. (Service: Amazon S3; Status Code: 200 InternalError; 
> Request ID: xx; S3 Extended Request ID: yyy
> at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java: 
> 178)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java: 
> 1803)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
>
> This seems to randomly occur when a new part file is created by the 
> BucketingSink. The odd thing is that this happens randomly but consistently 
> on separate job executions. When it occurs, it happens to 1 of the parallel 
> flink workers (not all). Also, when this occurs, the Flink job transitions 
> into a FAILING state, but the Flink job does not restart and resume/recover 
> from the last successful checkpoint.
>
> What is the cause for this and how can it be resolved? Also, how can the job 
> be configured to restart/recover from the last successful checkpoint instead 
> of staying in the FAILING state?
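
As a hedged illustration of the last question (not an answer from the thread): restart behaviour is usually controlled by an explicit restart strategy on the execution environment, and each restart resumes from the latest completed checkpoint. All values below are placeholders:

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RestartStrategyExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint periodically so there is something to resume from (interval in ms).
            env.enableCheckpointing(60_000);

            // Restart a failed job a limited number of times with a delay between attempts.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                    3,                                  // restart attempts (placeholder)
                    Time.of(30, TimeUnit.SECONDS)));    // delay between attempts (placeholder)

            // Stand-in pipeline; the real job would use the Kafka source and BucketingSink.
            env.fromElements("a", "b", "c").print();

            env.execute("job-with-restart-strategy");
        }
    }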

Checkpoints replication over data centers synchronized with ZooKeeper in HA mode

2018-12-09 Thread Shimony, Shay
Hello everyone!

In our planned setup we have 2 data centers, each in a different geographic zone
(and a third for ZK as a tie breaker). We use HA with ZooKeeper, as follows:

Normally, DC1 will run our job:

DC1                            | DC2                            | DC3
Machine 1      | Machine 2     | Machine 3      | Machine 4     | Machine 5
ZK1            | ZK2           | ZK3            | ZK4           | ZK5
JM1 (leader)   | JM2           |                |               |
TM1            | TM2           |                |               |

But after DC1 crashes, DC2 will take over, starting Flink processes and resuming
our job from a checkpoint:

DC1                            | DC2                            | DC3
Machine 1      | Machine 2     | Machine 3      | Machine 4     | Machine 5
               |               | ZK3            | ZK4           | ZK5
               |               | JM3 (leader)   | JM4           |
               |               | TM3            | TM4           |

For checkpoints, we use the filesystem state backend over NAS.
We have a NAS in DC1 and a replica of it in DC2. The replication runs
in the background, once a minute.
If DC1 crashes, we recover our job in DC2 over its NAS replica (so zero data
loss is not guaranteed).
Our concern is that when the job recovers in DC2, the checkpoint state in its NAS
replica will be behind with respect to ZooKeeper's checkpoint reference.
Meaning, ZooKeeper might point to checkpoint x, while the NAS in DC2 still
has only checkpoint x-1.
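
For reference, the retention setting mentioned in the next paragraph lives in flink-conf.yaml alongside the HA settings; a minimal sketch, with placeholder hosts and paths:

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181,zk4:2181,zk5:2181
    high-availability.storageDir: file:///path/on/nas/ha/
    state.backend: filesystem
    state.checkpoints.dir: file:///path/on/nas/checkpoints/
    state.checkpoints.num-retained: 3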

We hoped that the configuration "state.checkpoints.num-retained: 3" would let us
solve it. That is, with the 3 latest checkpoints retained, we
tried to simulate such a scenario by deleting the files of checkpoint 14 (the current
latest, besides 13 and 12), while leaving the reference to checkpoint 14 in ZooKeeper.
Our hope was that Flink would fail over from that to checkpoint 13, but instead
we see that it keeps trying to recover from 14, failing with the error "no such
file or directory". Like this:

java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:292)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:224)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) 
~[flink-dist_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) 
[flink-dist_2.11-1.4.2.jar:1.4.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
Caused by: java.io.FileNotFoundException: 
/logs/failsafe/checkpoints/3e1de245fb1cf1226aad7351a818be96/chk-14/57d26c1c-df40-4b6f-8046-500eb4c8a0b2
 (No such file or directory)
at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_161]
at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_161]
at java.io.FileInputStream.(FileInputStream.java:138) 
~[?:1.8.0_161]
at 
org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142) 
~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:70)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:112)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:471)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:446)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:282)
 ~[flink-dist_2.11-1.4.2.jar:1.4.2]
... 6 more

BTW, if we delete the reference to checkpoint 14 in ZK, the job recovers successfully
from checkpoint 13. So we could try to automate that, by somehow monitoring and
detecting that we fail to recover checkpoint x, then deleting it in ZK so that x-1
is taken instead. Not great.

But maybe you have better ideas for how to deal with such a setup, with
checkpoints replicated between 2 DCs while using a ZK cluster for HA stretched
over these 2 DCs?

Thanks!
Shay


Re: Failed to resume job from checkpoint

2018-12-09 Thread Ben Yan
hi,

1. I took a closer look at the relevant code around
RocksDBIncrementalRestoreOperation::restoreInstanceDirectoryFromPath and
did some verification. I found this problem is likely related to file
system connection limits. At first I was worried that my HDFS would
be overloaded due to a large number of connections, so I configured the
following related parameters:

fs.<scheme>.limit.total: (number, 0/-1 mean no limit)
fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)


Since I added the above configuration, this problem has begun to
appear. When I removed it, the problem disappeared. I
think that when Flink is configured with file system connection
limits, the mechanism for recovering from checkpoints needs to be
improved: jobs should be allowed to recover from checkpoints more slowly under
file system connection limits, rather than failing directly with the above
exceptions.
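
As an illustration only, these limits are set per file system scheme in flink-conf.yaml, e.g. for HDFS (the numbers below are placeholders, not a recommendation):

    # Cap the number of concurrent streams Flink opens to hdfs:// (0 or -1 would mean no limit).
    fs.hdfs.limit.total: 128
    fs.hdfs.limit.input: 64
    fs.hdfs.limit.output: 64
    # How long (ms) to wait for a free connection, and when to close inactive streams.
    fs.hdfs.limit.timeout: 60000
    fs.hdfs.limit.stream-timeout: 60000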

2. After the job has been running for a long time, if the state data stored
in the state backend (such as HDFS) is lost for some reason, what other
ways are there to restore this state data quickly - for example, some kind of
offline task that rebuilds the state from offline data, so that the streaming
job can be relaunched from this recovered state?

Best
Ben

Ben Yan wrote on Sat, Dec 8, 2018 at 11:08 AM:

> I have already tested it.
>
> [root@node ~]#ll
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0038/
> total 32
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29
> blobStore-273cf1a6-0f98-4c86-801e-5d76fef66a58
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29
> blobStore-992562a5-f42f-43f7-90de-a415b4dcd398
> drwx--x---  4 yarn hadoop 4096 Dec  8 02:29
> container_e73_1544101169829_0038_01_59
> drwx--x--- 13 yarn hadoop 4096 Dec  8 02:29 filecache
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29
> flink-dist-cache-6d8dab0c-4034-4bbe-a9b9-b524cf6856e3
> drwxr-xr-x  8 yarn hadoop 4096 Dec  8 02:29
> flink-io-6fba8471-4d84-4c13-9e3c-ef3891b366f0
> drwxr-xr-x  4 yarn hadoop 4096 Dec  8 02:29 localState
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:29
> rocksdb-lib-7ef4471db8d3b8c1bdcfa4dba4d95a36
>
> And the directory "flink-io-6fba8471-4d84-4c13-9e3c-ef3891b366f0" no longer
> exists:
>
> [root@node ~]#ll
> /mnt/yarn/local/usercache/yarn/appcache/application_1544101169829_0038/
> total 12
> drwx--x--- 13 yarn hadoop 4096 Dec  8 02:29 filecache
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:53 localState
> drwxr-xr-x  2 yarn hadoop 4096 Dec  8 02:53
> rocksdb-lib-7ef4471db8d3b8c1bdcfa4dba4d95a36
>
> Ben Yan wrote on Sat, Dec 8, 2018 at 12:23 AM:
>
>> Thank you for your advice! I will check this out next, and I will share
>> any new progress as soon as I have it.
>>
>> Stefan Richter wrote on Sat, Dec 8, 2018 at 12:05 AM:
>>
>>> I think then you need to investigate what goes wrong
>>> in RocksDBIncrementalRestoreOperation::restoreInstanceDirectoryFromPath. If
>>> you look at the code it lists the files in a directory and tries to hard
>>> link them into another directory, and I would only expect to see the
>>> mentioned exception if the original file that we try to link does not
>>> exist. However, imo it must exist because we list it in the directory right
>>> before the link attempt and Flink does not delete anything in the meantime.
>>> So the question is, why can a file that was listed before just suddenly
>>> disappear when it is hard linked? The only potential problem could be in
>>> the path transformations and concatenations, but they look good to me and
>>> also pass all tests, including end-to-end tests that do exactly such a
>>> restore. I suggest either observing the created files and what happens
>>> with the one mentioned in the exception, or introducing debug logging
>>> in the code, in particular a check if the listed file (the link target)
>>> does exist before linking, which it should in my opinion because it is
>>> listed in the directory.
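
A hedged sketch of the kind of debug check suggested above; the paths and class are hypothetical, and this is not the actual RocksDBIncrementalRestoreOperation code:

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class RestoreLinkDebug {

        // List the files of a restored RocksDB instance directory and hard-link them into the
        // target directory, logging whether each link source still exists just before linking.
        public static void linkAll(Path sourceDir, Path targetDir) throws IOException {
            Files.createDirectories(targetDir);
            try (DirectoryStream<Path> files = Files.newDirectoryStream(sourceDir)) {
                for (Path file : files) {
                    System.out.println("link source " + file + " exists before linking: " + Files.exists(file));
                    Files.createLink(targetDir.resolve(file.getFileName()), file);
                }
            }
        }

        public static void main(String[] args) throws IOException {
            // Hypothetical local paths, for illustration only.
            linkAll(Paths.get("/tmp/flink-io-example/chk-14"), Paths.get("/tmp/rocksdb-restore"));
        }
    }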
>>>
>>> On 7. Dec 2018, at 16:33, Ben Yan  wrote:
>>>
>>> The version of the recovered checkpoint is also 1.7.0.
>>>
>>> Stefan Richter wrote on Fri, Dec 7, 2018 at 11:06 PM:
>>>
 Just to clarify, the checkpoint from which you want to resume in 1.7,
 was that taken by 1.6 or by 1.7? So far this is a bit mysterious because it
 says FileNotFound, but the whole iteration is driven by listing the
 existing files. Can you somehow monitor which files and directories are
 created during the restore attempt?

 On 7. Dec 2018, at 15:53, Ben Yan  wrote:

 Hi Stefan,

 Thank you for your explanation. I used Flink 1.6.2 without any
 problems. I have tested it a few times with version 1.7.0, but every time I
 resume from the checkpoint, the job shows the exception I showed
 earlier, which makes the job unrecoverable. And I checked all the logs,
 except for th

Trying to write to parquet file (kafka as a source) yields thousands of "in progress" files

2018-12-09 Thread Avi Levi
Hi,
I am trying to read from Kafka and write to Parquet, but I am getting
thousands of ".part-0-0.inprogress..." files (and counting...).
Is that a bug, or am I doing something wrong?

object StreamParquet extends App {
  implicit val env: StreamExecutionEnvironment =
    StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(100)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setParallelism(1)
  val consumer = new FlinkKafkaConsumer011[Address](SOURCE_TOPIC, new AddressSchema(), consumerProperties)
  val stream: DataStreamSource[Address] = env.addSource(QueueImpl.consumer)
  val outputPath = "streaming_files"
  val sink = StreamingFileSink.forBulkFormat(
    new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Address])).build()
  stream.addSink(sink)
  env.execute("Write to file")
}


Question regarding rescale api

2018-12-09 Thread 祁明良
Hi All,

I see the rescale API allows us to redistribute elements locally, but is it
possible to make the upstream operator distributed evenly across task managers?
For example, I have 10 task managers, each with 10 slots. The application reads
data from a Kafka topic with 20 partitions, then rescales it to full parallelism.
It seems to me that the 20 slots needed to read from Kafka won't be distributed
evenly over the 10 task managers, which means a further rescale still needs to
shuffle data over the network.
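
For reference, a minimal sketch of the pattern being described: a Kafka source at partition parallelism followed by rescale() to full parallelism. Topic, properties and parallelism values are placeholders:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    public class RescaleSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder
            props.setProperty("group.id", "rescale-sketch");        // placeholder

            env.addSource(new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props))
                    .setParallelism(20)   // one source subtask per Kafka partition (placeholder)
                    .rescale()            // local, round-robin handover to downstream subtasks
                    .print()
                    .setParallelism(100); // full parallelism: 10 TMs x 10 slots (placeholder)

            env.execute("rescale-sketch");
        }
    }

Note that rescale() only redistributes among the downstream subtasks reachable from each upstream subtask; how the upstream (source) subtasks are spread over TaskManagers is decided by the scheduler, which is the crux of the question above.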


Best,
Mingliang

This communication may contain privileged or other confidential information of 
Red. If you have received it in error, please advise the sender by reply e-mail 
and immediately delete the message and any attachments without copying or 
disclosing the contents. Thank you.


Re: Flink Yarn Deployment Issue - 1.7.0

2018-12-09 Thread sohi mankotia
Hi Jörn,

There are no more logs. Attaching YARN aggregated logs for the first problem.
For the second one, the job is not even getting submitted.

- Sohi

On Sun, Dec 9, 2018 at 2:13 PM Jörn Franke  wrote:

> Can you check the Flink log files? You should find a better
> description of the error there.
>
> > On 08.12.2018 at 18:15, sohimankotia wrote:
> >
> > Hi,
> >
> > I have installed flink-1.7.0 (Hadoop 2.7, Scala 2.11). We are using the
> > Hortonworks Hadoop distribution (hdp/2.6.1.0-129/).
> >
> > *Flink lib folder looks like :*
> >
> >
> > -rw-r--r-- 1 hdfs hadoop 93184216 Nov 29 02:15 flink-dist_2.11-1.7.0.jar
> > -rw-r--r-- 1 hdfs hadoop    79219 Nov 29 03:33 flink-hadoop-compatibility_2.11-1.7.0.jar
> > -rw-r--r-- 1 hdfs hadoop   141881 Nov 29 02:13 flink-python_2.11-1.7.0.jar
> > -rw-r--r-- 1 hdfs hadoop   489884 Nov 28 23:01 log4j-1.2.17.jar
> > -rw-r--r-- 1 hdfs hadoop     9931 Nov 28 23:01 slf4j-log4j12-1.7.15.jar
> >
> > *My code :*
> >
> >   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >
> >   String p = args[0];
> >
> >   Job job = Job.getInstance();
> >   SequenceFileInputFormat<Text, BytesWritable> inputFormat = new SequenceFileInputFormat<>();
> >   job.getConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
> >   final HadoopInputFormat<Text, BytesWritable> hInputEvents =
> >       HadoopInputs.readHadoopFile(inputFormat, Text.class, BytesWritable.class, p, job);
> >   org.apache.flink.configuration.Configuration fileReadConfig =
> >       new org.apache.flink.configuration.Configuration();
> >
> >   env.createInput(hInputEvents)
> >       .output(new PrintingOutputFormat<>());
> >
> >
> > *pom.xml*
> >
> > flink.version = 1.7.0
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-java</artifactId>
> >   <version>${flink.version}</version>
> >   <scope>provided</scope>
> > </dependency>
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-clients_2.11</artifactId>
> >   <version>${flink.version}</version>
> >   <scope>provided</scope>
> > </dependency>
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-streaming-java_2.11</artifactId>
> >   <version>${flink.version}</version>
> >   <scope>provided</scope>
> > </dependency>
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-hadoop-compatibility_2.11</artifactId>
> >   <version>${flink.version}</version>
> >   <scope>provided</scope>
> > </dependency>
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-shaded-hadoop2</artifactId>
> >   <version>${flink.version}</version>
> >   <scope>provided</scope>
> > </dependency>
> >
> > *in script:*
> >
> >
> >
> > export HADOOP_CONF_DIR=/etc/hadoop/conf
> > export HADOOP_CLASSPATH="/usr/hdp/2.6.1.0-129/hadoop/hadoop-*":`hadoop
> > classpath`
> >
> > echo ${HADOOP_CLASSPATH}
> >
> > PARALLELISM=1
> > JAR_PATH="jar"
> > CLASS_NAME="CLASS_NAME"
> > NODES=1
> > SLOTS=1
> > MEMORY_PER_NODE=2048
> > QUEUE="default"
> > NAME="sample"
> >
> > IN="input-file-path"
> >
> >
> > /home/hdfs/flink-1.7.0/bin/flink run -m yarn-cluster  -yn ${NODES} -yqu
> > ${QUEUE} -ys ${SLOTS} -ytm ${MEMORY_PER_NODE} --parallelism
> ${PARALLELISM}
> > -ynm ${NAME} -c ${CLASS_NAME} ${JAR_PATH} ${IN}
> >
> >
> > *where classpath is printing:*
> >
> >
> /usr/hdp/2.6.1.0-129/hadoop/hadoop-*:/usr/hdp/2.6.1.0-129/hadoop/conf:/usr/hdp/2.6.1.0-129/hadoop/lib/*:/usr/hdp/2.6.1.0-129/hadoop/.//*:/usr/hdp/2.6.1.0-129/hadoop-hdfs/./:/usr/hdp/2.6.1.0-129/hadoop-hdfs/lib/*:/usr/hdp/2.6.1.0-129/hadoop-hdfs/.//*:/usr/hdp/2.6.1.0-129/hadoop-yarn/lib/*:/usr/hdp/2.6.1.0-129/hadoop-yarn/.//*:/usr/hdp/2.6.1.0-129/hadoop-mapreduce/lib/*:/usr/hdp/2.6.1.0-129/hadoop-mapreduce/.//*:/usr/hdp/
> 2.6.1.
> 0-129/hadoop/conf:/usr/hdp/2.6.1.0-129/hadoop/lib/*:/usr/hdp/2.6.1.0-129/hadoop/.//*:/usr/hdp/2.6.1.0-129/hadoop-hdfs/./:/usr/hdp/2.6.1.0-129/hadoop-hdfs/lib/*:/usr/hdp/2.6.1.0-129/hadoop-hdfs/.//*:/usr/hdp/2.6.1.0-129/hadoop-yarn/lib/*:/usr/hdp/2.6.1.0-129/hadoop-yarn/.//*:/usr/hdp/2.6.1.0-129/hadoop-mapreduce/lib/*:/usr/hdp/2.6.1.0-129/hadoop-mapreduce/.//*::mysql-connector-java-5.1.17.jar:mysql-connector-java.jar:/usr/hdp/2.6.1.0-129/tez/*:/usr/hdp/2.6.1.0-129/tez/lib/*:/usr/hdp/2.6.1.0-129/tez/conf:mysql-connector-java-5.1.17.jar:mysql-connector-java.jar:/usr/hdp/2.6.1.0-129/tez/*:/usr/hdp/2.6.1.0-129/tez/lib/*:/usr/hdp/2.6.1.0-129/tez/conf
> >
> >
> > But I am getting a class-not-found error for a Hadoop-related jar. The error is
> > attached.
> >
> > error.txt
> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/error.txt>
> >
> > *Another Problem:*
> >
> > If I add the Hadoop shaded jar to the lib folder
> >
> >
> > -rw-r--r-- 1 hdfs hadoop 93184216 Nov 29 02:15 flink-dist_2.11-1.7.0.jar
> > -rw-r--r-- 1 hdfs hadoop    79219 Nov 29 03:33 flink-hadoop-compatibility_2.11-1.7.0.jar
> > -rw-r--r-- 1 hdfs hadoop   141881 Nov 29 02:13 flink-python_2.11-1.7.0.jar
> > *-rw-r--r-- 1 hdfs hadoop 41130742 Dec  8 22:38 flink-shaded-hadoop2-uber-1.7.0.jar*
> > -rw-r--r-- 1 hdfs hadoop   489884 Nov 28 23:01 log4j-1.2.17.jar
> > -rw-r--r-- 1 hdfs hadoop     9931 Nov 28 23:01 slf4j-log4j12-1.7.15.jar
> >
> > I am getting the following error, and this is happening for all versions greater
> > than 1.4.2.
> >
> > java.lang.IllegalAccess

Re: Flink Yarn Deployment Issue - 1.7.0

2018-12-09 Thread Jörn Franke
Can you check the Flink log files? You should find a better description of
the error there.

> On 08.12.2018 at 18:15, sohimankotia wrote:
> 
> Hi,
> 
> I have installed flink-1.7.0 (Hadoop 2.7, Scala 2.11). We are using the
> Hortonworks Hadoop distribution (hdp/2.6.1.0-129/).
> 
> *Flink lib folder looks like :*
> 
> 
> -rw-r--r-- 1 hdfs hadoop 93184216 Nov 29 02:15 flink-dist_2.11-1.7.0.jar
> -rw-r--r-- 1 hdfs hadoop    79219 Nov 29 03:33 flink-hadoop-compatibility_2.11-1.7.0.jar
> -rw-r--r-- 1 hdfs hadoop   141881 Nov 29 02:13 flink-python_2.11-1.7.0.jar
> -rw-r--r-- 1 hdfs hadoop   489884 Nov 28 23:01 log4j-1.2.17.jar
> -rw-r--r-- 1 hdfs hadoop     9931 Nov 28 23:01 slf4j-log4j12-1.7.15.jar
> 
> *My code :*
> 
>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
>   String p = args[0];
> 
>   Job job = Job.getInstance();
>   SequenceFileInputFormat<Text, BytesWritable> inputFormat = new SequenceFileInputFormat<>();
>   job.getConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
>   final HadoopInputFormat<Text, BytesWritable> hInputEvents =
>       HadoopInputs.readHadoopFile(inputFormat, Text.class, BytesWritable.class, p, job);
>   org.apache.flink.configuration.Configuration fileReadConfig =
>       new org.apache.flink.configuration.Configuration();
> 
>   env.createInput(hInputEvents)
>       .output(new PrintingOutputFormat<>());
> 
> 
> *pom.xml*
> 
> flink.version = 1.7.0
> 
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-java</artifactId>
>   <version>${flink.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-clients_2.11</artifactId>
>   <version>${flink.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-java_2.11</artifactId>
>   <version>${flink.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-hadoop-compatibility_2.11</artifactId>
>   <version>${flink.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-shaded-hadoop2</artifactId>
>   <version>${flink.version}</version>
>   <scope>provided</scope>
> </dependency>
> 
> *in script:*
> 
> 
> 
> export HADOOP_CONF_DIR=/etc/hadoop/conf
> export HADOOP_CLASSPATH="/usr/hdp/2.6.1.0-129/hadoop/hadoop-*":`hadoop
> classpath`
> 
> echo ${HADOOP_CLASSPATH}
> 
> PARALLELISM=1
> JAR_PATH="jar"
> CLASS_NAME="CLASS_NAME"
> NODES=1
> SLOTS=1
> MEMORY_PER_NODE=2048
> QUEUE="default"
> NAME="sample"
> 
> IN="input-file-path"
> 
> 
> /home/hdfs/flink-1.7.0/bin/flink run -m yarn-cluster  -yn ${NODES} -yqu
> ${QUEUE} -ys ${SLOTS} -ytm ${MEMORY_PER_NODE} --parallelism ${PARALLELISM}
> -ynm ${NAME} -c ${CLASS_NAME} ${JAR_PATH} ${IN} 
> 
> 
> *where classpath is printing:*
> 
> /usr/hdp/2.6.1.0-129/hadoop/hadoop-*:/usr/hdp/2.6.1.0-129/hadoop/conf:/usr/hdp/2.6.1.0-129/hadoop/lib/*:/usr/hdp/2.6.1.0-129/hadoop/.//*:/usr/hdp/2.6.1.0-129/hadoop-hdfs/./:/usr/hdp/2.6.1.0-129/hadoop-hdfs/lib/*:/usr/hdp/2.6.1.0-129/hadoop-hdfs/.//*:/usr/hdp/2.6.1.0-129/hadoop-yarn/lib/*:/usr/hdp/2.6.1.0-129/hadoop-yarn/.//*:/usr/hdp/2.6.1.0-129/hadoop-mapreduce/lib/*:/usr/hdp/2.6.1.0-129/hadoop-mapreduce/.//*:/usr/hdp/2.6.1.0-129/hadoop/conf:/usr/hdp/2.6.1.0-129/hadoop/lib/*:/usr/hdp/2.6.1.0-129/hadoop/.//*:/usr/hdp/2.6.1.0-129/hadoop-hdfs/./:/usr/hdp/2.6.1.0-129/hadoop-hdfs/lib/*:/usr/hdp/2.6.1.0-129/hadoop-hdfs/.//*:/usr/hdp/2.6.1.0-129/hadoop-yarn/lib/*:/usr/hdp/2.6.1.0-129/hadoop-yarn/.//*:/usr/hdp/2.6.1.0-129/hadoop-mapreduce/lib/*:/usr/hdp/2.6.1.0-129/hadoop-mapreduce/.//*::mysql-connector-java-5.1.17.jar:mysql-connector-java.jar:/usr/hdp/2.6.1.0-129/tez/*:/usr/hdp/2.6.1.0-129/tez/lib/*:/usr/hdp/2.6.1.0-129/tez/conf:mysql-connector-java-5.1.17.jar:mysql-connector-java.jar:/usr/hdp/2.6.1.0-129/tez/*:/usr/hdp/2.6.1.0-129/tez/lib/*:/usr/hdp/2.6.1.0-129/tez/conf
> 
> 
> But I am getting a class-not-found error for a Hadoop-related jar. The error is
> attached.
> 
> error.txt
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/error.txt>
> 
> *Another Problem:*
> 
> If I add the Hadoop shaded jar to the lib folder
> 
> 
> -rw-r--r-- 1 hdfs hadoop 93184216 Nov 29 02:15 flink-dist_2.11-1.7.0.jar
> -rw-r--r-- 1 hdfs hadoop    79219 Nov 29 03:33 flink-hadoop-compatibility_2.11-1.7.0.jar
> -rw-r--r-- 1 hdfs hadoop   141881 Nov 29 02:13 flink-python_2.11-1.7.0.jar
> *-rw-r--r-- 1 hdfs hadoop 41130742 Dec  8 22:38 flink-shaded-hadoop2-uber-1.7.0.jar*
> -rw-r--r-- 1 hdfs hadoop   489884 Nov 28 23:01 log4j-1.2.17.jar
> -rw-r--r-- 1 hdfs hadoop     9931 Nov 28 23:01 slf4j-log4j12-1.7.15.jar
> 
> I am getting the following error, and this is happening for all versions greater
> than 1.4.2.
> 
> java.lang.IllegalAccessError: tried to access method
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
> from class
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
>at
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
>at
> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
>at