Re: Question on Job Restart strategy

2020-05-26 Thread Gary Yao
Hi Bhaskar,

> Why is the restart counter not reset to zero after the streaming job restarts successfully?

The short answer is that the fixed delay restart strategy is not
implemented like that (see [1] if you are using Flink 1.10 or above).
There are also other systems that behave similarly, e.g., Apache
Hadoop YARN (see yarn.resourcemanager.am.max-attempts).

If you have such a requirement, you can try to approximate it using
the failure rate restart strategy [2]. Resetting the attempt counter
to zero after a successful restart cannot be easily implemented with
the current RestartBackoffTimeStrategy interface [3]; for this to be
possible, the strategy would need to be informed if a restart was
successful. However, it is not clear what constitutes a successful
restart. For example, is it sufficient that enough TMs/slots could be
acquired to run the job? The job could still fail afterwards due to a
bug in user code. Could it be sufficient to require all tasks to
produce at least one record? I do not think so because the job could
still fail deterministically afterwards due to a particular record.

Best,
Gary

[1] 
https://github.com/apache/flink/blob/d1292b5f30508e155d0f733527532d7c671ad263/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FixedDelayRestartBackoffTimeStrategy.java#L29
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html#failure-rate-restart-strategy
[3] 
https://github.com/apache/flink/blob/d1292b5f30508e155d0f733527532d7c671ad263/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategy.java#L23
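
As an illustration of the failure rate strategy mentioned above, here is a
minimal sketch of configuring it programmatically (the concrete values are
examples only, not taken from this thread):

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.concurrent.TimeUnit;

    public class FailureRateRestartExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Tolerate at most 3 failures within any 5-minute measurement interval,
            // waiting 10 seconds between restart attempts. Old failures age out of
            // the interval, which loosely approximates "resetting" the counter after
            // a period of stable operation.
            env.setRestartStrategy(RestartStrategies.failureRateRestart(
                    3,                             // max failures per interval
                    Time.of(5, TimeUnit.MINUTES),  // measurement interval
                    Time.of(10, TimeUnit.SECONDS)  // delay between restart attempts
            ));
        }
    }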


On Tue, May 26, 2020 at 9:28 AM Vijay Bhaskar  wrote:
>
> Hi
> We are using restart strategy of fixed delay.
> I have fundamental question:
> Why is the restart counter not reset to zero after the streaming job restarts successfully?
> Let's say the maximum number of restarts is 5.
> My streaming job failed twice and the 3rd attempt was successful; why is the
> counter still 2 and not zero?
> Traditionally, in the networking world, clients retry for some time and, once
> they succeed, reset the counter back to zero.
>
> Why is this the case in Flink?
>
> Regards
> Bhaskar


Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-14 Thread Gary Yao
>
> Its because the flink distribution of the cluster is 1.7.2. We use a
> standalone cluster , so in the lib directory in flink the artifact is
> flink-core-1.7.2.jar . I need to pack flink-core-1.9.0.jar from application
> and use child first class loading to use newer version of flink-core.
>

Do you have experience with this technique in production? In general I do
not think this can work; a job pretending to run a newer version of Flink
generally cannot communicate with an older JobManager, which normally does
not even run user code.

If you are stuck with Flink 1.8, maybe it is an option for you to backport
FLINK-11693 to Flink 1.8 yourself and build a custom Kafka connector.

On Tue, May 12, 2020 at 10:04 PM Nick Bendtner  wrote:
>
> Hi Gary,
> Its because the flink distribution of the cluster is 1.7.2. We use a
> standalone cluster, so in the lib directory in flink the artifact is
> flink-core-1.7.2.jar. I need to pack flink-core-1.9.0.jar from application
> and use child first class loading to use newer version of flink-core. If I
> have it as provided scope, sure it will work in IntelliJ but not outside of
> it.
>
> Best,
> Nick
>
> On Tue, May 12, 2020 at 2:53 PM Gary Yao  wrote:
>>
>> Hi Nick,
>>
>> Can you explain why it is required to package flink-core into your
>> application jar? Usually flink-core is a dependency with provided
>> scope [1]
>>
>> Best,
>> Gary
>>
>> [1]
https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#dependency-scope
>>
>> On Tue, May 12, 2020 at 5:41 PM Nick Bendtner  wrote:
>> >
>> > Hi Gary,
>> > Thanks for the info. I am aware this feature is available in 1.9.0
>> > onwards. Our cluster is still very old and have CICD challenges, I was
>> > hoping not to bloat up the application jar by packaging even flink-core
>> > with it. If its not possible to do this with older version without writing
>> > our own kafka sink implementation similar to the flink provided version in
>> > 1.9.0 then I think we will pack flink-core 1.9.0 with the application and
>> > follow the approach that you suggested. Thanks again for getting back to me
>> > so quickly.
>> >
>> > Best,
>> > Nick
>> >
>> > On Tue, May 12, 2020 at 3:37 AM Gary Yao  wrote:
>> >>
>> >> Hi Nick,
>> >>
>> >> Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you can use
>> >> KafkaSerializationSchema to produce a ProducerRecord [1][2].
>> >>
>> >> Best,
>> >> Gary
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK-11693
>> >> [2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
>> >>
>> >> On Mon, May 11, 2020 at 10:59 PM Nick Bendtner  wrote:
>> >> >
>> >> > Hi guys,
>> >> > I use 1.8.0 version for flink-connector-kafka. Do you have any
>> >> > recommendations on how to produce a ProducerRecord from a kafka sink.
>> >> > Looking to add support to kafka headers therefore thinking about
>> >> > ProducerRecord. If you have any thoughts its highly appreciated.
>> >> >
>> >> > Best,
>> >> > Nick.


Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Gary Yao
Hi Nick,

Can you explain why it is required to package flink-core into your
application jar? Usually flink-core is a dependency with provided
scope [1]

Best,
Gary

[1] 
https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#dependency-scope

On Tue, May 12, 2020 at 5:41 PM Nick Bendtner  wrote:
>
> Hi Gary,
> Thanks for the info. I am aware this feature is available in 1.9.0 onwards. 
> Our cluster is still very old and have CICD challenges,I was hoping not to 
> bloat up the application jar by packaging even flink-core with it. If its not 
> possible to do this with older version without writing our own kafka sink 
> implementation similar to the flink provided version in 1.9.0 then I think we 
> will pack flink-core 1.9.0 with the application and follow the approach that 
> you suggested. Thanks again for getting back to me so quickly.
>
> Best,
> Nick
>
> On Tue, May 12, 2020 at 3:37 AM Gary Yao  wrote:
>>
>> Hi Nick,
>>
>> Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you can use
>> KafkaSerializationSchema to produce a ProducerRecord [1][2].
>>
>> Best,
>> Gary
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11693
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
>>
>> On Mon, May 11, 2020 at 10:59 PM Nick Bendtner  wrote:
>> >
>> > Hi guys,
>> > I use 1.8.0 version for flink-connector-kafka. Do you have any 
>> > recommendations on how to produce a ProducerRecord from a kafka sink. 
>> > Looking to add support to kafka headers therefore thinking about 
>> > ProducerRecord. If you have any thoughts its highly appreciated.
>> >
>> > Best,
>> > Nick.


Re: Flink Metrics in kubernetes

2020-05-12 Thread Gary Yao
Hi Averell,

If you are seeing the log message from [1] and Scheduled#report() is
not called, the thread in the "Flink-MetricRegistry" thread pool might
be blocked. You can use the jstack utility to see on which task the
thread pool is blocked.

Best,
Gary

[1] 
https://github.com/apache/flink/blob/e346215edcf2252cc60c5cef507ea77ce2ac9aca/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java#L141

On Tue, May 12, 2020 at 4:32 PM Averell  wrote:
>
> Hi,
>
> I'm trying to config Flink running in Kubernetes native to push some metrics
> to NewRelic (using a custom ScheduledDropwizardReporter).
>
> From the logs, I could see that an instance of ScheduledDropwizardReporter
> has already been created successfully (the overridden getReporter() method
> was called).
> An instance of MetricRegistryImpl was also created successfully (this log
> was shown: "Periodically reporting metrics in intervals of 30 SECONDS for
> reporter my_newrelic_reporter").
>
> However, the report() method was not called.
>
> When running on my laptop, there's no issue at all.
> Are there any special things that I need to care for when running in
> Kubernetes?
>
> Thanks a lot.
>
> Regards,
> Averell
>
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Gary Yao
Hi Nick,

Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you can use
KafkaSerializationSchema to produce a ProducerRecord [1][2].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-11693
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
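
For illustration, here is a rough sketch of a KafkaSerializationSchema that
produces a ProducerRecord with a Kafka header, roughly as it works from Flink
1.9 onwards. The topic name, header key, and producer settings are made up for
the example:

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.nio.charset.StandardCharsets;

    public class HeaderAwareSchema implements KafkaSerializationSchema<String> {

        private final String topic;

        public HeaderAwareSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
            ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
            // Kafka headers can be attached directly on the ProducerRecord.
            record.headers().add("source", "flink".getBytes(StandardCharsets.UTF_8));
            return record;
        }
    }

The schema would then be passed to the universal FlinkKafkaProducer, e.g.
new FlinkKafkaProducer<>("my-topic", new HeaderAwareSchema("my-topic"), props,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE).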

On Mon, May 11, 2020 at 10:59 PM Nick Bendtner  wrote:
>
> Hi guys,
> I use 1.8.0 version for flink-connector-kafka. Do you have any 
> recommendations on how to produce a ProducerRecord from a kafka sink. Looking 
> to add support to kafka headers therefore thinking about ProducerRecord. If 
> you have any thoughts its highly appreciated.
>
> Best,
> Nick.


Re: Running 2 instances of LocalExecutionEnvironments with same consumer group.id

2020-04-22 Thread Gary Yao
Hi Suraj,

This question has been asked before:


http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-consumers-don-t-honor-group-id-td21054.html

Best,
Gary

On Wed, Apr 22, 2020 at 6:08 PM Suraj Puvvada  wrote:
>
> Hello,
>
> I have two JVMs that run LocalExecutionEnvorinments each using the same 
> consumer group.id.
>
> i noticed that the consumers in each instance has all partitions assigned. I 
> was expecting that the partitions will be split across consumers across the 
> two JVMs
>
> Any help on what might be happening ?
>
> Thanks
> Suraj


Re: Two questions about Async

2020-04-22 Thread Gary Yao
> Bytes Sent but Records Sent is always 0

Sounds like a bug. However, I am unable to reproduce this using the
AsyncIOExample [1]. Can you provide a minimal working example?


> Is there an Async Sink? Or do I just rewrite my Sink as an AsyncFunction
> followed by a dummy sink?

You will have to implement your own AsyncFunction to use as a sink. However, the
AsyncFunction operator does not need to be followed by a dummy sink.
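
A rough sketch of that idea — an AsyncFunction that performs the write itself
and completes with an empty result, so nothing needs to be emitted downstream.
The actual asynchronous client call is only a placeholder here:

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    public class AsyncWritingFunction extends RichAsyncFunction<String, Void> {

        @Override
        public void asyncInvoke(String input, ResultFuture<Void> resultFuture) {
            // Placeholder for a real non-blocking write, e.g. an async HTTP or DB client.
            CompletableFuture
                    .runAsync(() -> { /* client.write(input) would go here */ })
                    .whenComplete((ignored, error) -> {
                        if (error != null) {
                            resultFuture.completeExceptionally(error);
                        } else {
                            // Nothing meaningful to emit downstream.
                            resultFuture.complete(Collections.emptyList());
                        }
                    });
        }
    }

    // Wiring it in (no dummy sink is required afterwards):
    // AsyncDataStream.unorderedWait(stream, new AsyncWritingFunction(), 30, TimeUnit.SECONDS);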


> What’s the recommendation if the sink performing blocking I/O is proven to be
> the root cause of back pressure?

There are many things that can be done, such as increasing the parallelism or
reducing per-record fixed costs. Note that all sinks are typically dominated by
I/O costs. It is difficult to give a recommendation without knowing details
about your use case and desired consistency guarantees.

Best,
Gary

[1] 
https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java


On Tue, Apr 21, 2020 at 11:06 PM Stephen Connolly
 wrote:
>
> 1. On Flink 1.10 when I look at the topology overview, the AsyncFunctions 
> show non-zero values for Bytes Received; Records Received; Bytes Sent but 
> Records Sent is always 0... yet the next step in the topology shows approx 
> the same Bytes Received as the async sent (modulo minor delays) and a 
> non-zero Records Received. Is the “Records Sent of an AsyncFunction is always 
> displayed as zero” a bug?
>
> 2. Is there an Async Sink? Or do I just rewrite my Sink as an AsyncFunction 
> followed by a dummy sink? What’s the recommendation if the sink performing 
> blocking I/O is proven to be the root cause of back pressure?
>
> Thanks in advance for your help
> --
> Sent from my phone


Re: Run several jobs in parallel in same EMR cluster?

2020-03-30 Thread Gary Yao
Can you try to set the config option taskmanager.numberOfTaskSlots to 2? By
default the TMs only offer one slot [1], independent of the number of CPU
cores.

Best,
Gary

[1]
https://github.com/apache/flink/blob/da3082764117841d885f41c645961f8993a331a0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L197-L199
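
Building on the flink-conf classification from the quoted code below, this
could look roughly as follows (a sketch only, reusing the same AWS EMR SDK
Configuration object):

    import com.amazonaws.services.elasticmapreduce.model.Configuration;

    Configuration flinkConfiguration = new Configuration()
            .withClassification("flink-conf")
            .addPropertiesEntry("jobmanager.heap.size", "2048m")
            .addPropertiesEntry("taskmanager.heap.size", "2048m")
            // Two slots per TaskManager, matching the 2 cores of an m4.large.
            .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");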

On Mon, Mar 30, 2020 at 1:22 PM Antonio Martínez Carratalá <
amarti...@alto-analytics.com> wrote:

> Hello
>
> I'm running Flink over Amazon EMR and I'm trying to send several different
> batch jobs to the cluster after creating it.
>
> This is my cluster creation code:
> 
> StepConfig copyJarStep = new StepConfig()
> .withName("copy-jar-step")
> .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
> .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
> .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName +
> "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));
>
> List<StepConfig> stepConfigs = new ArrayList<>();
> stepConfigs.add(copyJarStep);
>
> Application flink = new Application().withName("Flink");
>
> Configuration flinkConfiguration = new Configuration()
>  .withClassification("flink-conf")
> .addPropertiesEntry("jobmanager.heap.size", "2048m")
> .addPropertiesEntry("taskmanager.heap.size",  "2048m")
>
> RunJobFlowRequest request = new RunJobFlowRequest()
> .withName("cluster-" + executionKey)
> .withReleaseLabel("emr-5.26.0")
> .withApplications(flink)
> .withConfigurations(flinkConfiguration)
> .withServiceRole("EMR_DefaultRole")
> .withJobFlowRole("EMR_EC2_DefaultRole")
> .withLogUri(getWorkPath() + "logs")
> .withInstances(new JobFlowInstancesConfig()
> .withEc2SubnetId("subnetid")
> .withInstanceCount(2) // 1 for task manager + 1 for job manager
> .withKeepJobFlowAliveWhenNoSteps(true)
> .withMasterInstanceType("m4.large")
> .withSlaveInstanceType("m4.large"))
> .withSteps(stepConfigs);
>
> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
>
> 
>
> And this is how I add the jobs:
>
> -
> StepConfig runJobStep = new StepConfig()
> .withName("run-job-step")
> .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
> .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
> .withArgs("bash", "-c", "flink run -m yarn-cluster"
> + " --parallelism " + parallelism
> + " --class " + jobClass.getCanonicalName()
> + " /home/hadoop/flink-jobs.jar "
> + jobArguments));
>
> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
> .withJobFlowId(clusterId)
> .withSteps(runJobStep);
>
> AddJobFlowStepsResult result =
> amazonClient.getEmrClient().addJobFlowSteps(request);
>
> -
>
> And these are my jobs:
>
> - Job1 - parallelism 1
> - Job2 - parallelism 1
> - Job3 - parallelism 2
>
> I'm using m4.large machines as slave so I have 2 cores in it, and I was
> expecting that Job1 and Job2 were running in parallel and then Job3 when
> Job1 and Job2 finish, but what I see is that Job2 is waiting (Pending
> status) for Job1 to finish before start. I see only one task manager is
> created for Job1, when finishes another one is created for Job2, and then 2
> are created for Job3
>
> Since I have 2 cores available why is it not running Job2 in the other
> instead of wait? is there any way to configure it?
>
> Thanks
>
>
>
>


Re: Testing RichAsyncFunction with TestHarness

2020-03-30 Thread Gary Yao
>
> Additionally even though I add all necessary dependencies defiend in [1] I
> cannot see ProcessFunctionTestHarnesses class.
>

That class was added in Flink 1.10 [1].

[1]
https://github.com/apache/flink/blame/f765ad09ae2b2aa478c887b988e11e92a8b730bd/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ProcessFunctionTestHarnesses.java

On Fri, Mar 27, 2020 at 10:13 PM KristoffSC 
wrote:

> Hi,
> Im trying to test my RichAsyncFunction implementation with
> OneInputStreamOperatorTestHarness based on [1]. I'm using Flink 1.9.2
>
> My test setup is:
>  this.processFunction = new MyRichAsyncFunction();
> this.testHarness = new OneInputStreamOperatorTestHarness<>(
> new AsyncWaitOperator<>(processFunction, 2000, 1,
> OutputMode.ORDERED));
>
> this.testHarness.open();
>
> I'm having below exception when calling  this.testHarness.open();
>
> java.lang.NullPointerException
> at java.base/java.util.Objects.requireNonNull(Objects.java:221)
> at
>
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
> at
>
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:142)
> at
>
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:287)
> at
>
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:275)
> at
>
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:393)
> at
>
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:300)
> at
>
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:308)
> at
>
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:483)
>
>
> I will appreciate help with this one.
>
> Additionally even though I add all necessary dependencies defiend in [1] I
> cannot see ProcessFunctionTestHarnesses class.
>
> Thanks.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Automatically Clearing Temporary Directories

2020-03-12 Thread Gary Yao
Hi David,

> Would it be safe to automatically clear the temporary storage every time
when a TaskManager is started?
> (Note: the temporary volumes in use are dedicated to the TaskManager and
not shared :-)
Yes, it is safe in your case.

Best,
Gary

On Tue, Mar 10, 2020 at 6:39 PM David Maddison 
wrote:

> Hi,
>
> When a TaskManager is restarted it can leave behind unreferenced
> BlobServer cache directories in the temporary storage that never get
> cleaned up.  Would it be safe to automatically clear the temporary storage
> every time when a TaskManager is started?
>
> (Note: the temporary volumes in use are dedicated to the TaskManager and
> not shared :-)
>
> Thanks in advance,
>
> David.
>


Re: Failure detection and Heartbeats

2020-03-11 Thread Gary Yao
Hi Morgan,

> I am interested in knowing more about the failure detection mechanism
> used by Flink, unfortunately information is a little thin on the ground and
> I was hoping someone could shed a little light on the topic.
It is probably best to look into the implementation (see my answers below).

> Having the heartbeat interval shorter than the heartbeat timeout would
> mean that multiple requests can be underway at the same time.
Yes, in fact the heartbeat interval must be shorter than the timeout or
else an exception is thrown [1]

> - In the worst case the JobManager would detect the failure in the
> longest time, i.e. 60 seconds +- (node fails just after sending the last
> heartbeat response)
If a heartbeat response is received, the 50s timeout is reset [2]. If we do
not receive a single heartbeat response for 50s, we will assume a failure
[3]. Therefore, I do not think that there is a worst case or best case here.

Lastly I wanted to mention that since FLIP-6 [4], the responsibilities of
the JobManager have been split. We now have a ResourceManager and one
JobManager for every job (note that in the code the class is called
JobMaster). Each instance employs heartbeating to each other and also to
the TaskManagers.

Best,
Gary

[1]
https://github.com/apache/flink/blob/bf1195232a49cce1897c1fa86c5af9ee005212c6/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java#L43
[2]
https://github.com/apache/flink/blob/1b628d4a7d92f9c79c31f3fe90911940e0676b22/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatMonitorImpl.java#L117-L128
[3]
https://github.com/apache/flink/blob/1b628d4a7d92f9c79c31f3fe90911940e0676b22/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatMonitorImpl.java#L106-L111
[4]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
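
For completeness, the two options discussed above can also be set
programmatically, e.g. when building a Configuration for a test cluster — a
sketch, with the values being the documented defaults in milliseconds:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.HeartbeatManagerOptions;

    Configuration config = new Configuration();
    // The heartbeat request interval (10 s by default) must be smaller than
    // the heartbeat timeout (50 s by default), or an exception is thrown.
    config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 10_000L);
    config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 50_000L);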

On Tue, Mar 10, 2020 at 2:54 PM Morgan Geldenhuys <
morgan.geldenh...@tu-berlin.de> wrote:

> Hi community,
>
> I am interested in knowing more about the failure detection mechanism used
> by Flink, unfortunately information is a little thin on the ground and I
> was hoping someone could shed a little light on the topic.
>
> Looking at the documentation (
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html),
> there are these two configuration options:
>
> heartbeat.interval (default: 10000, Long): Time interval for requesting
> heartbeat from sender side.
> heartbeat.timeout (default: 50000, Long): Timeout for requesting and receiving
> heartbeat for both sender and receiver sides.
>
> This would indicate Flink uses a heartbeat mechanism to ascertain the
> liveness of TaskManagers. From this the following assumptions are made:
>
> The JobManager is responsible for broadcasting a heartbeat requests to all
> TaskManagers and awaits responses.
> If a response is not forthcoming from any particular node within the
> heartbeat timeout period, e.g. 50 seconds by default, then that node is
> timed out and assumed to have failed.
> The heartbeat interval indicated how often the heartbeat request broadcast
> is scheduled.
> Having the heartbeat interval shorter than the heartbeat timeout would
> mean that multiple requests can be underway at the same time.
> Therefore, the TaskManager would need to fail to respond to 4 requests
> (assuming normal response times are lower than 10 seconds) before being
> timed out after 50 seconds.
>
> So therefore if a failure were to occur (considering the default settings):
> - In the best case the JobManager would detect the failure in the shortest
> time, i.e. 50 seconds +- (node fails just before receiving the next
> heartbeat request)
> - In the worst case the JobManager would detect the failure in the longest
> time, i.e. 60 seconds +- (node fails just after sending the last heartbeat
> response)
>
> Is this correct?
>
> For JobManagers in HA mode, this is left to ZooKeeper timeouts which then
> initiates a round of elections and the new leader picks up from the
> previous checkpoint.
>
> Thank you in advance.
>
> Regards,
> M.
>
>
>
>
>
>
>
>


Re: java.util.concurrent.ExecutionException

2020-03-03 Thread Gary Yao
>> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>>
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>>
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>
>> at
>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>>
>> ... 19 more
>>
>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
>> by NoRestartBackoffTimeStrategy
>>
>> at
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>>
>> at
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>>
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>>
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>>
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>>
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>>
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>>
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Caused by:
>> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException:
>> Timeout of 60000ms expired before the position for partition edges-0 could
>> be determined
>>
>> On Tue, Mar 3, 2020 at 8:03 AM Gary Yao  wrote:
>>
>>> Hi,
>>>
>>> Can you post the complete stacktrace?
>>>
>>> Best,
>>> Gary
>>>
>>> On Tue, Mar 3, 2020 at 1:08 PM kant kodali  wrote:
>>>

Re: java.util.concurrent.ExecutionException

2020-03-03 Thread Gary Yao
Hi,

Can you post the complete stacktrace?

Best,
Gary

On Tue, Mar 3, 2020 at 1:08 PM kant kodali  wrote:

> Hi All,
>
> I am just trying to read edges which has the following format in Kafka
>
> 1,2
> 1,3
> 1,5
>
> using the Table API and then converting to DataStream of Edge Objects and
> printing them. However I am getting
> java.util.concurrent.ExecutionException but not sure why?
>
> Here is the sample code
>
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.graph.Edge;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.*;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
> import org.apache.flink.types.NullValue;
> import org.apache.flink.types.Row;
>
> import java.util.UUID;
>
> public class Test {
>
> public static void main(String... args) throws Exception {
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend((StateBackend) new 
> RocksDBStateBackend("file:///tmp/rocksdb"));
>
> StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(env, bsSettings);
>
> bsTableEnv.connect(
> new Kafka()
> .property("bootstrap.servers", "localhost:9092")
> .property("zookeeper.connect", "localhost:2181")
> .property("group.id", UUID.randomUUID().toString())
> .startFromEarliest()
> .version("universal")
> .topic("edges")
> )
> .withFormat(new Csv().fieldDelimiter(','))
> .withSchema(
> new Schema()
> .field("source", DataTypes.BIGINT())
> .field("target", DataTypes.BIGINT())
> )
> .createTemporaryTable("kafka_source");
>
> Table kafkaSourceTable = bsTableEnv.sqlQuery("select * from 
> kafka_source");
>
> TypeInformation<Edge<Long, NullValue>> edgeTypeInformation =
> TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() {
> @Override
> public TypeInformation<Edge<Long, NullValue>> getTypeInfo() {
> return super.getTypeInfo();
> }
> });
>
> DataStream<Edge<Long, NullValue>> edges =
> bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
> .map(row -> new Edge<>((Long) row.getField(0), (Long) 
> row.getField(1), NullValue.getInstance()))
> .returns(edgeTypeInformation);
>
> edges.print();
>
> bsTableEnv.execute("sample job");
> }
> }
>
>
>
>


Re: Alink and Flink ML

2020-03-03 Thread Gary Yao
Hi Flavio,

I am looping in Becket (cc'ed) who might be able to answer your question.

Best,
Gary

On Tue, Mar 3, 2020 at 12:19 PM Flavio Pompermaier 
wrote:

> Hi to all,
> since Alink has been open sourced, is there any good reason to keep both
> Flink ML and Alink?
> From what I understood Alink already contains the best ML implementation
> available for Flink..am I wrong?
> Maybe it could make sense to replace the current Flink ML with that of
> Alink..or is that impossible?
>
> Cheers,
> Flavio
>


Re: Unable to recover from savepoint and checkpoint

2020-03-03 Thread Gary Yao
Hi Puneet,

Can you describe how you validated that the state is not restored properly?
Specifically, how did you introduce faults to the cluster?

Best,
Gary

On Tue, Mar 3, 2020 at 11:08 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Sorry for the missed information
>
> On recovery the value is coming as false instead of true, state.backend
> has been configured in flink-conf.yaml  along the
> the path for checkpointing and savepoint.
>
> On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Hi
>>
>> Stuck with the simple program regarding the checkpointing Flink version I
>> am using 1.10.0
>>
>> *Here I have created DummySource for testing*
>>
>> *DummySource*
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>
>> public class BeaconSource implements SourceFunction<Tuple2<Long, String>> {
>>
>> /**
>> *
>> */
>> private static final long serialVersionUID = 1L;
>> private Boolean isRunning=true;
>>
>>
>> public BeaconSource() {
>> super();
>> // TODO Auto-generated constructor stub
>> }
>>
>>
>>
>> public void cancel() {
>> // TODO Auto-generated method stub
>>
>> this.isRunning=false;
>>
>> }
>>
>> public void run(SourceContext<Tuple2<Long, String>> arg0) throws Exception
>> {
>> // TODO Auto-generated method stub
>> while(isRunning) {
>> Thread.sleep(3L);
>> arg0.collect(new Tuple2(10L,"AMQSource"));
>> }
>> }
>>
>> }
>>
>>
>>
>> ---
>> *KeyedProcessFunction (to register the timer and update the status to
>> true so that only one-time trigger should)*
>>
>>
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.common.functions.IterationRuntimeContext;
>> import org.apache.flink.api.common.functions.RuntimeContext;
>> import org.apache.flink.api.common.state.ListState;
>> import org.apache.flink.api.common.state.ListStateDescriptor;
>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.TypeHint;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> import org.apache.flink.util.Collector;
>>
>> import com.google.gson.JsonObject;
>> import com.google.gson.JsonParser;
>>
>> import scala.collection.mutable.LinkedHashMap;
>>
>>
>>
>> import java.util.HashMap;
>> import java.util.Map;
>> import java.util.Map.Entry;
>> import java.util.Set;
>>
>> public class TimeProcessTrigger extends
>> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String> {
>>
>> /**
>> *
>> */
>> private static final long serialVersionUID = 1L;
>> /**
>> *
>> */
>>
>> private transient ValueState<Boolean> contacthistory;
>> private static final Long ONE_MINUTE = 60000L;
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> @Override
>> public void onTimer(long timestamp, KeyedProcessFunction<Tuple,
>> Tuple2<Long, String>, String>.OnTimerContext ctx,
>> Collector<String> out) throws Exception {
>> // TODO Auto-generated method stub
>> super.onTimer(timestamp, ctx, out);
>> System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
>> }
>>
>>
>>
>>
>>
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>> // TODO Auto-generated method stub
>> super.open(parameters);
>>
>>
>> ValueStateDescriptor<Boolean> descriptor = new
>> ValueStateDescriptor<>(
>> "contact-history", // the state name
>> Boolean.class); // type information
>>
>> this.contacthistory=getRuntimeContext().getState(descriptor);
>> }
>>
>>
>>
>>
>>
>>
>> @Override
>> public void processElement(Tuple2<Long, String> input,
>> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
>> Collector<String> collect)
>> throws Exception {
>> // TODO Auto-generated method stub
>>
>>
>> System.out.println(this.contacthistory.value());
>> Boolean value = this.contacthistory.value();
>> if(value==null) {
>> Long currentTime = ctx.timerService().currentProcessingTime();
>> Long regTimer=currentTime+ONE_MINUTE;
>> System.out.println("Updating the flag and registering the timer
>> @:"+regTimer);
>> this.contacthistory.update(true);
>> ctx.timerService().registerProcessingTimeTimer(regTimer);
>>
>> }else {
>> System.out.println("Timer has already register for this key");
>> }
>> }
>>
>> }
>>
>>
>> -
>> *Main App*
>>
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.java.functions.KeySelector;
>> import 

Re: Operator latency metric not working in 1.9.1

2020-03-03 Thread Gary Yao
Hi,

There is a release note for Flink 1.7 that could be relevant for you [1]

Granularity of latency metrics
The default granularity for latency metrics has been modified. To
restore the previous behavior users have to explicitly set the granularity
to subtask.

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.7.html#granularity-of-latency-metrics

On Tue, Mar 3, 2020 at 10:14 AM orips  wrote:

> Thanks for the response.
>
> In 1.5 the docs also state that it should be enabled [1], however, it
> always
> worked without setting latencyTrackingInterval
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#latency-tracking
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: yarn session: one JVM per task

2020-02-25 Thread Gary Yao
Hi David,

Before with the both n and -s it was not the case.
>

What do you mean by before? At least in 1.8 "-s" could be used to specify the
number of slots per TM.


how can I be sure that my Sink that uses this lib is in one JVM ?
>

Is it enough that no other parallel instance of your sink runs in the same
JVM? If that is the case, it is enough to start your YARN session with:

./bin/yarn-session.sh -s 1 [...]

This will result in exactly one slot per TM. Note that a single slot may still
hold several subtasks of the job (Slot Sharing) but never two parallel
instances of your sink [2]. You can also control Slot Sharing manually [3].
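
If only the sink needs isolation, slot sharing can also be steered directly in
the job — a rough sketch (the print sink used here is only a stand-in for the
actual non-thread-safe sink):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("a", "b", "c")
            // Upstream operators stay in the default slot sharing group, while the
            // sink gets its own group. Combined with one slot per TaskManager
            // (-s 1), each sink subtask then runs in its own JVM.
            .addSink(new PrintSinkFunction<>())
            .slotSharingGroup("sink-only");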


So, if I understand I have to keep this Flink release (1.9.2) ?
>

I don't see why 1.10.0 would not work for you.


Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/yarn_setup.html#start-a-session
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html#task-slots-and-resources
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/#task-chaining-and-resource-groups

On Tue, Feb 25, 2020 at 10:28 AM David Morin 
wrote:

> Hi Xintong,
>
> At the moment I'm using the 1.9.2 with this command:
>yarn-session.sh -d *-s 1* -jm 4096 -tm 4096 -qu "XXX" -nm "MyPipeline"
> So, after a lot of tests, I've noticed that if I increase the parallelism
> of my Custom Sink, each task is embedded into one TS and, the most
> important, each one into one TaskManager (Yarn container in fact).
> So, if I understand I have to keep this Flink release (1.9.2) ?
>
> Thanks
> David
>
>
>
> Le mar. 25 févr. 2020 à 02:02, Xintong Song  a
> écrit :
>
>> Depending on your Flink version, the '-n' option might not take effect.
>> It is removed in the latest release, but before that there were a few
>> versions where this option is neither removed nor taking effect.
>>
>> Anyway, as long as you have multiple containers, I don't think there's a
>> way to make some of the tasks scheduled to the same JVM. Not that I'm aware
>> of.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Feb 24, 2020 at 8:43 PM David Morin 
>> wrote:
>>
>>> Hi,
>>>
>>> Thanks Xintong.
>>> I've noticed than when I use yarn-session.sh with --slots (-s) parameter
>>> but without --container (-n) it creates one task/slot per taskmanager.
>>> Before with the both n and -s it was not the case.
>>> I prefer to use only small container with only one task to scale my
>>> pipeline and of course to prevent from thread-safe issue
>>> Do you think I cannot be confident on that behaviour ?
>>>
>>> Regards,
>>> David
>>>
>>> On 2020/02/22 17:11:25, David Morin  wrote:
>>> > Hi,
>>> > My app is based on a lib that is not thread safe (yet...).
>>> > In waiting of the patch has been pushed, how can I be sure that my
>>> Sink that uses this lib is in one JVM ?
>>> > Context: I use one Yarn session and send my Flink jobs to this session
>>> >
>>> > Regards,
>>> > David
>>> >
>>>
>>


Re: REST rescale with Flink on YARN

2020-01-28 Thread Gary Yao
Hi,

You can use

yarn application -status <application-id>

to find the host and port that the server is listening on (AM host & RPC
Port). If you need to access that information programmatically, take a look
at
the YarnClient [1].

Best,
Gary


[1]
https://hadoop.apache.org/docs/r2.8.5/api/org/apache/hadoop/yarn/client/api/YarnClient.html
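
A sketch of doing the same lookup programmatically with the YarnClient API;
the application id below is taken from the curl example in the quoted message
and is only a placeholder:

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationReport;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class FindFlinkAppMaster {
        public static void main(String[] args) throws Exception {
            YarnClient yarnClient = YarnClient.createYarnClient();
            yarnClient.init(new YarnConfiguration());
            yarnClient.start();

            // AM host and RPC port of the Flink JobManager running on YARN.
            ApplicationReport report = yarnClient.getApplicationReport(
                    ApplicationId.fromString("application_1576854986116_0079"));
            System.out.println(report.getHost() + ":" + report.getRpcPort());

            yarnClient.stop();
        }
    }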

On Thu, Jan 23, 2020 at 3:21 PM Vasily Melnik <
vasily.mel...@glowbyteconsulting.com> wrote:

> Hi all,
> I've found some solution for this issue.
> Problem is that with YARN ApplicationMaster URL we communicate with
> JobManager via proxy which is implemented on Jetty 6 (for Hadoop 2.6).
> So to use PATCH method we need to locate original JobManager URL.
> Using /jobmanager/config API we could get only host, but web.port is
> displayed as 0 (???)
> To find actual web port, we should parse YARN logs for jobmanager, where
> we can find something like this:
>
> INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint - Rest
> endpoint listening at <host>:<port>.
>
> Maybe  someone knows less complicated way to find actual REST URL under
> YARN?
>
>
>
>
> С уважением,
> Василий Мельник
>
>
> On Thu, 23 Jan 2020 at 15:32, Chesnay Schepler  wrote:
>
>> Older versions of Jetty don't support PATCH requests. You will either
>> have to update it or create a custom Flink version that uses POST for the
>> rescale operation.
>>
>> On 23/01/2020 13:23, Vasily Melnik wrote:
>>
>> Hi all.
>> I'm using Flink 1.8 on YARN with CDH 5.12
>> When i try to perform rescale request:
>>
> curl -v -X PATCH
> '<proxy-host>/proxy/application_1576854986116_0079/jobs/11dcfc3163936fc019e049fc841b075b/rescaling?parallelism=3'
>>
>> i get a mistake:
>>
> Method PATCH is not defined in RFC 2068 and is not supported by the
> Servlet API
>
> GET and POST methods work well.
>> The Server type in response is Jetty(6.1.26.cloudera.4).
>>
>> How can i deal with this situation?
>>
>> С уважением,
>> Василий Мельник
>>
>>
>>


Re: Localenvironment jobcluster ha High availability

2019-12-11 Thread Gary Yao
Hi Eric,

What you say should be possible because your job will be executed in a
MiniCluster [1] which has HA support. I have not tried this out myself,
and I am not aware that people are doing this in production. However,
there are integration tests that use MiniCluster + ZooKeeper [2].

Best,
Gary

[1]
https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
[2]
https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java

On Tue, Dec 3, 2019 at 7:21 PM Eric HOFFMANN <
eric.hoffman...@thalesdigital.io> wrote:

> Hi, i use a jobcluster (1 manager and 1 worker) in kubernetes for
> streaming application, i would like to have the lightest possible solution,
> is it possible to use a localenvironment (manager and worker embeded) and
> still have HA with zookeeper in this mode?, I mean kubernetes will restart
> the job, in the case of jobcluster, metadata are retrieve from zookeeper
> and data from S3 or hdfs, is this pattern the same in localenvironment ?
> Thx
> Eric
>
> This message contains confidential information and is intended only for
> the individual(s) addressed in the message. If you are not the named
> addressee, you should not disseminate, distribute, or copy this e-mail. If
> you are not the intended recipient, you are notified that disclosing,
> distributing, or copying this e-mail is strictly prohibited.
>


[ANNOUNCE] Progress of Apache Flink 1.10 #2

2019-11-01 Thread Gary Yao
Hi community,

Because we have approximately one month of development time left until the
targeted Flink 1.10 feature freeze, we thought now would be a good time to
give another progress update. Below we have included a list of the ongoing
efforts that have made progress since our last release progress update [1].
As
always, if you are working on something that is not included here, feel free
to use this thread to share your progress.

- Support Java 11 [2]
- Implementation is in progress (18/21 subtasks resolved)

- Table API improvements
- Full Data Type Support in Planner [3]
- Implementing (1/8 subtasks resolved)
- FLIP-66 Support Time Attribute in SQL DDL [4]
- Implementation is in progress (1/7 subtasks resolved).
- FLIP-70 Support Computed Column [5]
- FLIP voting [6]
- FLIP-63 Rework Table Partition Support [7]
- Implementation is in progress (3/15 subtasks resolved).
- FLIP-51 Rework of Expression Design [8]
- Implementation is in progress (2/12 subtasks resolved).
- FLIP-64 Support for Temporary Objects in Table Module [9]
- Implementation is in progress

- Hive compatibility completion (DDL/UDF) to support full Hive integration
- FLIP-57 Rework FunctionCatalog [10]
- Implementation is in progress (6/9 subtasks resolved)
- FLIP-68 Extend Core Table System with Modular Plugins [11]
- Implementation is in progress (2/8 subtasks resolved)

- Finer grained resource management
- FLIP-49: Unified Memory Configuration for TaskExecutors [12]
- Implementation is in progress (6/10 subtasks resolved)
- FLIP-53: Fine Grained Operator Resource Management [13]
- Implementation is in progress (1/9 subtasks resolved)

- Finish scheduler re-architecture [14]
- Integration tests are being enabled for new scheduler

- Executor/Client refactoring [15]
- FLIP-81: Executor-related new ConfigOptions [16]
- done
- FLIP-73: Introducing Executors for job submission [17]
- Implementation is in progress

- FLIP-36 Support Interactive Programming [18]
- Is built on top of FLIP-67 [19], which has been accepted
- Implementation in progress

- FLIP-58: Flink Python User-Defined Stateless Function for Table [20]
- Implementation is in progress (12/22 subtask resolved)
- FLIP-50: Spill-able Heap Keyed State Backend [21]
- Implementation is in progress (2/11 subtasks resolved)

- RocksDB Backend Memory Control [22]
- FLIP for resource management on state backend will be opened soon
- Write Buffer Manager will be backported to FRocksDB due to
performance regression [23] in new RocksDB versions

- Unaligned Checkpoints
- FLIP-76 [24] was published and received positive feedback
- Implementation is in progress

- Separate framework and user class loader in per-job mode [25]
- First PR is almost done. Remaining PRs will be ready next week

- Active Kubernetes Integration [26]
- Implementation is in progress (6/11 in review, 3/11 in progress, 2/11
todo)

- FLIP-39 Flink ML pipeline and ML libs [27]
- A few abstract ML classes have been merged (FLINK-13339, FLINK-13513)
- Starting review of algorithms

Again, the feature freeze is targeted to be at the end of November. Please
make sure that all important work threads can be completed until that date.
Feel free to use this thread to communicate any concerns about features that
might not be finished until then. We will send another announcement later in
the release cycle to make the date of the feature freeze official.

Best,
Yu & Gary

[1] https://s.apache.org/wc0dc
[2] https://issues.apache.org/jira/browse/FLINK-10725
[3] https://issues.apache.org/jira/browse/FLINK-14079
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+time+attribute+in+SQL+DDL
[5]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-70%3A+Flink+SQL+Computed+Column+Design
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-70-Flink-SQL-Computed-Column-Design-td34385.html
[7]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
[8]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-51%3A+Rework+of+the+Expression+Design
[9]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
[10]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-57%3A+Rework+FunctionCatalog
[11]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-68%3A+Extend+Core+Table+System+with+Pluggable+Modules
[12]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
[13]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
[14] https://issues.apache.org/jira/browse/FLINK-10429
[15]
https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
[16]

Re: Multiple Job Managers in Flink HA Setup

2019-09-25 Thread Gary Yao
Hi Steve,

> I also tried attaching a shared NFS folder between the two machines and
> tried to set their web.tmpdir property to the shared folder, however it
> appears that each job manager creates a separate job inside that
> directory.

You can create a fixed upload directory via the config option 'web.upload.dir'
[1]. To avoid race conditions, it is probably best to make sure that the
directory already exists before starting the JMs (if the path does not exist,
both JMs may attempt to create it).

Alternatively you can try one of the following:
- Do not use stand-by masters
- Find the leader address from ZooKeeper, and issue a request directly [2]
- Use Flink CLI, which will resolve the leading JM from ZooKeeper. Note that
 the CLI submits the job by uploading a serialized JobGraph [2][3][4][5]
 (you could also rebuild that part of the CLI if you need programmatic job
 submission).

Lastly, I want to point out that the programmatic job submission is currently
being reworked (see [6] for details).

> 2) provide a persistent storage directory for the Jar file so I can perform
> rescaling without needing to re-upload the jar file.

Can you describe how are you rescaling?


Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#web-upload-dir
[2]
https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L162
[3]
https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L215
[4]
https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java#L79
[5]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-1
[6]
https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E

On Fri, Sep 20, 2019 at 10:57 PM Steven Nelson 
wrote:

> Hello!
>
> I am having some difficulty with multiple job managers in an HA setup
> using Flink 1.9.0.
>
> I have 2 job managers and have setup the HA setup with the following config
>
> high-availability: zookeeper
> high-availability.cluster-id: /imet-enhance
> high-availability.storageDir: hdfs:///flink/ha/
> high-availability.zookeeper.quorum:
> flink-state-hdfs-zookeeper-1.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181,flink-state-hdfs-zookeeper-2.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181,flink-state-hdfs-zookeeper-0.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181
> high-availability.zookeeper.path.root: /flink
> high-availability.jobmanager.port: 50000-50025
>
> I have the job managers behind a load balancer inside a kubernetes cluster
>
> They work great except for one thing. When I use the website (or API) to
> upload the Jar file and start the job sometimes the request goes to a
> different job manager, which doesn't have the jar file in it's temporary
> directory, so it fails to start.
>
> In the 1.7 version of this setup the second Job Manager would return a
> Redirect request. I put an HAProxy in front of it that only allowed traffic
> to flow to the Job Manager that wasn't returning a 300 and this worked well
> for everything. In 1.9 it appears that both Job Managers are able to
> respond (via the internal proxy mechanism I have seen in prior emails).
> However it appears the web file cache is still shared.
>
> I also tried attaching a shared NFS folder between the two machines and
> tried to set their web.tmpdir property to the shared folder, however it
> appears that each job manager creates a seperate job inside that directory.
>
> My end goals are:
> 1) Provide a fault tolerant Flink Cluster
> 2) provide a persistent storage directory for the Jar file so I can
> perform rescaling without needing to re-upload the jar file.
>
> Thoughts?
> -Steve
>


Re: error in Elasticsearch

2019-09-11 Thread Gary Yao
Program arguments should be set to "--input /home/alaa/nycTaxiRides.gz"
(without the quotes).

On Wed, Sep 11, 2019 at 10:39 AM alaa  wrote:

>  Hallo
>
> I put arguments but the same error appear .. what should i do ?
>
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1965/Screenshot_from_2019-09-11_10-34-42.png>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: error in Elasticsearch

2019-09-11 Thread Gary Yao
Hi,

You are not supposed to change that part of the exercise code. You have to
pass the path to the input file as a program argument (e.g., --input
/path/to/file). See [1] and [2] on how to configure program arguments in
IntelliJ.

Best,
Gary

[1]
https://www.jetbrains.com/help/idea/run-debug-configuration-application.html#1
[2]
https://stackoverflow.com/questions/2066307/how-do-you-input-commandline-argument-in-intellij-idea
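
For reference, this is roughly how such exercise code reads that argument — a
minimal sketch using ParameterTool (the path matches the one from the related
message above):

    import org.apache.flink.api.java.utils.ParameterTool;

    public class ArgsExample {
        public static void main(String[] args) {
            // Run with program arguments: --input /home/alaa/nycTaxiRides.gz
            ParameterTool params = ParameterTool.fromArgs(args);
            String input = params.getRequired("input"); // throws if --input is missing
            System.out.println("Reading from " + input);
        }
    }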

On Fri, Sep 6, 2019 at 1:41 PM alaa  wrote:

> Hello
>
> I try to implement this example to write the results of Popular Places into
> an Elasticsearch index.
>
> But when I run a code .. there was some Error appear
>
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1965/Screenshot_from_2019-09-06_13-29-15.png>
>
>
>
>
> and when i set the path-to-input-file .. also there was an error appear
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1965/Screenshot_from_2019-09-06_13-36-10.png>
>
>
>
> Can you help me which parameter should i put in this Line
>
> String input = params.getRequired("input");
>
>
> Thank you
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How do I start a Flink application on my Flink+Mesos cluster?

2019-09-11 Thread Gary Yao
Hi Felipe,

I am glad that you were able to fix the problem yourself.

> But I suppose that Mesos will allocate Slots and Task Managers dynamically.
> Is that right?

Yes, that is the case since Flink 1.5 [1].

> Then I put the number of "mesos.resourcemanager.tasks.cpus" to be equal or
> less the available cores on a single node of the cluster. I am not sure about
> this parameter, but only after this configuration it worked.

I would need to see JobManager and Mesos logs to understand why this resolved
your issue. If you do not set mesos.resourcemanager.tasks.cpus explicitly,
Flink will request CPU resources equal to the number of TaskManager slots
(taskmanager.numberOfTaskSlots) [2]. Maybe this value was too high in your
configuration?

Best,
Gary


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
[2]
https://github.com/apache/flink/blob/0a405251b297109fde1f9a155eff14be4d943887/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java#L344

On Tue, Sep 10, 2019 at 10:41 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> I managed to find what was going wrong. I will write here just for the
> record.
>
> First, the master machine was not login automatically at itself. So I had
> to give permission for it.
>
> chmod og-wx ~/.ssh/authorized_keys
> chmod 750 $HOME
>
> Then I put the number of "mesos.resourcemanager.tasks.cpus" to be equal or
> less the available cores on a single node of the cluster. I am not sure
> about this parameter, but only after this configuration it worked.
>
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
> On Fri, Sep 6, 2019 at 10:36 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi,
>>
>> I am running Mesos without DC/OS [1] and Flink on it. Whe I start my
>> cluster I receive some messages that I suppose everything was started.
>> However, I see 0 slats available on the Flink web dashboard. But I suppose
>> that Mesos will allocate Slots and Task Managers dynamically. Is that right?
>>
>> $ ./bin/mesos-appmaster.sh &
>> [1] 16723
>> flink@r03:~/flink-1.9.0$ I0906 10:22:45.080328 16943 sched.cpp:239]
>> Version: 1.9.0
>> I0906 10:22:45.082672 16996 sched.cpp:343] New master detected at
>> mas...@xxx.xxx.xxx.xxx:5050
>> I0906 10:22:45.083276 16996 sched.cpp:363] No credentials provided.
>> Attempting to register without authentication
>> I0906 10:22:45.086840 16997 sched.cpp:751] Framework registered with
>> 22f6a553-e8ac-42d4-9a90-96a8d5f002f0-0003
>>
>> Then I deploy my Flink application. When I use the first command to
>> deploy the application starts. However, the tasks remain CREATED until
>> Flink throws a timeout exception. In other words, it never turns to RUNNING.
>> When I use the second comman to deploy the application it does not start
>> and I receive the exception of "Could not allocate all requires slots
>> within timeout of 300000 ms. Slots required: 2". The full stacktrace is
>> below.
>>
>> $ /home/flink/flink-1.9.0/bin/flink run
>> /home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &
>> $ ./bin/mesos-appmaster-job.sh run
>> /home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/mesos.html#mesos-without-dcos
>> ps.: my application runs normally on a standalone Flink cluster.
>>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: Job failed.
>> (JobID: 7ad8d71faaceb1ac469353452c43dc2a)
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>> at org.hello_flink_mesos.App.(App.java:35)
>> at org.hello_flink_mesos.App.main(App.java:285)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>> at
>> 

Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

2019-08-14 Thread Gary Yao
Congratulations Andrey, well deserved!

Best,
Gary

On Thu, Aug 15, 2019 at 7:50 AM Bowen Li  wrote:

> Congratulations Andrey!
>
> On Wed, Aug 14, 2019 at 10:18 PM Rong Rong  wrote:
>
>> Congratulations Andrey!
>>
>> On Wed, Aug 14, 2019 at 10:14 PM chaojianok  wrote:
>>
>> > Congratulations Andrey!
>> > At 2019-08-14 21:26:37, "Till Rohrmann"  wrote:
>> > >Hi everyone,
>> > >
>> > >I'm very happy to announce that Andrey Zagrebin accepted the offer of
>> the
>> > >Flink PMC to become a committer of the Flink project.
>> > >
>> > >Andrey has been an active community member for more than 15 months. He
>> has
>> > >helped shaping numerous features such as State TTL, FRocksDB release,
>> > >Shuffle service abstraction, FLIP-1, result partition management and
>> > >various fixes/improvements. He's also frequently helping out on the
>> > >user@f.a.o mailing lists.
>> > >
>> > >Congratulations Andrey!
>> > >
>> > >Best, Till
>> > >(on behalf of the Flink PMC)
>> >
>>
>


Re: Queryable State race condition or serialization errors?

2019-05-21 Thread Gary Yao
Hi Burgess Chen,

If you are using MemoryStateBackend or FsStateBackend, you can observe race
conditions on the state objects. However, the RocksDBStateBackend should be
safe from these issues [1].
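
If switching the backend is an option for you, a minimal flink-conf.yaml
sketch could look like this (the checkpoint path is a placeholder):

    # serve queryable-state reads from RocksDB instead of live heap objects
    state.backend: rocksdb
    state.checkpoints.dir: hdfs:///flink/checkpoints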

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/queryable_state.html

On Tue, May 21, 2019 at 5:06 AM burgesschen  wrote:

> Hi Guys,
>
> I observed some strange behaviors while using Queryable state with Flink
> 1.6.2. Here is the story:
>
> My state is of type MapState[String, Map[String, String]]. the inner map is
> frequently updated. Upon querying, sometimes the returned inner map can
> miss
> some fields. What's more, sometimes the returned inner map has the values
> assigned to other keys!
>
> Changing the type to MapState[String, String] seem to solve the problem.
>
> The code is a little too deep to dig into. But my guess is that when the
> state is being updated and queried at the same time, there can be a race
> condition and cause data corruption. Please let me know if you have a
> better
> idea what could be happening. Much appreciated!
>
> Best,
> Burgess Chen
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [DISCUSS] Temporarily remove support for job rescaling via CLI action "modify"

2019-04-29 Thread Gary Yao
Since there were no objections so far, I will proceed with removing the
code [1].

[1] https://issues.apache.org/jira/browse/FLINK-12312

On Wed, Apr 24, 2019 at 1:38 PM Gary Yao  wrote:

> The idea is to also remove the rescaling code in the JobMaster. This will
> make
> it easier to remove the ExecutionGraph reference from the JobMaster which
> is
> needed for the scheduling rework [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-12231
>
> On Wed, Apr 24, 2019 at 12:14 PM Shuai Xu  wrote:
>
>> Will we only remove command support in client side or the code in job
>> master will also be removed?
>>
>> Till Rohrmann  于2019年4月24日周三 下午4:12写道:
>>
>> > +1 for temporarily removing support for the modify command.
>> >
>> > Eventually, we have to add it again in order to support auto scaling.
>> The
>> > next time we add it, we should address the known limitations.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Apr 24, 2019 at 9:06 AM Paul Lam  wrote:
>> >
>> > > Hi Gary,
>> > >
>> > > + 1 to remove it for now. Actually some users are not aware of that
>> it’s
>> > > still experimental, and ask quite a lot about the problem it causes.
>> > >
>> > > Best,
>> > > Paul Lam
>> > >
>> > > 在 2019年4月24日,14:49,Stephan Ewen  写道:
>> > >
>> > > Sounds reasonable to me. If it is a broken feature, then there is not
>> > much
>> > > value in it.
>> > >
>> > > On Tue, Apr 23, 2019 at 7:50 PM Gary Yao  wrote:
>> > >
>> > > Hi all,
>> > >
>> > > As the subject states, I am proposing to temporarily remove support
>> for
>> > > changing the parallelism of a job via the following syntax [1]:
>> > >
>> > >./bin/flink modify [job-id] -p [new-parallelism]
>> > >
>> > > This is an experimental feature that we introduced with the first
>> rollout
>> > > of
>> > > FLIP-6 (Flink 1.5). However, this feature comes with a few caveats:
>> > >
>> > >* Rescaling does not work with HA enabled [2]
>> > >* New parallelism is not persisted, i.e., after a JobManager
>> restart,
>> > > the job
>> > >  will be recovered with the initial parallelism
>> > >
>> > > Due to the above-mentioned issues, I believe that currently nobody
>> uses
>> > > "modify -p" to rescale their jobs in production. Moreover, the
>> rescaling
>> > > feature stands in the way of our current efforts to rework Flink's
>> > > scheduling
>> > > [3]. I therefore propose to remove the rescaling code for the time
>> being.
>> > > Note
>> > > that it will still be possible to change the parallelism by taking a
>> > > savepoint
>> > > and restoring the job with a different parallelism [4].
>> > >
>> > > Any comments and suggestions will be highly appreciated.
>> > >
>> > > Best,
>> > > Gary
>> > >
>> > > [1]
>> > >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
>> > > [2] https://issues.apache.org/jira/browse/FLINK-8902
>> > > [3] https://issues.apache.org/jira/browse/FLINK-10429
>> > > [4]
>> > >
>> > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html#what-happens-when-i-change-the-parallelism-of-my-program-when-restoring
>> > >
>> > >
>> > >
>> >
>>
>


Re: Flink CLI

2019-04-26 Thread Gary Yao
Hi Steve,

(1)

The CLI action you are looking for is called "modify" [1]. However, we
want
to temporarily disable this feature beginning from Flink 1.9 due to some
caveats with it [2]. If you have objections, it would be appreciated if
you
could comment on the respective thread on the user/dev mailing list.

(2)

There is currently no option to have the CLI output JSON. However, as
others
have pointed out, you can use the REST API to invoke actions on the
cluster,
such as drawing savepoints [3]. This is also what the CLI ultimately
does
[4].
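
As a rough sketch (host, port, job id, and target directory below are
placeholders), triggering and polling a savepoint via REST looks like this:

    # trigger a savepoint; the response contains a "request-id"
    curl -s -X POST -H "Content-Type: application/json" \
      -d '{"target-directory": "hdfs:///savepoints", "cancel-job": false}' \
      http://jobmanager-host:8081/jobs/<job-id>/savepoints

    # poll until the status is COMPLETED; the savepoint path is in the response
    curl -s http://jobmanager-host:8081/jobs/<job-id>/savepoints/<request-id>

    # the job list is also available as JSON
    curl -s http://jobmanager-host:8081/jobs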

Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Temporarily-remove-support-for-job-rescaling-via-CLI-action-quot-modify-quot-td27447.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#jobs-jobid-savepoints
[4]
https://github.com/apache/flink/blob/767fe152cb69a204261a0770412c8b28d037614d/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L415-L424

On Wed, Apr 24, 2019 at 5:06 PM Steven Nelson 
wrote:

> Hello!
>
> I am working on automating our deployments to our Flink cluster. I had a
> couple questions about the flink cli.
>
> 1) I thought there was an "update" command that would internally manage
> the cancel with savepoint, upload new jar, restart from savepoint process.
>
> 2) Is there a way to get the Flink cli to output it's result in a json
> format? Right now I would need to parse the results of the "flink list"
> command to get the job id, cancel the job with savepoint, parse the results
> of that to get the savepoint filename, then restore using that. Parsing the
> output seems brittle to me.
>
> Thought?
> -Steve
>
>


Re: [DISCUSS] Temporarily remove support for job rescaling via CLI action "modify"

2019-04-24 Thread Gary Yao
The idea is to also remove the rescaling code in the JobMaster. This will
make
it easier to remove the ExecutionGraph reference from the JobMaster which is
needed for the scheduling rework [1].

[1] https://issues.apache.org/jira/browse/FLINK-12231

On Wed, Apr 24, 2019 at 12:14 PM Shuai Xu  wrote:

> Will we only remove command support in client side or the code in job
> master will also be removed?
>
> Till Rohrmann  于2019年4月24日周三 下午4:12写道:
>
> > +1 for temporarily removing support for the modify command.
> >
> > Eventually, we have to add it again in order to support auto scaling. The
> > next time we add it, we should address the known limitations.
> >
> > Cheers,
> > Till
> >
> > On Wed, Apr 24, 2019 at 9:06 AM Paul Lam  wrote:
> >
> > > Hi Gary,
> > >
> > > + 1 to remove it for now. Actually some users are not aware of that
> it’s
> > > still experimental, and ask quite a lot about the problem it causes.
> > >
> > > Best,
> > > Paul Lam
> > >
> > > 在 2019年4月24日,14:49,Stephan Ewen  写道:
> > >
> > > Sounds reasonable to me. If it is a broken feature, then there is not
> > much
> > > value in it.
> > >
> > > On Tue, Apr 23, 2019 at 7:50 PM Gary Yao  wrote:
> > >
> > > Hi all,
> > >
> > > As the subject states, I am proposing to temporarily remove support for
> > > changing the parallelism of a job via the following syntax [1]:
> > >
> > >./bin/flink modify [job-id] -p [new-parallelism]
> > >
> > > This is an experimental feature that we introduced with the first
> rollout
> > > of
> > > FLIP-6 (Flink 1.5). However, this feature comes with a few caveats:
> > >
> > >* Rescaling does not work with HA enabled [2]
> > >* New parallelism is not persisted, i.e., after a JobManager
> restart,
> > > the job
> > >  will be recovered with the initial parallelism
> > >
> > > Due to the above-mentioned issues, I believe that currently nobody uses
> > > "modify -p" to rescale their jobs in production. Moreover, the
> rescaling
> > > feature stands in the way of our current efforts to rework Flink's
> > > scheduling
> > > [3]. I therefore propose to remove the rescaling code for the time
> being.
> > > Note
> > > that it will still be possible to change the parallelism by taking a
> > > savepoint
> > > and restoring the job with a different parallelism [4].
> > >
> > > Any comments and suggestions will be highly appreciated.
> > >
> > > Best,
> > > Gary
> > >
> > > [1]
> > >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
> > > [2] https://issues.apache.org/jira/browse/FLINK-8902
> > > [3] https://issues.apache.org/jira/browse/FLINK-10429
> > > [4]
> > >
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html#what-happens-when-i-change-the-parallelism-of-my-program-when-restoring
> > >
> > >
> > >
> >
>


[DISCUSS] Temporarily remove support for job rescaling via CLI action "modify"

2019-04-23 Thread Gary Yao
Hi all,

As the subject states, I am proposing to temporarily remove support for
changing the parallelism of a job via the following syntax [1]:

./bin/flink modify [job-id] -p [new-parallelism]

This is an experimental feature that we introduced with the first rollout of
FLIP-6 (Flink 1.5). However, this feature comes with a few caveats:

* Rescaling does not work with HA enabled [2]
* New parallelism is not persisted, i.e., after a JobManager restart,
the job
  will be recovered with the initial parallelism

Due to the above-mentioned issues, I believe that currently nobody uses
"modify -p" to rescale their jobs in production. Moreover, the rescaling
feature stands in the way of our current efforts to rework Flink's
scheduling
[3]. I therefore propose to remove the rescaling code for the time being.
Note
that it will still be possible to change the parallelism by taking a
savepoint
and restoring the job with a different parallelism [4].
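
For reference, the savepoint-based path would look roughly as follows
(job id, paths, and parallelism are placeholders):

    ./bin/flink savepoint <job-id> hdfs:///savepoints     # trigger a savepoint
    ./bin/flink cancel <job-id>                           # stop the job
    ./bin/flink run -s hdfs:///savepoints/savepoint-xxxx -p 8 ./my-job.jar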

Any comments and suggestions will be highly appreciated.

Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
[2] https://issues.apache.org/jira/browse/FLINK-8902
[3] https://issues.apache.org/jira/browse/FLINK-10429
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html#what-happens-when-i-change-the-parallelism-of-my-program-when-restoring


Re: inconsistent module descriptor for hadoop uber jar (Flink 1.8)

2019-04-16 Thread Gary Yao
Hi,

Can you describe how to reproduce this?

Best,
Gary

On Mon, Apr 15, 2019 at 9:26 PM Hao Sun  wrote:

> Hi, I can not find the root cause of this, I think hadoop version is mixed
> up between libs somehow.
>
> --- ERROR ---
> java.text.ParseException: inconsistent module descriptor file found in '
> https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop2-uber/2.8.3-1.8.0/flink-shaded-hadoop2-uber-2.8.3-1.8.0.pom':
> bad revision: expected='2.8.3-1.8.0' found='2.4.1-1.8.0';
>
> Is this a bug?
>
> Hao Sun
>


Re: Where does the logs in Flink GUI's Exception tab come from?

2019-03-15 Thread Gary Yao
Hi Averell,

I think I have answered your question previously [1]. The bottom line is
that
the error is logged on INFO level in the ExecutionGraph [2]. However, your
effective log level (of the root logger) is WARN. The log levels are ordered
as follows [3]:

TRACE < DEBUG < INFO <  WARN < ERROR

It follows that, in your case, log requests to the root logger of level WARN
and ERROR only will appear in the log file – all other levels will be
discarded. It should be enough to set the root log level to INFO, or set the
log level for the org.apache.flink.runtime.executiongraph.ExecutionGraph
logger explicitly to INFO (or lower).
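
For example, a logback.xml fragment along these lines should be enough,
assuming your file appender is named "file" (adjust the ref otherwise):

    <!-- raise only the ExecutionGraph logger to INFO; the root can stay at WARN -->
    <logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph"
            level="INFO" additivity="false">
        <appender-ref ref="file"/>
    </logger>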

Best,
Gary

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-resource-available-error-while-testing-HA-tp25681p25835.html
[2]
https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L1363
[3] https://logback.qos.ch/manual/architecture.html

On Fri, Mar 15, 2019 at 4:21 AM Averell  wrote:

> Hi everyone,
>
> I am running Flink in EMR YARN cluster, and when the job failed and
> restarted, I could see some logs in the Exception tab of Flink GUI.
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-03-15_at_12.png>
>
>
> I could not find this piece of logs on my cluster's hard-disk - not in TM
> or
> JM logs.
>
> Where can I find this?
>
> Thanks.
>
> Here below is my logback.xml. I'm not sure it has anything to do with my
> question.
>
> 
> 
> ${log.file}
> false
> 
> %d{-MM-dd HH:mm:ss.SSS} [%thread] %-5level
> %logger{60} %X{sourceThread} - %msg%n
> 
> 
>
> 
> 
> 
>  level="INFO"
> additivity="false">
> 
> 
>  level="INFO" additivity="false">
> 
> 
> 
> name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline"
> level="ERROR">
> 
> 
> 
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Re: Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-15 Thread Gary Yao
I forgot to add line numbers to the first link in my previous email:


https://github.com/apache/flink/blob/c6878aca6c5aeee46581b4d6744b31049db9de95/flink-dist/src/main/flink-bin/bin/jobmanager.sh#L21-L25

On Fri, Mar 15, 2019 at 8:08 AM Gary Yao  wrote:

> Hi Harshith,
>
> In the jobmanager.sh script, the 2nd argument is assigned to the HOST
> variable
> [1]. How are you invoking jobmanager.sh? Prior to 1.5, the script expected
> an
> execution mode (local or cluster) but this is no longer the case [2].
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/c6878aca6c5aeee46581b4d6744b31049db9de95/flink-dist/src/main/flink-bin/bin/jobmanager.sh
> [2]
> https://github.com/apache/flink/commit/d61664ca64bcb82c4e8ddf03a2ed38fe8edafa98
>
> On Fri, Mar 15, 2019 at 3:36 AM Kumar Bolar, Harshith 
> wrote:
>
>> Hi Gary,
>>
>>
>>
>> An update. I noticed the line “–host cluster” in the program arguments
>> section of the job manager logs. So, I commented the following section in
>> jobmanager.sh, the task manager is now able to connect to job manager
>> without issues.
>>
>>
>>
>>   *if [ ! -z $HOST ]; then*
>>
>> *args+=("--host")*
>>
>> *args+=("${HOST}")*
>>
>> *fi*
>>
>>
>>
>>
>>
>> Task manager logs after commenting those lines:
>>
>>
>>
>>
>> * 2019-03-14 22:31:02,863 INFO
>> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
>> RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at
>> akka://flink/user/taskmanager_0 .*
>>
>> *2019-03-14 22:31:02,875 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.*
>>
>> *2019-03-14 22:31:02,876 INFO
>> org.apache.flink.runtime.taskexecutor.JobLeaderService- Start job
>> leader service.*
>>
>> *2019-03-14 22:31:02,877 INFO
>> org.apache.flink.runtime.filecache.FileCache  - User file
>> cache uses directory
>> /tmp/flink-dist-cache-12d5905f-d694-46f6-9359-3a636188b008*
>>
>> *2019-03-14 22:31:02,884 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting
>> to ResourceManager
>> akka.tcp://fl...@flink0-1.flink1.us-east-1.high.ue1.non.aws.cloud.arity.com:28945/user/resourcemanager(8583b335fd08a30a89585b7af07e4213)
>> <http://fl...@flink0-1.flink1.us-east-1.high.ue1.non.aws.cloud.arity.com:28945/user/resourcemanager(8583b335fd08a30a89585b7af07e4213)>.*
>>
>> *2019-03-14 22:31:03,109 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved
>> ResourceManager address, beginning registration*
>>
>> *2019-03-14 22:31:03,110 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor-
>> Registration at ResourceManager attempt 1 (timeout=100ms)*
>>
>> *2019-03-14 22:31:03,228 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor-
>> Registration at ResourceManager attempt 2 (timeout=200ms)*
>>
>> *2019-03-14 22:31:03,266 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Successful
>> registration at resource manager
>> akka.tcp://fl...@flink0-1.flink1.us-east-1.abc.com:28945/user/resourcemanager
>> <http://fl...@flink0-1.flink1.us-east-1.abc.com:28945/user/resourcemanager>
>> under registration id 170ee6a00f80ee02ead0e88710093d77.*
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Harshith
>>
>>
>>
>> *From: *Harshith Kumar Bolar 
>> *Date: *Friday, 15 March 2019 at 7:38 AM
>> *To: *Gary Yao 
>> *Cc: *user 
>> *Subject: *Re: [External] Re: Re: Flink 1.7.2: Task Manager not able to
>> connect to Job Manager
>>
>>
>>
>> Hi Gary,
>>
>>
>>
>> Here are the full job manager and task manager logs. In the job manager
>> logs, I see it says “*starting StandaloneSessionClusterEntrypoint”,* whereas
>> in Flink 1.4.2, it used to say “*starting JobManager”*. Is this correct?
>>
>>
>>
>> Job manager logs: https://paste.ubuntu.com/p/DCVzsQdpHq/ 
>> (https://paste(.)ubuntu(.)com/p/DCVzsQdpHq
>> /)
>>
>> Task Manager logs: https://paste.ubuntu.com/p/wbvYFZxdT8/ (
>> https://paste(.)ubuntu(.)com/p/wbvYFZxdT8/)
>>
>>
>>
>> Thanks,
>>
>> Harshith
>>
>>
>>
>> *From: *Gary Yao 
>> *Date: *Thursday, 14 March 2019 at 10:11 PM
>> *To: *Ha

Re: Re: Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-15 Thread Gary Yao
Hi Harshith,

In the jobmanager.sh script, the 2nd argument is assigned to the HOST
variable
[1]. How are you invoking jobmanager.sh? Prior to 1.5, the script expected
an
execution mode (local or cluster) but this is no longer the case [2].
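
Roughly, the expected invocations are (host and port are placeholders):

    # Flink <= 1.4 expected an execution mode as the first argument
    ./bin/jobmanager.sh start cluster jm-host 8081

    # from Flink 1.5 on, the first argument after "start" is the host
    ./bin/jobmanager.sh start jm-host 8081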

Best,
Gary

[1]
https://github.com/apache/flink/blob/c6878aca6c5aeee46581b4d6744b31049db9de95/flink-dist/src/main/flink-bin/bin/jobmanager.sh
[2]
https://github.com/apache/flink/commit/d61664ca64bcb82c4e8ddf03a2ed38fe8edafa98

On Fri, Mar 15, 2019 at 3:36 AM Kumar Bolar, Harshith 
wrote:

> Hi Gary,
>
>
>
> An update. I noticed the line “–host cluster” in the program arguments
> section of the job manager logs. So, I commented the following section in
> jobmanager.sh, the task manager is now able to connect to job manager
> without issues.
>
>
>
>   *if [ ! -z $HOST ]; then*
>
> *args+=("--host")*
>
> *args+=("${HOST}")*
>
> *fi*
>
>
>
>
>
> Task manager logs after commenting those lines:
>
>
>
>
> * 2019-03-14 22:31:02,863 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
> RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at
> akka://flink/user/taskmanager_0 .*
>
> *2019-03-14 22:31:02,875 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.*
>
> *2019-03-14 22:31:02,876 INFO
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Start job
> leader service.*
>
> *2019-03-14 22:31:02,877 INFO
> org.apache.flink.runtime.filecache.FileCache  - User file
> cache uses directory
> /tmp/flink-dist-cache-12d5905f-d694-46f6-9359-3a636188b008*
>
> *2019-03-14 22:31:02,884 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting
> to ResourceManager
> akka.tcp://fl...@flink0-1.flink1.us-east-1.high.ue1.non.aws.cloud.arity.com:28945/user/resourcemanager(8583b335fd08a30a89585b7af07e4213)
> <http://fl...@flink0-1.flink1.us-east-1.high.ue1.non.aws.cloud.arity.com:28945/user/resourcemanager(8583b335fd08a30a89585b7af07e4213)>.*
>
> *2019-03-14 22:31:03,109 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved
> ResourceManager address, beginning registration*
>
> *2019-03-14 22:31:03,110 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Registration at ResourceManager attempt 1 (timeout=100ms)*
>
> *2019-03-14 22:31:03,228 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Registration at ResourceManager attempt 2 (timeout=200ms)*
>
> *2019-03-14 22:31:03,266 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Successful
> registration at resource manager
> akka.tcp://fl...@flink0-1.flink1.us-east-1.abc.com:28945/user/resourcemanager
> <http://fl...@flink0-1.flink1.us-east-1.abc.com:28945/user/resourcemanager>
> under registration id 170ee6a00f80ee02ead0e88710093d77.*
>
>
>
>
>
> Thanks,
>
> Harshith
>
>
>
> *From: *Harshith Kumar Bolar 
> *Date: *Friday, 15 March 2019 at 7:38 AM
> *To: *Gary Yao 
> *Cc: *user 
> *Subject: *Re: [External] Re: Re: Flink 1.7.2: Task Manager not able to
> connect to Job Manager
>
>
>
> Hi Gary,
>
>
>
> Here are the full job manager and task manager logs. In the job manager
> logs, I see it says “*starting StandaloneSessionClusterEntrypoint”,* whereas
> in Flink 1.4.2, it used to say “*starting JobManager”*. Is this correct?
>
>
>
> Job manager logs: https://paste.ubuntu.com/p/DCVzsQdpHq/ 
> (https://paste(.)ubuntu(.)com/p/DCVzsQdpHq
> /)
>
> Task Manager logs: https://paste.ubuntu.com/p/wbvYFZxdT8/ (
> https://paste(.)ubuntu(.)com/p/wbvYFZxdT8/)
>
>
>
> Thanks,
>
> Harshith
>
>
>
> *From: *Gary Yao 
> *Date: *Thursday, 14 March 2019 at 10:11 PM
> *To: *Harshith Kumar Bolar 
> *Cc: *user 
> *Subject: *[External] Re: Re: Flink 1.7.2: Task Manager not able to
> connect to Job Manager
>
>
>
> Hi Harshith,
>
> The truncated log is not enough. Can you share the complete logs? If that's
> not possible, I'd like to see the beginning of the log files where the
> cluster
> configuration is logged.
>
> The TaskManager tries to connect to the leader that is advertised in
> ZooKeeper. In your case the "cluster" hostname is advertised which hints a
> problem in your Flink configuration.
>
> Best,
> Gary
>
>
>
> On Thu, Mar 14, 2019 at 4:54 PM Kumar Bolar, Harshith 
> wrote:
>
> Hi Gary,
>
>
>
> I’ve attached the relevant portions of the JM and TM logs.
>
>
>
>

Re: Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-14 Thread Gary Yao
concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2019-03-14 11:47:35,904 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Stopping
> TaskExecutor akka.tcp://
> fl...@flink1-1.flink1.us-east-1.com:24623/user/taskmanager_0.
> 2019-03-14 11:47:35,904 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> 2019-03-14 11:47:35,904 INFO
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
> Shutting down TaskExecutorLocalStateStoresManager.
> 2019-03-14 11:47:35,908 INFO
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager
> removed spill file directory
> /tmp/flink-io-a7bc246d-bae4-489f-9c9c-f6a25d3c4b8f
> 2019-03-14 11:47:35,908 INFO
> org.apache.flink.runtime.io.network.NetworkEnvironment- Shutting
> down the network environment and its components.
> 2019-03-14 11:47:35,914 INFO
> org.apache.flink.runtime.io.network.netty.NettyClient - Successful
> shutdown (took 5 ms).
> 2019-03-14 11:47:35,917 INFO
> org.apache.flink.runtime.io.network.netty.NettyServer - Successful
> shutdown (took 2 ms).
> 2019-03-14 11:47:35,925 INFO
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Stop job
> leader service.
> 2019-03-14 11:47:35,931 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Stopped
> TaskExecutor akka.tcp://
> fl...@flink1-1.flink1.us-east-1.com:24623/user/taskmanager_0.
> 2019-03-14 11:47:35,931 INFO
> org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting
> down BLOB cache
> 2019-03-14 11:47:35,933 INFO
> org.apache.flink.runtime.blob.TransientBlobCache  - Shutting
> down BLOB cache
> 2019-03-14 11:47:35,943 INFO
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
> - backgroundOperationsLoop exiting
> 2019-03-14 11:47:35,950 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  -
> Session: 0x26977a24c4e0018 closed
> 2019-03-14 11:47:35,950 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
> EventThread shut down for session: 0x26977a24c4e0018
> 2019-03-14 11:47:35,950 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopping
> Akka RPC service.
> 2019-03-14 11:47:35,952 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting
> down remote daemon.
> 2019-03-14 11:47:35,952 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote
> daemon shut down; proceeding with flushing remote transports.
> 2019-03-14 11:47:35,959 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting
> down remote daemon.
> 2019-03-14 11:47:35,966 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote
> daemon shut down; proceeding with flushing remote transports.
> 2019-03-14 11:47:35,983 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting
> shut down.
> 2019-03-14 11:47:35,984 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting
> shut down.
> 2019-03-14 11:47:35,992 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped
> Akka RPC service.
>
>
>
>
>
> *From: *Gary Yao 
> *Date: *Thursday, 14 March 2019 at 9:06 PM
> *To: *Harshith Kumar Bolar 
> *Cc: *user 
> *Subject: *[External] Re: Flink 1.7.2: Task Manager not able to connect
> to Job Manager
>
>
>
> Hi Harshith,
>
>
>
> Can you share JM and TM logs?
>
>
>
> Best,
>
> Gary
>
>
>
> On Thu, Mar 14, 2019 at 3:42 PM Kumar Bolar, Harshith 
> wrote:
>
> Hi all,
>
>
>
> I'm trying to upgrade our Flink cluster from 1.4.2 to 1.7.2
>
>
>
> When I bring up the cluster, the task managers refuse to connect to the
> job managers with the following error.
>
>
>
> 2019-03-14 10:34:41,551 WARN
> akka.remote.ReliableDeliverySupervisor
>
> - Association with remote system [akka.tcp://flink@cluster:22671]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@cluster:22671]] Caused by: [cluster: Name or
> service not known]
>
>
>
> Now, this works correctly if I add the following line into
> the /etc/hosts file.
>
>
>
> x.x.x.x job-manager-address.com
> <https://urldefense.proofpoint.com/v2/url?u=http-3A__job-2Dmanager-2Daddress.com=DwMFaQ=gtIjdLs6LnStUpy9cTOW9w=61bFb6zUNKZxlAQDRo_jKA=04EWFpDL8G7AOCUH79K-QVwPa3NSJj7u4Qanpbrx0tg=KDu-Fxq2rWtLq1EmNp0DOuK0yWC6GyHwvhpbyQ8hRQg=>
> cluster
>
>
>
> Why is Flink 1.7.2 connecting to JM using cluster in the address? Flink
> 1.4.2 used to have the job manager's address instead of the word cluster.
>
>
>
> Thanks,
>
> Harshith
>
>
>
>


Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-14 Thread Gary Yao
Hi Harshith,

Can you share JM and TM logs?

Best,
Gary

On Thu, Mar 14, 2019 at 3:42 PM Kumar Bolar, Harshith 
wrote:

> Hi all,
>
>
>
> I'm trying to upgrade our Flink cluster from 1.4.2 to 1.7.2
>
>
>
> When I bring up the cluster, the task managers refuse to connect to the
> job managers with the following error.
>
>
>
> 2019-03-14 10:34:41,551 WARN
> akka.remote.ReliableDeliverySupervisor
>
> - Association with remote system [akka.tcp://flink@cluster:22671]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@cluster:22671]] Caused by: [cluster: Name or
> service not known]
>
>
>
> Now, this works correctly if I add the following line into
> the /etc/hosts file.
>
>
>
> x.x.x.x job-manager-address.com cluster
>
>
>
> Why is Flink 1.7.2 connecting to JM using cluster in the address? Flink
> 1.4.2 used to have the job manager's address instead of the word cluster.
>
>
>
> Thanks,
>
> Harshith
>
>
>


Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Gary Yao
The RC artifacts are only deployed to the Maven Central Repository when the
RC
is promoted to a release. As written in the 1.8.0 RC1 voting email [1], you
can find the maven artifacts, and the Flink binaries here:

-
https://repository.apache.org/content/repositories/orgapacheflink-1210/
- https://dist.apache.org/repos/dist/dev/flink/flink-1.8.0-rc1/

Alternatively, you can apply the patch yourself, and build Flink 1.7 from
sources [2]. On my machine this takes around 10 minutes if tests are
skipped.
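
A rough sketch of the steps (the commit id is a placeholder; see [2] for
the authoritative build instructions):

    git clone https://github.com/apache/flink.git
    cd flink
    git checkout release-1.7
    git cherry-pick <commit-with-the-fix>   # apply the FLINK-10743 patch
    mvn clean install -DskipTests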

Best,
Gary

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#build-flink

On Tue, Mar 12, 2019 at 4:01 PM Vishal Santoshi 
wrote:

> Do you have a Maven repository (at Maven Central) set up for the 1.8 release
> candidate? We could test it for you.
>
> Without 1.8 and this exit code we are essentially held up.
>
> On Tue, Mar 12, 2019 at 10:56 AM Gary Yao  wrote:
>
>> Nobody can tell with 100% certainty. We want to give the RC some exposure
>> first, and there is also a release process that is prescribed by the ASF
>> [1].
>> You can look at past releases to get a feeling for how long the release
>> process lasts [2].
>>
>> [1] http://www.apache.org/legal/release-policy.html#release-approval
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/template/NamlServlet.jtp?macro=search_page=1=%5BVOTE%5D+Release=0
>>
>>
>> On Tue, Mar 12, 2019 at 3:38 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> And when is the 1.8.0 release expected ?
>>>
>>> On Tue, Mar 12, 2019 at 10:32 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> :) That makes so much more sense. Is  k8s native flink a part of this
>>>> release ?
>>>>
>>>> On Tue, Mar 12, 2019 at 10:27 AM Gary Yao  wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> This issue was fixed recently [1], and the patch will be released with
>>>>> 1.8. If
>>>>> the Flink job gets cancelled, the JVM should exit with code 0. There
>>>>> is a
>>>>> release candidate [2], which you can test.
>>>>>
>>>>> Best,
>>>>> Gary
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10743
>>>>> [2]
>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html
>>>>>
>>>>> On Tue, Mar 12, 2019 at 3:21 PM Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Vijay,
>>>>>>
>>>>>> This is the larger issue.  The cancellation routine is itself broken.
>>>>>>
>>>>>> On cancellation flink does remove the checkpoint counter
>>>>>>
>>>>>> *2019-03-12 14:12:13,143
>>>>>> INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
>>>>>> Removing /checkpoint-counter/ from
>>>>>> ZooKeeper *
>>>>>>
>>>>>> but exist with a non zero code
>>>>>>
>>>>>> *2019-03-12 14:12:13,477
>>>>>> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>>>>> Terminating cluster entrypoint process StandaloneJobClusterEntryPoint 
>>>>>> with
>>>>>> exit code 1444.*
>>>>>>
>>>>>>
>>>>>> That I think is an issue. A cancelled job is a complete job and thus
>>>>>> the exit code should be 0 for k8s to mark it complete.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 12, 2019 at 10:18 AM Vijay Bhaskar <
>>>>>> bhaskar.eba...@gmail.com> wrote:
>>>>>>
>>>>>>> Yes Vishal. Thats correct.
>>>>>>>
>>>>>>> Regards
>>>>>>> Bhaskar
>>>>>>>
>>>>>>> On Tue, Mar 12, 2019 at 7:14 PM Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> This r

Re: local disk cleanup after crash

2019-03-12 Thread Gary Yao
Hi,

If no other TaskManager (TM) is running, you can delete everything. If
multiple TMs share the same host, as far as I know, you will have to parse
TM
logs to know what directories you can delete [1]. As for local recovery,
tasks
that were running on a crashed TM are lost. From the documentation [2]:

If a task manager is lost, the local state from all its task is lost.

Therefore, assuming that only one TM is running on each host, you can delete
everything.
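
A hypothetical pre-start cleanup, assuming a single TaskManager per host
and that taskmanager.tmp.dirs points at /data/flink-tmp (adjust to your
setup):

    TMP_DIR=/data/flink-tmp
    rm -rf "${TMP_DIR}"/blobStore-* \
           "${TMP_DIR}"/flink-dist-cache-* \
           "${TMP_DIR}"/flink-io-* \
           "${TMP_DIR}"/localState/* \
           "${TMP_DIR}"/rocksdb-lib-*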

Best,
Gary

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/What-are-blobstore-files-and-why-do-they-keep-filling-up-tmp-directory-td26323.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery

On Thu, Mar 7, 2019 at 10:45 PM Derek VerLee  wrote:

> I think that effort is put in to have task managers clean up their
> folders, however I have noticed that in some cases local folders are not
> cleaned up and can build up, eventually causing problems due to a full
> disk.  As far as I know this only happens with crashes and other
> out-of-happy-path scenarios.
>
> I am thinking of writing a script to clean up local folders that runs
> before task-manager starts between restarts in the case of a crash.
>
> Assuming local recovery is not configured, what should I delete and what
> should I leave around?
>
> What should I keep if local recovery is configured?
>
>
> Under the "taskmanager.tmp.dirs" I see:
>
> blobStore-*
> flink-dist-cache-*
> flink-io-*
> localState/*
> rocksdb-lib-*
>
>
> Thanks
>


Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Gary Yao
Nobody can tell with 100% certainty. We want to give the RC some exposure
first, and there is also a release process that is prescribed by the ASF
[1].
You can look at past releases to get a feeling for how long the release
process lasts [2].

[1] http://www.apache.org/legal/release-policy.html#release-approval
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/template/NamlServlet.jtp?macro=search_page=1=%5BVOTE%5D+Release=0


On Tue, Mar 12, 2019 at 3:38 PM Vishal Santoshi 
wrote:

> And when is the 1.8.0 release expected ?
>
> On Tue, Mar 12, 2019 at 10:32 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> :) That makes so much more sense. Is  k8s native flink a part of this
>> release ?
>>
>> On Tue, Mar 12, 2019 at 10:27 AM Gary Yao  wrote:
>>
>>> Hi Vishal,
>>>
>>> This issue was fixed recently [1], and the patch will be released with
>>> 1.8. If
>>> the Flink job gets cancelled, the JVM should exit with code 0. There is a
>>> release candidate [2], which you can test.
>>>
>>> Best,
>>> Gary
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-10743
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html
>>>
>>> On Tue, Mar 12, 2019 at 3:21 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Thanks Vijay,
>>>>
>>>> This is the larger issue.  The cancellation routine is itself broken.
>>>>
>>>> On cancellation flink does remove the checkpoint counter
>>>>
>>>> *2019-03-12 14:12:13,143
>>>> INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
>>>> Removing /checkpoint-counter/ from
>>>> ZooKeeper *
>>>>
>>>> but exist with a non zero code
>>>>
>>>> *2019-03-12 14:12:13,477
>>>> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>>> Terminating cluster entrypoint process StandaloneJobClusterEntryPoint with
>>>> exit code 1444.*
>>>>
>>>>
>>>> That I think is an issue. A cancelled job is a complete job and thus
>>>> the exit code should be 0 for k8s to mark it complete.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Mar 12, 2019 at 10:18 AM Vijay Bhaskar <
>>>> bhaskar.eba...@gmail.com> wrote:
>>>>
>>>>> Yes Vishal. Thats correct.
>>>>>
>>>>> Regards
>>>>> Bhaskar
>>>>>
>>>>> On Tue, Mar 12, 2019 at 7:14 PM Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> This really not cool but here you go. This seems to work. Agreed that
>>>>>> this cannot be this painful. The cancel does not exit with an exit code 
>>>>>> pf
>>>>>> 0 and thus the job has to manually delete. Vijay does this align with 
>>>>>> what
>>>>>> you have had to do ?
>>>>>>
>>>>>>
>>>>>>- Take a save point . This returns a request id
>>>>>>
>>>>>>curl  --header "Content-Type: application/json" --request POST --data 
>>>>>> '{"target-directory":"hdfs://nn-crunchy:8020/tmp/xyz14","cancel-job":false}'
>>>>>> 
>>>>>> https://*/jobs//savepoints
>>>>>>
>>>>>>
>>>>>>
>>>>>>- Make sure the save point succeeded
>>>>>>
>>>>>>curl  --request GET   
>>>>>> https:///jobs//savepoints/2c053ce3bea31276aa25e63784629687
>>>>>>
>>>>>>
>>>>>>
>>>>>>- cancel the job
>>>>>>
>>>>>>curl  --request PATCH 
>>>>>> https://***/jobs/?mode=cancel
>>>>>>
>>>>>>
>>>>>>
>>>>>>- Delete the job and deployment
>>>>>>
>>>>>>kubectl delete -f 
>>>>>> manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Gary Yao
Hi Vishal,

I'm afraid not but there are open pull requests for that. You can track the
progress here:

https://issues.apache.org/jira/browse/FLINK-9953

Best,
Gary

On Tue, Mar 12, 2019 at 3:32 PM Vishal Santoshi 
wrote:

> :) That makes so much more sense. Is  k8s native flink a part of this
> release ?
>
> On Tue, Mar 12, 2019 at 10:27 AM Gary Yao  wrote:
>
>> Hi Vishal,
>>
>> This issue was fixed recently [1], and the patch will be released with
>> 1.8. If
>> the Flink job gets cancelled, the JVM should exit with code 0. There is a
>> release candidate [2], which you can test.
>>
>> Best,
>> Gary
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10743
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html
>>
>> On Tue, Mar 12, 2019 at 3:21 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Thanks Vijay,
>>>
>>> This is the larger issue.  The cancellation routine is itself broken.
>>>
>>> On cancellation flink does remove the checkpoint counter
>>>
>>> *2019-03-12 14:12:13,143
>>> INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
>>> Removing /checkpoint-counter/ from
>>> ZooKeeper *
>>>
>>> but exist with a non zero code
>>>
>>> *2019-03-12 14:12:13,477
>>> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>> Terminating cluster entrypoint process StandaloneJobClusterEntryPoint with
>>> exit code 1444.*
>>>
>>>
>>> That I think is an issue. A cancelled job is a complete job and thus the
>>> exit code should be 0 for k8s to mark it complete.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Mar 12, 2019 at 10:18 AM Vijay Bhaskar 
>>> wrote:
>>>
>>>> Yes Vishal. Thats correct.
>>>>
>>>> Regards
>>>> Bhaskar
>>>>
>>>> On Tue, Mar 12, 2019 at 7:14 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> This really not cool but here you go. This seems to work. Agreed that
>>>>> this cannot be this painful. The cancel does not exit with an exit code pf
>>>>> 0 and thus the job has to manually delete. Vijay does this align with what
>>>>> you have had to do ?
>>>>>
>>>>>
>>>>>- Take a save point . This returns a request id
>>>>>
>>>>>curl  --header "Content-Type: application/json" --request POST --data 
>>>>> '{"target-directory":"hdfs://nn-crunchy:8020/tmp/xyz14","cancel-job":false}'
>>>>> https://*/jobs//savepoints
>>>>>
>>>>>
>>>>>
>>>>>- Make sure the save point succeeded
>>>>>
>>>>>curl  --request GET   
>>>>> https:///jobs//savepoints/2c053ce3bea31276aa25e63784629687
>>>>>
>>>>>
>>>>>
>>>>>- cancel the job
>>>>>
>>>>>curl  --request PATCH 
>>>>> https://***/jobs/?mode=cancel
>>>>>
>>>>>
>>>>>
>>>>>- Delete the job and deployment
>>>>>
>>>>>kubectl delete -f 
>>>>> manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>>>>>
>>>>>kubectl delete -f manifests/bf2-PRODUCTION/task-manager-deployment.yaml
>>>>>
>>>>>
>>>>>
>>>>>- Edit the job-cluster-job-deployment.yaml. Add/Edit
>>>>>
>>>>>args: ["job-cluster",
>>>>>
>>>>>   "--fromSavepoint",
>>>>>
>>>>>   
>>>>> "hdfs:///tmp/xyz14/savepoint-00-1d4f71345e22",
>>>>>   "--job-classname", .
>>>>>
>>>>>
>>>>>
>>>>>- Restart
>>>>>
>>>>>kubectl create -f 
>>>>> manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>>>>>
>>&g

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Gary Yao
Hi Vishal,

This issue was fixed recently [1], and the patch will be released with 1.8.
If
the Flink job gets cancelled, the JVM should exit with code 0. There is a
release candidate [2], which you can test.

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-10743
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html

On Tue, Mar 12, 2019 at 3:21 PM Vishal Santoshi 
wrote:

> Thanks Vijay,
>
> This is the larger issue.  The cancellation routine is itself broken.
>
> On cancellation flink does remove the checkpoint counter
>
> *2019-03-12 14:12:13,143
> INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
> Removing /checkpoint-counter/ from
> ZooKeeper *
>
> but exist with a non zero code
>
> *2019-03-12 14:12:13,477
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> Terminating cluster entrypoint process StandaloneJobClusterEntryPoint with
> exit code 1444.*
>
>
> That I think is an issue. A cancelled job is a complete job and thus the
> exit code should be 0 for k8s to mark it complete.
>
>
>
>
>
>
>
>
>
> On Tue, Mar 12, 2019 at 10:18 AM Vijay Bhaskar 
> wrote:
>
>> Yes Vishal. Thats correct.
>>
>> Regards
>> Bhaskar
>>
>> On Tue, Mar 12, 2019 at 7:14 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> This really not cool but here you go. This seems to work. Agreed that
>>> this cannot be this painful. The cancel does not exit with an exit code pf
>>> 0 and thus the job has to manually delete. Vijay does this align with what
>>> you have had to do ?
>>>
>>>
>>>- Take a save point . This returns a request id
>>>
>>>curl  --header "Content-Type: application/json" --request POST --data 
>>> '{"target-directory":"hdfs://nn-crunchy:8020/tmp/xyz14","cancel-job":false}'
>>> https://*/jobs//savepoints
>>>
>>>
>>>
>>>- Make sure the save point succeeded
>>>
>>>curl  --request GET   
>>> https:///jobs//savepoints/2c053ce3bea31276aa25e63784629687
>>>
>>>
>>>
>>>- cancel the job
>>>
>>>curl  --request PATCH 
>>> https://***/jobs/?mode=cancel
>>>
>>>
>>>
>>>- Delete the job and deployment
>>>
>>>kubectl delete -f 
>>> manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>>>
>>>kubectl delete -f manifests/bf2-PRODUCTION/task-manager-deployment.yaml
>>>
>>>
>>>
>>>- Edit the job-cluster-job-deployment.yaml. Add/Edit
>>>
>>>args: ["job-cluster",
>>>
>>>   "--fromSavepoint",
>>>
>>>   
>>> "hdfs:///tmp/xyz14/savepoint-00-1d4f71345e22",
>>>   "--job-classname", .
>>>
>>>
>>>
>>>- Restart
>>>
>>>kubectl create -f 
>>> manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>>>
>>>kubectl create -f manifests/bf2-PRODUCTION/task-manager-deployment.yaml
>>>
>>>
>>>
>>>- Make sure from the UI, that it restored from the specific save
>>>point.
>>>
>>>
>>> On Tue, Mar 12, 2019 at 7:26 AM Vijay Bhaskar 
>>> wrote:
>>>
 Yes Its supposed to work.  But unfortunately it was not working. Flink
 community needs to respond to this behavior.

 Regards
 Bhaskar

 On Tue, Mar 12, 2019 at 3:45 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Aah.
> Let me try this out and will get back to you.
> Though I would assume that save point with cancel is a single atomic
> step, rather then a save point *followed*  by a cancellation ( else
> why would that be an option ).
> Thanks again.
>
>
> On Tue, Mar 12, 2019 at 4:50 AM Vijay Bhaskar <
> bhaskar.eba...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> yarn-cancel doesn't mean to be for yarn cluster. It works for all
>> clusters. Its recommended command.
>>
>> Use the following command to issue save point.
>>  curl  --header "Content-Type: application/json" --request POST
>> --data '{"target-directory":"hdfs://*:8020/tmp/xyz1",
>> "cancel-job":false}'  \ https://
>> .ingress.***/jobs//savepoints
>>
>> Then issue yarn-cancel.
>> After that  follow the process to restore save point
>>
>> Regards
>> Bhaskar
>>
>> On Tue, Mar 12, 2019 at 2:11 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hello Vijay,
>>>
>>>Thank you for the reply. This though is k8s
>>> deployment ( rather then yarn ) but may be they follow the same 
>>> lifecycle.
>>> I issue a* save point with cancel*  as documented here
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints,
>>> a straight up
>>>  curl  --header "Content-Type: application/json" 

Re: submit job failed on Yarn HA

2019-03-05 Thread Gary Yao
Hi Sen,

I took a look at your CLI logs again, and saw that it uses the "default"
Flink
namespace in ZooKeeper:

2019-02-28 11:18:05,255 INFO
org.apache.flink.runtime.util.ZooKeeperUtils  - Using
'/flink/default' as Zookeeper namespace.

However, since you are using YARN, the Flink namespace in ZooKeeper should
include the YARN applicationId. Normally, the CLI tries to resolve the
applicationId from a local "YARN properties" file [1], which is generated
after a successful submission of a session cluster (using Flink's bin/yarn-
session.sh) [2]. In your case that file does not exist – maybe because it
got
deleted, or the host from which you are submitting the job, is a different
one
from which the session cluster got started.

If you submit the job with -yid <applicationId>, or --yarnapplicationId
<applicationId>, the CLI should use the correct namespace in ZooKeeper.
Just submit the job normally without removing the ZooKeeper configuration
from
flink-conf.yaml, and without specifying host:port manually with the "-m"
option. Let me know if this works for you.
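
For example (the application id below is a placeholder):

    ./bin/flink run -yid application_1551700000000_0001 /path/to/your-job.jar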

Best,
Gary

[1]
https://github.com/apache/flink/blob/e2579e39602ab7d3e906a185353dd413aca58317/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L236

[2]
https://github.com/apache/flink/blob/e2579e39602ab7d3e906a185353dd413aca58317/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L622-L625

[3]
https://github.com/apache/flink/blob/e2579e39602ab7d3e906a185353dd413aca58317/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L603-L606

On Wed, Mar 6, 2019 at 3:58 AM 孙森  wrote:

> Hi Gary:
>
>   Thanks very much! I have tried it the way you said, and it works.
> I hope the bug can be fixed as soon as possible.
> Best!
> Sen
>
> On Mar 5, 2019, at 3:15 PM, Gary Yao wrote:
>
> Hi Sen,
>
> In that email I meant that you should disable the ZooKeeper configuration in
> the CLI because the CLI had trouble resolving the leader from ZooKeeper. What
> you should have done is:
>
> 1. Start the cluster normally with ZooKeeper enabled
> 2. Edit flink-conf.yaml to remove ZooKeeper config
> 3. Submit the job to your cluster with -m flag.
>
> Best,
> Gary
>
> On Tue, Mar 5, 2019 at 8:08 AM 孙森  wrote:
>
>> Hi Gary:
>>
>>   The ZooKeeper configuration is removed because otherwise the job
>> submission will fail.
>> <屏幕快照 2019-03-05 下午3.07.21.png>
>>
>>
>> Best
>> Sen
>>
>> On Mar 5, 2019, at 3:02 PM, Gary Yao wrote:
>>
>> Hi Sen,
>>
>> I don't see
>>
>> high-availability: zookeeper
>>
>> in your Flink configuration. However, this is mandatory for an HA setup.
>> By
>> default "none" is used, and the ZK configuration is ignored. The log also
>> hints that you are using StandaloneLeaderElectionService instead of the
>> ZooKeeper implementation (note that the leaderSessionID consists only of
>> 0s
>> [1][2]):
>>
>> 2019-03-05 11:23:53,883 INFO
>> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint-
>> http://hdp3:60179 was granted leadership with
>> leaderSessionID=----
>>
>> Did you accidentally delete the "high-availability" config from your
>> flink-
>> conf.yaml?
>>
>> You probably also want to increase the number of yarn.application-attempts
>> [3].
>>
>> Best,
>> Gary
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/dcd8c74b504046802cebf278b718e4753928a260/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java#L48
>> [2]
>> https://github.com/apache/flink/blob/dcd8c74b504046802cebf278b718e4753928a260/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java#L52-L57
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html#recovery-behavior-of-flink-on-yarn
>>
>> On Tue, Mar 5, 2019 at 7:41 AM 孙森  wrote:
>>
>>> Hi Gary:
>>>  I used FsStateBackend .
>>>
>>>
>>> The jm log is here:
>>>
>>>
>>> After restart , the log is :
>>>
>>>
>>>
>>>
>>> Best!
>>> Sen
>>>
>>>
>>> On Mar 4, 2019, at 10:50 PM, Gary Yao wrote:
>>>
>>> Hi Sen,
>>>
>>> Are you using the default MemoryStateBackend [1]? As far as I know, it
>>> does
>>> not support JobManager failover. If you are already using FsStateBackend
>>> or
>>> RocksDBStateBackend, please send JM logs.
>>>
>>> Best,
&

Re: submit job failed on Yarn HA

2019-03-04 Thread Gary Yao
Hi Sen,

I don't see

high-availability: zookeeper

in your Flink configuration. However, this is mandatory for an HA setup. By
default "none" is used, and the ZK configuration is ignored. The log also
hints that you are using StandaloneLeaderElectionService instead of the
ZooKeeper implementation (note that the leaderSessionID consists only of 0s
[1][2]):

2019-03-05 11:23:53,883 INFO
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint-
http://hdp3:60179 was granted leadership with
leaderSessionID=----

Did you accidentally delete the "high-availability" config from your flink-
conf.yaml?

You probably also want to increase the number of yarn.application-attempts
[3].
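
For reference, a minimal HA sketch for flink-conf.yaml could look like
this (the ZooKeeper quorum, paths, and attempt count are placeholders):

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
    high-availability.storageDir: hdfs:///flink/ha/
    high-availability.zookeeper.path.root: /flink
    yarn.application-attempts: 10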

Best,
Gary


[1]
https://github.com/apache/flink/blob/dcd8c74b504046802cebf278b718e4753928a260/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java#L48
[2]
https://github.com/apache/flink/blob/dcd8c74b504046802cebf278b718e4753928a260/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java#L52-L57
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html#recovery-behavior-of-flink-on-yarn

On Tue, Mar 5, 2019 at 7:41 AM 孙森  wrote:

> Hi Gary:
>  I used FsStateBackend .
>
>
> The jm log is here:
>
>
> After restart , the log is :
>
>
>
>
> Best!
> Sen
>
>
> On Mar 4, 2019, at 10:50 PM, Gary Yao wrote:
>
> Hi Sen,
>
> Are you using the default MemoryStateBackend [1]? As far as I know, it does
> not support JobManager failover. If you are already using FsStateBackend or
> RocksDBStateBackend, please send JM logs.
>
> Best,
> Gary
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html#available-state-backends
>
>
> On Mon, Mar 4, 2019 at 10:01 AM 孙森  wrote:
>
>> Hi Gary:
>>
>>
>> Yes, I enable the checkpoints in my program .
>>
>> On Mar 4, 2019, at 3:03 AM, Gary Yao wrote:
>>
>> Hi Sen,
>>
>> Did you set a restart strategy [1]? If you enabled checkpoints [2], the
>> fixed-
>> delay strategy will be used by default.
>>
>> Best,
>> Gary
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/restart_strategies.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html
>>
>> On Fri, Mar 1, 2019 at 7:27 AM 孙森  wrote:
>>
>>> Hi Gary:
>>>  I checked the znode, the address of leader was there.
>>>
>>> <屏幕快照 2019-03-01 上午10.45.45 1.png>
>>>
>>> When I removed the ZooKeeper configuration in the client's
>>> flink-conf.yaml, the job was submitted successfully.
>>> Then I tried to test whether HA could work. I killed the job manager; it
>>> restarted. But the job did not restart when the job manager restarted.
>>>
>>>
>>> Best!
>>> Sen
>>>
>>> On Feb 28, 2019, at 6:59 PM, Gary Yao wrote:
>>>
>>> Hi Sen,
>>>
>>> I took a look at the CLI code again, and found out that -m is ignored if
>>> high-
>>> availability: ZOOKEEPER is configured in your flink-conf.yaml. This does
>>> not
>>> seem right and should be at least documented [1].
>>>
>>> Judging from the client logs that you provided, I think the problem is
>>> that
>>> the client cannot resolve the leading JobManager from ZooKeeper [2][3].
>>> You
>>> can try the following things for debugging:
>>>
>>> * Check the contents in the znode
>>>   /flink/[...]/leader/rest_server_lock using the ZK CLI. It should
>>> contain the
>>>   address of the leader. If not, I would check the jobmanager logs
>>> for releated
>>>   errors.
>>>
>>> * Submit the job with -m parameter but without ZooKeeper
>>> configuration in
>>>   the client's flink-conf.yaml
>>>
>>> Best,
>>> Gary
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11779
>>> [2]
>>> https://github.com/apache/flink/blob/release-1.5.1/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L170
>>> [3]
>>> https://github.com/apache/flink/blob/release-1.5.1/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L746-L750
>>>
>>> On Thu, Feb 28, 2019 at 4:34 AM 孙森  wrote:
>>>
>>>> Hi,Gary
>>>>
>>>>Actuall

Re: submit job failed on Yarn HA

2019-03-04 Thread Gary Yao
Hi Sen,

Are you using the default MemoryStateBackend [1]? As far as I know, it does
not support JobManager failover. If you are already using FsStateBackend or
RocksDBStateBackend, please send JM logs.
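
If you are still on the default, a minimal flink-conf.yaml sketch for
switching to a durable backend could look like this (the HDFS path is a
placeholder):

    state.backend: filesystem
    state.checkpoints.dir: hdfs:///flink/checkpoints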

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html#available-state-backends


On Mon, Mar 4, 2019 at 10:01 AM 孙森  wrote:

> Hi Gary:
>
>
> Yes, I enable the checkpoints in my program .
>
> On Mar 4, 2019, at 3:03 AM, Gary Yao wrote:
>
> Hi Sen,
>
> Did you set a restart strategy [1]? If you enabled checkpoints [2], the
> fixed-
> delay strategy will be used by default.
>
> Best,
> Gary
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/restart_strategies.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html
>
> On Fri, Mar 1, 2019 at 7:27 AM 孙森  wrote:
>
>> Hi Gary:
>>  I checked the znode, the address of leader was there.
>>
>> <屏幕快照 2019-03-01 上午10.45.45 1.png>
>>
>> When I removed the ZooKeeper configuration in the client's
>> flink-conf.yaml, the job was submitted successfully.
>> Then I tried to test if the HA could work. I killed the job manager; it
>> restarted. But the job did not restart when the job manager restarted.
>>
>>
>> Best!
>> Sen
>>
>> 在 2019年2月28日,下午6:59,Gary Yao  写道:
>>
>> Hi Sen,
>>
>> I took a look at the CLI code again, and found out that -m is ignored if
>> high-
>> availability: ZOOKEEPER is configured in your flink-conf.yaml. This does
>> not
>> seem right and should be at least documented [1].
>>
>> Judging from the client logs that you provided, I think the problem is
>> that
>> the client cannot resolve the leading JobManager from ZooKeeper [2][3].
>> You
>> can try the following things for debugging:
>>
>> * Check the contents in the znode
>>   /flink/[...]/leader/rest_server_lock using the ZK CLI. It should
>> contain the
>>   address of the leader. If not, I would check the jobmanager logs
>> for related
>>   errors.
>>
>> * Submit the job with -m parameter but without ZooKeeper
>> configuration in
>>   the client's flink-conf.yaml
>>
>> Best,
>> Gary
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11779
>> [2]
>> https://github.com/apache/flink/blob/release-1.5.1/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L170
>> [3]
>> https://github.com/apache/flink/blob/release-1.5.1/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L746-L750
>>
>> On Thu, Feb 28, 2019 at 4:34 AM 孙森  wrote:
>>
>>> Hi,Gary
>>>
>>>Actually, I have several Flink cluster on Yarn ,each for a
>>> project. For one project ,it can only submit job to the specify cluster.
>>> I’ve already enabled logging on DEBUG level.
>>>
>>> How did you determine "jmhost" and "port”?
>>>
>>> We do this by request the rest api :   http://activeRm/proxy/appId/jars
>>> <http://activerm/proxy/appId/jars>
>>>
>>>
>>> The all client log is in the mail attachment.
>>>
>>>
>>>
>>>
>>> 在 2019年2月27日,下午9:30,Gary Yao  写道:
>>>
>>> Hi,
>>>
>>> How did you determine "jmhost" and "port"? Actually you do not need to
>>> specify
>>> these manually. If the client is using the same configuration as your
>>> cluster,
>>> the client will look up the leading JM from ZooKeeper.
>>>
>>> If you have already tried omitting the "-m" parameter, you can check in
>>> the
>>> client logs which host is used for the job submission [1]. Note that you
>>> need
>>> to enable logging on DEBUG level.
>>>
>>> The root cause in your stacktrace is a TimeoutException. I would debug
>>> this by
>>> checking if you can establish a TCP connection – from the machine you are
>>> submitting the job from, to the target host/port [2].
>>>
>>> Moreover, you are using a quite dated Flink version. The newest version
>>> in the
>>> 1.5 major release is 1.5.6 – so consider upgrading to that or even to
>>> 1.7.
>>>
>>> Best,
>>> Gary
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/3488f8b144a2127497c39b8ed5a48a65b551c57d/flink-runtime/src/main/java/o

Re: okio and okhttp not shaded in the Flink Uber Jar on EMR

2019-02-28 Thread Gary Yao
Hi Austin,

Are you running your job detached in a per-job cluster? In that case
inverted
class loading does not work. This is because we add the user jar to the
system
class path, and there is no dynamic class loading involved at the moment
[1].

You can try the YARN session mode, or – as Chesnay already suggested – shade
the dependency on your side.
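
If you go down the shading route, the relevant part of the maven-shade-plugin
configuration in the job's pom.xml could look roughly like this (the
com.example prefix is just a placeholder for your own package):

    <!-- sketch: relocate okio/okhttp inside the job jar; prefix is a placeholder -->
    <relocations>
      <relocation>
        <pattern>okio</pattern>
        <shadedPattern>com.example.shaded.okio</shadedPattern>
      </relocation>
      <relocation>
        <pattern>okhttp3</pattern>
        <shadedPattern>com.example.shaded.okhttp3</shadedPattern>
      </relocation>
    </relocations>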

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/debugging_classloading.html#overview-of-classloading-in-flink


On Wed, Feb 27, 2019 at 8:57 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Thanks Gary,
>
> I will try to look into why the child-first strategy seems to have failed
> for this dependency.
>
> Best,
> Austin
>
> On Wed, Feb 27, 2019 at 12:25 PM Gary Yao  wrote:
>
>> Hi,
>>
>> Actually Flink's inverted class loading feature was designed to mitigate
>> problems with different versions of libraries that are not compatible with
>> each other [1]. You may want to debug why it does not work for you.
>>
>> You can also try to use the Hadoop free Flink distribution, and export the
>> HADOOP_CLASSPATH variable [2].
>>
>> Best,
>> Gary
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/debugging_classloading.html#inverted-class-loading-and-classloader-resolution-order
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/hadoop.html#configuring-flink-with-hadoop-classpaths
>>
>> On Wed, Feb 27, 2019 at 5:23 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I recently experienced versioning clashes with the okio and okhttp when
>>> trying to deploy a Flink 1.6.0 app to AWS EMR on Hadoop 2.8.4. After
>>> investigating and talking to the okio team (see this issue)
>>> <https://github.com/square/okio/issues/559>, I found that both okio and
>>> okhttp exist in the Flink uber jar with versions 1.4.0 and 2.4.0,
>>> respectively, whereas I'm including versions 2.2.2 and 3.13.1 in my shaded
>>> jar. The okio team suggested that Flink should shade the uber jar to fix
>>> the issue, but I'm wondering if there is something I can do on my end to
>>> have all versions exist simultaneously.
>>>
>>> From the issue, here are the okio contents of the uber jar:
>>>
>>> *jar -tf flink-shaded-hadoop2-uber-1.6.0.jar | grep okio*
>>>
>>> META-INF/maven/com.squareup.okio/
>>> META-INF/maven/com.squareup.okio/okio/
>>> META-INF/maven/com.squareup.okio/okio/pom.properties
>>> META-INF/maven/com.squareup.okio/okio/pom.xml
>>> okio/
>>> okio/AsyncTimeout$1.class
>>> okio/AsyncTimeout$2.class
>>> okio/AsyncTimeout$Watchdog.class
>>> okio/AsyncTimeout.class
>>> okio/Base64.class
>>> okio/Buffer$1.class
>>> okio/Buffer$2.class
>>> okio/Buffer.class
>>> okio/BufferedSink.class
>>> okio/BufferedSource.class
>>> okio/ByteString.class
>>> okio/DeflaterSink.class
>>> okio/ForwardingSink.class
>>> okio/ForwardingSource.class
>>> okio/ForwardingTimeout.class
>>> okio/GzipSink.class
>>> okio/GzipSource.class
>>> okio/InflaterSource.class
>>> okio/Okio$1.class
>>> okio/Okio$2.class
>>> okio/Okio$3.class
>>> okio/Okio.class
>>> okio/RealBufferedSink$1.class
>>> okio/RealBufferedSink.class
>>> okio/RealBufferedSource$1.class
>>> okio/RealBufferedSource.class
>>> okio/Segment.class
>>> okio/SegmentPool.class
>>> okio/SegmentedByteString.class
>>> okio/Sink.class
>>> okio/Source.class
>>> okio/Timeout$1.class
>>> okio/Timeout.class
>>> okio/Util.class
>>>
>>> Thank you,
>>> Austin Cawley-Edwards
>>>
>>


Re: submit job failed on Yarn HA

2019-02-28 Thread Gary Yao
Hi Sen,

I took a look at the CLI code again, and found out that -m is ignored if
high-
availability: ZOOKEEPER is configured in your flink-conf.yaml. This does not
seem right and should be at least documented [1].

Judging from the client logs that you provided, I think the problem is that
the client cannot resolve the leading JobManager from ZooKeeper [2][3]. You
can try the following things for debugging:

* Check the contents in the znode /flink/[...]/leader/rest_server_lock using
  the ZK CLI (see the sketch below). It should contain the address of the
  leader. If not, I would check the jobmanager logs for related errors.

* Submit the job with -m parameter but without ZooKeeper configuration
in
  the client's flink-conf.yaml
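
For the first point, a quick ZooKeeper CLI session could look roughly like
this (the quorum address and the cluster-id segment are placeholders):

    zkCli.sh -server hdp1:2181
    [zk: hdp1:2181(CONNECTED) 0] get /flink/<cluster-id>/leader/rest_server_lock

The payload is Java-serialized, but the leader's REST address should be
readable in the output.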

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-11779
[2]
https://github.com/apache/flink/blob/release-1.5.1/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L170
[3]
https://github.com/apache/flink/blob/release-1.5.1/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L746-L750

On Thu, Feb 28, 2019 at 4:34 AM 孙森  wrote:

> Hi,Gary
>
>Actually, I have several Flink cluster on Yarn ,each for a project.
> For one project, it can only submit jobs to the specified cluster.
> I’ve already enabled logging on DEBUG level.
>
> How did you determine "jmhost" and "port”?
>
> We do this by requesting the REST API: http://activeRm/proxy/appId/jars
>
>
> The all client log is in the mail attachment.
>
>
>
>
> 在 2019年2月27日,下午9:30,Gary Yao  写道:
>
> Hi,
>
> How did you determine "jmhost" and "port"? Actually you do not need to
> specify
> these manually. If the client is using the same configuration as your
> cluster,
> the client will look up the leading JM from ZooKeeper.
>
> If you have already tried omitting the "-m" parameter, you can check in the
> client logs which host is used for the job submission [1]. Note that you
> need
> to enable logging on DEBUG level.
>
> The root cause in your stacktrace is a TimeoutException. I would debug
> this by
> checking if you can establish a TCP connection – from the machine you are
> submitting the job from, to the target host/port [2].
>
> Moreover, you are using a quite dated Flink version. The newest version in
> the
> 1.5 major release is 1.5.6 – so consider upgrading to that or even to 1.7.
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/3488f8b144a2127497c39b8ed5a48a65b551c57d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java#L185
> [2]
> https://stackoverflow.com/questions/4922943/test-from-shell-script-if-remote-tcp-port-is-open
>
> On Wed, Feb 27, 2019 at 8:09 AM 孙森  wrote:
>
>> Hi all:
>>
>> I run flink (1.5.1 with hadoop 2.7) on yarn ,and submit job by
>> “/usr/local/flink/bin/flink run -m jmhost:port my.jar”, but the submission
>> is failed.
>> The HA configuration is :
>>
>>-  high-availability: zookeeper
>>-  high-availability.storageDir: hdfs:///flink/ha/
>>-  high-availability.zookeeper.quorum:  hdp1:2181,hdp2:2181,hdp3:2181
>>-  yarn.application-attempts: 2
>>
>> The info showed int the client log:
>>
>>
>> 2019-02-27 11:48:38,651 INFO  org.apache.flink.runtime.rest.RestClient   
>>- Shutting down rest endpoint.
>> 2019-02-27 11:48:38,659 INFO  org.apache.flink.runtime.rest.RestClient   
>>- Rest endpoint shutdown complete.
>> 2019-02-27 11:48:38,662 INFO  
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
>> Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
>> 2019-02-27 11:48:38,665 INFO  
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
>> Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>> 2019-02-27 11:48:38,670 INFO  
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>>   - backgroundOperationsLoop exiting
>> 2019-02-27 11:48:38,689 INFO  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 
>> 0x2679c52880c00ee closed
>> 2019-02-27 11:48:38,689 INFO  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - 
>> EventThread shut down for session: 0x2679c52880c00ee
>> 2019-02-27 11:48:38,690 ERROR org.apache.flink.client.cli.CliFrontend
>>- Error while running the command.
>> org.apache.flink.client.program.ProgramInvocationException: Could not 
>> retrieve the execution result.
>>  at 

Re: okio and okhttp not shaded in the Flink Uber Jar on EMR

2019-02-27 Thread Gary Yao
Hi,

Actually Flink's inverted class loading feature was designed to mitigate
problems with different versions of libraries that are not compatible with
each other [1]. You may want to debug why it does not work for you.

You can also try to use the Hadoop free Flink distribution, and export the
HADOOP_CLASSPATH variable [2].
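
The latter would look roughly like this when starting the session or
submitting the job (assuming the hadoop command is on the PATH):

    # sketch: Hadoop-free Flink plus the cluster's own Hadoop jars
    export HADOOP_CLASSPATH=`hadoop classpath`
    # then start the YARN session / submit the job as usual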

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/debugging_classloading.html#inverted-class-loading-and-classloader-resolution-order
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/hadoop.html#configuring-flink-with-hadoop-classpaths

On Wed, Feb 27, 2019 at 5:23 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi,
>
> I recently experienced versioning clashes with the okio and okhttp when
> trying to deploy a Flink 1.6.0 app to AWS EMR on Hadoop 2.8.4. After
> investigating and talking to the okio team (see this issue)
> , I found that both okio and
> okhttp exist in the Flink uber jar with versions 1.4.0 and 2.4.0,
> respectively, whereas I'm including versions 2.2.2 and 3.13.1 in my shaded
> jar. The okio team suggested that Flink should shade the uber jar to fix
> the issue, but I'm wondering if there is something I can do on my end to
> have all versions exist simultaneously.
>
> From the issue, here are the okio contents of the uber jar:
>
> *jar -tf flink-shaded-hadoop2-uber-1.6.0.jar | grep okio*
>
> META-INF/maven/com.squareup.okio/
> META-INF/maven/com.squareup.okio/okio/
> META-INF/maven/com.squareup.okio/okio/pom.properties
> META-INF/maven/com.squareup.okio/okio/pom.xml
> okio/
> okio/AsyncTimeout$1.class
> okio/AsyncTimeout$2.class
> okio/AsyncTimeout$Watchdog.class
> okio/AsyncTimeout.class
> okio/Base64.class
> okio/Buffer$1.class
> okio/Buffer$2.class
> okio/Buffer.class
> okio/BufferedSink.class
> okio/BufferedSource.class
> okio/ByteString.class
> okio/DeflaterSink.class
> okio/ForwardingSink.class
> okio/ForwardingSource.class
> okio/ForwardingTimeout.class
> okio/GzipSink.class
> okio/GzipSource.class
> okio/InflaterSource.class
> okio/Okio$1.class
> okio/Okio$2.class
> okio/Okio$3.class
> okio/Okio.class
> okio/RealBufferedSink$1.class
> okio/RealBufferedSink.class
> okio/RealBufferedSource$1.class
> okio/RealBufferedSource.class
> okio/Segment.class
> okio/SegmentPool.class
> okio/SegmentedByteString.class
> okio/Sink.class
> okio/Source.class
> okio/Timeout$1.class
> okio/Timeout.class
> okio/Util.class
>
> Thank you,
> Austin Cawley-Edwards
>


Re: flink list and flink run commands timeout

2019-02-27 Thread Gary Yao
Hi Sen Sun,

The question is already resolved. You can find the entire email thread here:


http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/flink-list-and-flink-run-commands-timeout-td22826.html

Best,
Gary

On Wed, Feb 27, 2019 at 7:55 AM sen  wrote:

> Hi Aneesha:
>
>   I am also facing the same problem. When I turn on HA on yarn, it
> gets the same exception. When I turn off the HA configuration, it works
> fine.
>   I want to know what you did to deal with the problem?
>
> Thanks!
> Sen Sun
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: submit job failed on Yarn HA

2019-02-27 Thread Gary Yao
Hi,

How did you determine "jmhost" and "port"? Actually you do not need to
specify
these manually. If the client is using the same configuration as your
cluster,
the client will look up the leading JM from ZooKeeper.

If you have already tried omitting the "-m" parameter, you can check in the
client logs which host is used for the job submission [1]. Note that you
need
to enable logging on DEBUG level.

The root cause in your stacktrace is a TimeoutException. I would debug this
by
checking if you can establish a TCP connection – from the machine you are
submitting the job from, to the target host/port [2].
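
A quick check from the submitting machine could be (host and port are
placeholders):

    nc -zv jmhost 8081
    # or, without netcat:
    timeout 3 bash -c 'cat < /dev/null > /dev/tcp/jmhost/8081' && echo open || echo closed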

Moreover, you are using a quite dated Flink version. The newest version in
the
1.5 major release is 1.5.6 – so consider upgrading to that or even to 1.7.

Best,
Gary

[1]
https://github.com/apache/flink/blob/3488f8b144a2127497c39b8ed5a48a65b551c57d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java#L185
[2]
https://stackoverflow.com/questions/4922943/test-from-shell-script-if-remote-tcp-port-is-open

On Wed, Feb 27, 2019 at 8:09 AM 孙森  wrote:

> Hi all:
>
> I run flink (1.5.1 with hadoop 2.7) on yarn ,and submit job by
> “/usr/local/flink/bin/flink run -m jmhost:port my.jar”, but the submission
> is failed.
> The HA configuration is :
>
>-  high-availability: zookeeper
>-  high-availability.storageDir: hdfs:///flink/ha/
>-  high-availability.zookeeper.quorum:  hdp1:2181,hdp2:2181,hdp3:2181
>-  yarn.application-attempts: 2
>
> The info showed int the client log:
>
>
> 2019-02-27 11:48:38,651 INFO  org.apache.flink.runtime.rest.RestClient
>   - Shutting down rest endpoint.
> 2019-02-27 11:48:38,659 INFO  org.apache.flink.runtime.rest.RestClient
>   - Rest endpoint shutdown complete.
> 2019-02-27 11:48:38,662 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
> 2019-02-27 11:48:38,665 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
> 2019-02-27 11:48:38,670 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>   - backgroundOperationsLoop exiting
> 2019-02-27 11:48:38,689 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 
> 0x2679c52880c00ee closed
> 2019-02-27 11:48:38,689 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - 
> EventThread shut down for session: 0x2679c52880c00ee
> 2019-02-27 11:48:38,690 ERROR org.apache.flink.client.cli.CliFrontend 
>   - Error while running the command.
> org.apache.flink.client.program.ProgramInvocationException: Could not 
> retrieve the execution result.
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:257)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>  

Re: Submitting job to Flink on yarn timesout on flip-6 1.5.x

2019-02-21 Thread Gary Yao
Hi,

Beginning with Flink 1.7, you cannot use the legacy mode anymore [1][2]. I
am
currently working on removing references to the legacy mode in the
documentation [3]. Is there any reason you cannot use the "new mode"?

Best,
Gary

[1] https://flink.apache.org/news/2018/11/30/release-1.7.0.html
[2] https://issues.apache.org/jira/browse/FLINK-10392
[3] https://issues.apache.org/jira/browse/FLINK-11713

On Mon, Feb 18, 2019 at 12:00 PM Richard Deurwaarder 
wrote:

> Hello,
>
> I am trying to upgrade our job from flink 1.4.2 to 1.7.1 but I keep
> running into timeouts after submitting the job.
>
> The flink job runs on our hadoop cluster and starts using Yarn.
>
> Relevant config options seem to be:
>
> jobmanager.rpc.port: 55501
>
> recovery.jobmanager.port: 55502
>
> yarn.application-master.port: 55503
>
> blob.server.port: 55504
>
>
> I've seen the following behavior:
>   - Using the same flink-conf.yaml as we used in 1.4.2: 1.5.6 / 1.6.3 /
> 1.7.1 all versions timeout while 1.4.2 works.
>   - Using 1.5.6 with "mode: legacy" (to switch off flip-6) works
>   - Using 1.7.1 with "mode: legacy" gives timeout (I assume this option
> was removed but the documentation is outdated?
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#legacy
> )
>
> When the timeout happens I get the following stacktrace:
>
> INFO class java.time.Instant does not contain a getter for field seconds
> 2019-02-18T10:16:56.815+01:00
> INFO class com.bol.fin_hdp.cm1.domain.Cm1Transportable does not contain a
> getter for field globalId 2019-02-18T10:16:56.815+01:00
> INFO Submitting job 5af931bcef395a78b5af2b97e92dcffe (detached: false).
> 2019-02-18T10:16:57.182+01:00
> INFO 
> 2019-02-18T10:29:27.527+01:00
> INFO The program finished with the following exception:
> 2019-02-18T10:29:27.564+01:00
> INFO org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error. 2019-02-18T10:29:27.601+01:00
> INFO at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
> 2019-02-18T10:29:27.638+01:00
> INFO at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
> 2019-02-18T10:29:27.675+01:00
> INFO at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
> 2019-02-18T10:29:27.711+01:00
> INFO at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:798)
> 2019-02-18T10:29:27.747+01:00
> INFO at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:289)
> 2019-02-18T10:29:27.784+01:00
> INFO at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
> 2019-02-18T10:29:27.820+01:00
> INFO at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1035)
> 2019-02-18T10:29:27.857+01:00
> INFO at
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:)
> 2019-02-18T10:29:27.893+01:00
> INFO at java.security.AccessController.doPrivileged(Native Method)
> 2019-02-18T10:29:27.929+01:00
> INFO at javax.security.auth.Subject.doAs(Subject.java:422)
> 2019-02-18T10:29:27.968+01:00
> INFO at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> 2019-02-18T10:29:28.004+01:00
> INFO at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 2019-02-18T10:29:28.040+01:00
> INFO at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:)
> 2019-02-18T10:29:28.075+01:00
> INFO Caused by: java.lang.RuntimeException:
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. 2019-02-18T10:29:28.110+01:00
> INFO at
> com.bol.fin_hdp.job.starter.IntervalJobStarter.startJob(IntervalJobStarter.java:43)
> 2019-02-18T10:29:28.146+01:00
> INFO at
> com.bol.fin_hdp.job.starter.IntervalJobStarter.startJobWithConfig(IntervalJobStarter.java:32)
> 2019-02-18T10:29:28.182+01:00
> INFO at com.bol.fin_hdp.Main.main(Main.java:8)
> 2019-02-18T10:29:28.217+01:00
> INFO at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2019-02-18T10:29:28.253+01:00
> INFO at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2019-02-18T10:29:28.289+01:00
> INFO at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2019-02-18T10:29:28.325+01:00
> INFO at java.lang.reflect.Method.invoke(Method.java:498)
> 2019-02-18T10:29:28.363+01:00
> INFO at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> 2019-02-18T10:29:28.400+01:00
> INFO ... 12 more 2019-02-18T10:29:28.436+01:00
> INFO Caused by:
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. 2019-02-18T10:29:28.473+01:00
> INFO at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
> 

Re: Each yarn container only use 1 vcore even if taskmanager.numberOfTaskSlots is set

2019-02-17 Thread Gary Yao
Hi Henry,

If I understand you correctly, you want YARN to allocate 4 vcores per TM
container. You can achieve this by enabling the FairScheduler in YARN
[1][2].
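
A rough sketch of the two pieces of configuration (values are examples only):

    # flink-conf.yaml -- request 4 vcores per TM container
    yarn.containers.vcores: 4

    <!-- yarn-site.xml -- the FairScheduler takes vcore requests into account -->
    <property>
      <name>yarn.resourcemanager.scheduler.class</name>
      <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
    </property>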

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#yarn-containers-vcores
[2]
https://hadoop.apache.org/docs/r2.7.4/hadoop-yarn/hadoop-yarn-site/FairScheduler.html

On Mon, Feb 18, 2019 at 3:49 AM 徐涛  wrote:

> Hi Experts,
> I am running Flink 1.7.1 program on Yarn 2.7, the
> taskmanager.numberOfTaskSlots is set to 4. The parallelism.default is set
> to 8. When the program is running, 3 yarn containers are launched, but each
> of them only uses 1 vcore; I think by default the number of vcores is set to
> the number of slots per TaskManager.
> Because it cannot get enough CPU resources, the CPU load may be
> high. So how can this problem be fixed?
> Thanks a lot for your help.
>
> Best
> Henry


Re: Flink 1.6 Yarn Session behavior

2019-02-17 Thread Gary Yao
Hi Jins George,

Every TM brings additional overhead, e.g., more heartbeat messages.
However, a
cluster with 28 TMs would not be considered big as there are users that are
running Flink applications on thousands of cores [1][2].

Best,
Gary

[1]
https://flink.apache.org/flink-architecture.html#run-applications-at-any-scale
[2]
https://de.slideshare.net/FlinkForward/flink-forward-sf-2017-stephan-ewen-experiences-running-flink-at-very-large-scale

On Thu, Feb 14, 2019 at 6:59 PM Jins George  wrote:

> Thanks Gary. Understood the behavior.
>
> I am leaning towards running 7 TM on each machine(8 core), I have 4 nodes,
> that will end up 28 taskmanagers and 1 job manager. I was wondering if this
> can bring additional burden on jobmanager? Is it recommended?
>
> Thanks,
>
> Jins George
> On 2/14/19 8:49 AM, Gary Yao wrote:
>
> Hi Jins George,
>
> This has been asked before [1]. The bottom line is that you currently
> cannot
> pre-allocate TMs and distribute your tasks evenly. You might be able to
> achieve a better distribution across hosts by configuring fewer slots in
> your
> TMs.
>
> Best,
> Gary
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-td21588.html
>
>
> On Wed, Feb 13, 2019 at 6:20 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> I'm forwarding this question to Gary (CC'ed), who most likely would have
>> an answer for your question here.
>>
>> Cheers,
>> Gordon
>>
>> On Wed, Feb 13, 2019 at 8:33 AM Jins George 
>> wrote:
>>
>>> Hello community,
>>>
>>> I am trying to  upgrade a  Flink Yarn session cluster running BEAM
>>> pipelines  from version 1.2.0 to 1.6.3.
>>>
>>> Here is my session start command: yarn-session.sh -d -n 4 -jm 1024
>>> -tm 3072 -s 7
>>>
>>> Because of the dynamic resource allocation,  no taskmanager gets created
>>> initially. Now once I submit a job with parallelism 5, I see that 1
>>> task-manager gets created and all 5 parallel instances are scheduled on the
>>> same taskmanager( because I have 7 slots).  This can create hot spot as
>>> only one physical node ( out of 4 in my case) is utilized for processing.
>>>
>>> I noticed the legacy mode, which would provision all task managers at
>>> cluster creation, but since legacy mode is expected to go away soon, I
>>> didn't want to try that route.
>>>
>>> Is there a way I can configure the multiple jobs or parallel instances
>>> of same job spread across all the available Yarn nodes and continue using
>>> the 'new' mode ?
>>>
>>> Thanks,
>>>
>>> Jins George
>>>
>>


Re: No resource available error while testing HA

2019-02-14 Thread Gary Yao
Hi Averell,

The TM containers fetch the Flink binaries and config files from HDFS (or
another DFS if configured) [1]. I think you should be able to change the log
level by patching the logback configuration in HDFS, and kill all Flink
containers on all hosts. If you are running an HA setup, your cluster should
be running with the new logback configuration afterwards.
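
The shipped files usually live under the submitting user's staging directory,
so the patch could look roughly like this (user name and application id are
placeholders):

    hdfs dfs -ls /user/<user>/.flink/<application-id>/
    hdfs dfs -put -f logback.xml /user/<user>/.flink/<application-id>/logback.xml
    # then kill the running Flink containers so they restart with the new file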

Best,
Gary

[1]
https://github.com/apache/flink/blob/02ff4bfe90d8e8b896c9f1a1bdbe8d43a48f5de7/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L691

On Wed, Feb 13, 2019 at 12:44 PM Averell  wrote:

> Hi Gary,
>
> Thanks for the suggestion.
>
> How about changing the configuration of the Flink job itself during
> runtime?
> What I have to do now is to take a savepoint, stop the job, change the
> configuration, and then restore the job from the save point.
>
> Is there any easier way to do that?
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink 1.6 Yarn Session behavior

2019-02-14 Thread Gary Yao
Hi Jins George,

This has been asked before [1]. The bottom line is that you currently cannot
pre-allocate TMs and distribute your tasks evenly. You might be able to
achieve a better distribution across hosts by configuring fewer slots in
your
TMs.
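
For example, starting the session with fewer slots per TM should make a
parallelism-5 job spread over several containers (memory values are just
carried over from your original command):

    # sketch -- with 2 slots per TM, parallelism 5 needs at least 3 TMs
    yarn-session.sh -d -s 2 -jm 1024 -tm 3072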

Best,
Gary

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-td21588.html


On Wed, Feb 13, 2019 at 6:20 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> I'm forwarding this question to Gary (CC'ed), who most likely would have
> an answer for your question here.
>
> Cheers,
> Gordon
>
> On Wed, Feb 13, 2019 at 8:33 AM Jins George  wrote:
>
>> Hello community,
>>
>> I am trying to  upgrade a  Flink Yarn session cluster running BEAM
>> pipelines  from version 1.2.0 to 1.6.3.
>>
>> Here is my session start command: yarn-session.sh -d -n 4 -jm 1024
>> -tm 3072 -s 7
>>
>> Because of the dynamic resource allocation,  no taskmanager gets created
>> initially. Now once I submit a job with parallelism 5, I see that 1
>> task-manager gets created and all 5 parallel instances are scheduled on the
>> same taskmanager( because I have 7 slots).  This can create hot spot as
>> only one physical node ( out of 4 in my case) is utilized for processing.
>>
>> I noticed the legacy mode, which would provision all task managers at
>> cluster creation, but since legacy mode is expected to go away soon, I
>> didn't want to try that route.
>>
>> Is there a way I can configure the multiple jobs or parallel instances of
>> same job spread across all the available Yarn nodes and continue using the
>> 'new' mode ?
>>
>> Thanks,
>>
>> Jins George
>>
>


Re: Flink Standalone cluster - logging problem

2019-02-11 Thread Gary Yao
Hi,

Are you logging from your own operator implementations, and do you expect these
log messages to end up in a file prefixed with XYZ-? If that is the case,
modifying log4j-cli.properties will not be expedient as I wrote earlier.

You should modify the log4j.properties on all hosts that are running the
JobManager (JM) and TaskManagers (TM). Consequently, the log files can only
be
found on the hosts that are running the JM and TMs.

Furthermore, I see a problem with the following line in your log4j
configuration:

log4j.appender.file2.file=XYZ-${log.file}

Here, ${log.file} can be an absolute path, which means you would end up
prefixing the whole path instead of just the filename.
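
One way around that is to keep ${log.file} intact and only append a suffix,
for example:

    # sketch -- suffix instead of prefix, so the directory part stays valid
    log4j.appender.file2.file=${log.file}.xyz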

If this does not help, please share a minimum working example with us.

Best,
Gary


On Mon, Feb 11, 2019 at 12:02 PM simpleusr  wrote:

> Hi Gary,
>
> By "job logs" I mean all the loggers under a subpackage of
> com.mycompany.xyz
> .
>
> We are using the ./bin/flink run command for job execution; that's why I modified
> log4j-cli.properties. Modification of log4j.properties also did not help...
>
> Regards
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: No resource available error while testing HA

2019-02-11 Thread Gary Yao
Hi Averell,

Logback has this feature [1], but it is not enabled out of the box. You will
have
to enable the JMX agent by setting the com.sun.management.jmxremote system
property [2][3]. I have not tried this out, though.
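
A rough sketch of what that would involve (the port is a placeholder, and
unauthenticated JMX should only be used in a trusted network):

    # flink-conf.yaml -- start the JM/TM JVMs with a JMX agent
    env.java.opts: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false

    <!-- logback.xml -- expose the logger configuration as an MBean -->
    <configuration>
      <jmxConfigurator />
      <!-- appenders and loggers as before -->
    </configuration>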

Best,
Gary

[1] https://logback.qos.ch/manual/jmxConfig.html
[2]
https://docs.oracle.com/javase/8/docs/technotes/guides/management/agent.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#env-java-opts

On Thu, Feb 7, 2019 at 11:51 AM Averell  wrote:

> Hi Gary,
>
> I am trying to reproduce that problem.
> BTW, is that possible to change log level (I'm using logback) for a running
> job?
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink Standalone cluster - logging problem

2019-02-11 Thread Gary Yao
Hi,

Can you define what you mean by "job logs"? For code that is run on the
cluster, i.e., JM or TM, you should add your config to log4j.properties. The
log4j-cli.properties file is only used by the Flink CLI process.

Best,
Gary

On Mon, Feb 11, 2019 at 7:39 AM simpleusr  wrote:

> Hi Chesnay,
>
> below is the content for my log4j-cli.properties file. I expect my job logs
> (packaged under com.mycompany.xyz) to be written to the file2 appender. However
> no file is generated with the XYZ prefix. I restarted the cluster, canceled and
> resubmitted several times, but none of that helped.
>
>
> log4j.rootLogger=INFO, file
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
> log4j.appender.file2=org.apache.log4j.FileAppender
> log4j.appender.file2.file=XYZ-${log.file}
> log4j.appender.file2.append=false
> log4j.appender.file2.layout=org.apache.log4j.PatternLayout
> log4j.appender.file2.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
>
> # Log output from org.apache.flink.yarn to the console. This is used by the
> # CliFrontend class when using a per-job YARN cluster.
> log4j.logger.org.apache.flink.yarn=INFO, console
> log4j.logger.org.apache.flink.yarn.cli.FlinkYarnSessionCli=INFO, console
> log4j.logger.org.apache.hadoop=INFO, console
>
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
> # suppress the warning that hadoop native libraries are not loaded
> (irrelevant for the client)
> log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
>
> # suppress the irrelevant (wrong) warnings from the netty channel handler
>
> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
> file
>
>
> log4j.logger.com.hazelcast=INFO, file2
> log4j.logger.com.mycompany.xyz=DEBUG, file2
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: No resource available error while testing HA

2019-02-06 Thread Gary Yao
Hi Averell,

That log file does not look complete. I do not see any INFO level log
messages
such as [1].

Best,
Gary

[1]
https://github.com/apache/flink/blob/46326ab9181acec53d1e9e7ec8f4a26c672fec31/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java#L544

On Fri, Feb 1, 2019 at 12:18 AM Averell  wrote:

> Hi Gary,
>
> I faced a similar problem yesterday, but I don't know what the cause was yet.
> The situation that I observed is as follow:
>  - At about 2:57, one of my EMR execution node (IP ...99) got disconnected
> from YARN resource manager (on RM I could not see that node anymore),
> despite that the node was still running. <<< This is another issue, but I
> believe it is with YARN.
>  - About 8 hours after that (between 10:00 - 11:00), I turned the
> problematic EMR core node off. AWS spun up another node and added it to the
> cluster to replace that. YARN RM soon recognized the new node and added it
> to its list of available nodes.
> However, the JM seemed to not (able to) do anything after that. It kept
> trying to start the job, failed after the timeout and that "no resource
> available" exception again and again. No jobmanager logs recorded since
> 2:57:15 though.
>
> I am attaching the logs collected via "yarn logs --applicationId 
> here. But it seems I still missed something.
>
> I am using Flink 1.7.1, with yarn-site configuration
> yarn.resourcemanager.am.max-attempts=5. Flink configurations are all of the
> default values.
>
> Thanks and best regards,
> Averell flink.log
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/flink.log>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Issue setting up Flink in Kubernetes

2019-01-29 Thread Gary Yao
Hi Tim,

There is an end-to-end test in the Flink repository that starts a job
cluster
in Kubernetes (minikube) [1]. If that does not help you, can you answer the
questions below?

What docker images are you using? Can you share the kubernetes resource
definitions? Can you share the complete logs of the JM and TMs? Did you
follow
the steps outlined in the Flink documentation [2]?
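
To collect the logs, something along these lines should work (pod names are
placeholders):

    kubectl get pods
    kubectl logs <flink-jobmanager-pod>
    kubectl logs <flink-taskmanager-pod>
    kubectl describe pod <flink-taskmanager-pod>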

Best,
Gary

[1]
https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh#L46
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/kubernetes.html


On Tue, Jan 29, 2019 at 5:32 AM Timothy Victor  wrote:

> Hi -
>
> Has there been any update on the below issue?   I am also facing the same
> problem.
>
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201812.mbox/%3ccac2r2948lqsyu8nab5p7ydnhhmuox5i4jmyis9g7og6ic-1...@mail.gmail.com%3E
>
> There is a similar issue (
> https://stackoverflow.com/questions/50806228/cant-submit-job-with-flink-1-5-cluster)
> where task managers cannot reach the job manager, and the solution appeared
> to be to add JOB_MANAGER_RPC_ADDRESS to /etc/hosts.   However, the issue
> above is slightly different in that the TMs appear to try to use the
> Kubernetes pod name to connect.
>
> Thanks
>
> Tim
>


Re: No resource available error while testing HA

2019-01-29 Thread Gary Yao
Hi Averell,

> Is there any way to avoid this? As if I run this as an AWS EMR job, the
job
> would be considered failed, while it is actually restored
automatically by
> YARN after 10 minutes).

You are writing that it takes YARN 10 minutes to restart the application
master (AM). However, in my experiments the AM container is restarted
within a
few seconds after killing the process. If in your setup YARN actually
needs 10 minutes to restart the AM, then you could try increasing the number
of retry attempts by the client [2].
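
As a rough sketch, something like the following in the client's
flink-conf.yaml would keep it retrying for about ten minutes (values are
placeholders):

    rest.retry.max-attempts: 60
    rest.retry.delay: 10000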

> Regarding logging, could you please help explain about the source of the
> error messages shown in the "Exception" tab on the Flink Job GUI (as per the
> screenshot below).

The REST API that is queried by the Web UI returns the root cause from the
ExecutionGraph [3]. All job status transitions should be logged together
with
the exception that caused the transition [4]. Check for INFO level log
messages that start with "Job [...] switched from state" followed by a
stacktrace. If you cannot find the exception, the problem might be rooted in
your log4j or logback configuration.

Best,
Gary

[1]
https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L767
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rest-retry-max-attempts
[3]
https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java#L87
[4]
https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L1363

On Fri, Jan 25, 2019 at 12:42 PM Averell  wrote:

> Hi Gary,
>
> Yes, my problem mentioned in the original post had been resolved by
> correcting the zookeeper connection string.
>
> I have two other relevant questions, if you have time, please help:
>
> 1. Regarding JM high availability, when I shut down the host having JM
> running, YARN would detect that missing JM and start a new one after 10
> minutes, and the Flink job would be restored. However, on the console
> screen
> that I submitted the job, I got the following error messages: "/The program
> finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException/" (full stack
> trace in the attached file  flink_console_timeout.log
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/flink_console_timeout.log>
>
> )
> Is there any way to avoid this? As if I run this as an AWS EMR job, the job
> would be considered failed, while it is actually be restored automatically
> by YARN after 10 minutes).
>
> 2. Regarding logging, could you please help explain about the source of the
> error messages show in "Exception" tab on Flink Job GUI (as per the
> screenshot below). I could not find any log files has that message (not in
> jobmanager.log or in taskmanager.log in EMR's hadoop-yarn logs folder).
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-01-25_at_22.png>
>
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: No resource available error while testing HA

2019-01-24 Thread Gary Yao
Hi Averell,

> Then I have another question: when JM cannot start/connect to the JM on
.88,
> why didn't it try on .82 where resources are still available?

When you are deploying on YARN, the TM container placement is decided by the
YARN scheduler and not by Flink. Without seeing the complete logs, it is
difficult to tell what happened. If you need help with debugging, please
enable YARN's log aggregation and attach the output of:

yarn logs -applicationId 
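
Log aggregation itself is a YARN setting; a minimal sketch for yarn-site.xml
(followed by a NodeManager restart) would be:

    <property>
      <name>yarn.log-aggregation-enable</name>
      <value>true</value>
    </property>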

Do I understand it correctly that your problem was solved by changing the
zookeeper connection string?

Best,
Gary

On Wed, Jan 23, 2019 at 12:44 PM Averell  wrote:

> Hi Gary,
>
> Thanks for your support.
>
> I use flink 1.7.0. I will try to test without that -n.
> Here below are the JM log (on server .82) and TM log (on server .88). I'm
> sorry that I missed that TM log before asking, had a thought that it would
> not relevant. I just fixed the issue with connection to zookeeper and the
> problem was solved.
>
> Then I have another question: when JM cannot start/connect to the JM on
> .88,
> why didn't it try on .82 where resources are still available?
>
> Thanks and regards,
> Averell
>
> Here is the JM log (from /mnt/var/log/hadoop-yarn/.../jobmanager.log on
> .82)
> (it seems irrelevant. Even the earlier message regarding
> NoResourceAvailable
> was there in GUI, but not found in the jobmanager.log file):
>
> 2019-01-23 04:15:01.869 [main] WARN
> org.apache.flink.configuration.Configuration  - Config uses deprecated
> configuration key 'web.port' instead of proper key 'rest.port'
> 2019-01-23 04:15:03.483 [main] WARN
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Upload
> directory
> /tmp/flink-web-08279f45-0244-4c5c-bc9b-299ac59b4068/flink-web-upload does
> not exist, or has been deleted externally. Previously uploaded files are no
> longer available.
>
> And here is the TM log:
> 2019-01-23 11:07:07.479 [main] ERROR
> o.a.flink.shaded.curator.org.apache.curator.ConnectionState  - Connection
> timed out for connection string (localhost:2181) and timeout (15000) /
> elapsed (56538)
>
> org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException:
> KeeperErrorCode = ConnectionLoss
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.NamespaceImpl$1.call(NamespaceImpl.java:90)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.NamespaceImpl.fixForNamespace(NamespaceImpl.java:83)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.fixForNamespace(CuratorFrameworkImpl.java:594)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:158)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:32)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.reset(NodeCache.java:242)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:175)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:154)
> at
>
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.start(ZooKeeperLeaderRetrievalService.java:107)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskExecutor.start(TaskExecutor.java:277)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:168)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:332)
> at
>
> org.apache.flink.yarn.YarnTaskExecutorRunner.lambda$run$0(YarnTaskExecutorRunner.java:142)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
>
> org.apache.flink.yarn.YarnTaskExecutorRunner.run(YarnTaskExecutorRunner.java:141)
> at
>
> org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:75)
> 2019-01-23 11:07:08.224 [main-SendThread(localhost:2181)] WARN
> 

Re: No resource available error while testing HA

2019-01-23 Thread Gary Yao
Hi Averell,

What Flink version are you using? Can you attach the full logs from JM and
TMs? Since Flink 1.5, the -n parameter (number of taskmanagers) should be
omitted unless you are in legacy mode [1].

> As per that screenshot, it looks like there are 2 task managers still
> running (one on each host .88 and .81), which means the one on .88 has not
> been cleaned properly. If it is, then how to clean it?

The TMs should terminate if they cannot register at the JM [2].
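
If they linger, you can make them give up registering sooner, roughly like
this in flink-conf.yaml (the value is a placeholder; the default is 5 minutes):

    taskmanager.registration.timeout: 2 min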

> I wonder whether when the server with JobManager crashes, the whole job is
> restarted, or a new JobManager will try to connect to the running TMs to
> resume the job?

The whole job is restarted but any existing TM containers are reused.

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#legacy
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-registration-timeout

On Wed, Jan 23, 2019 at 7:19 AM Averell  wrote:

> Hello everyone,
>
> I am testing High Availability of Flink on YARN on an AWS EMR cluster.
> My configuration is an EMR with one master-node and 3 core-nodes (each with
> 16 vCores). Zookeeper is running on all nodes.
> Yarn session was created with: flink-yarn-session -n 2 -s 8 -jm 1024m -tm
> 20g
> A job with parallelism of 16 was submitted.
>
> I tried to execute the test by terminating the core-node (using Linux "init
> 0") having the job-manager running on. The first few restarts worked well -
> a new job-manager was elected, and the job was resumed properly.
> However, after some restarts, the new job-manager could not retrieve its
> needed resource any more (only one TM on the node with IP .81 was shown in
> the Task Managers GUI).
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Flink.png>
>
>
> I kept getting the error message
>
> "org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 108, slots allocated: 60".
>
> Here below is what shown in YARN Resource Manager.
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Yarn.png>
>
>
> As per that screenshot, it looks like there are 2 task managers still
> running (one on each host .88 and .81), which means the one on .88 has not
> been cleaned properly. If it is, then how to clean it?
>
> I wonder whether when the server with JobManager crashes, the whole job is
> restarted, or a new JobManager will try to connect to the running TMs to
> resume the job?
>
>
> Thanks and regards,
> Averell
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: ElasticSearch RestClient throws NoSuchMethodError due to shade mechanism

2019-01-18 Thread Gary Yao
Hi Henry,

Can you share your pom.xml and the full stacktrace with us? It is expected
behavior that org.elasticsearch.client.RestClientBuilder is not shaded. That
class comes from the elasticsearch Java client, and we only shade its
transitive dependencies. Could it be that you have a dependency in your
job's pom.xml to a different version of the elasticsearch client?
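
A quick way to check is to look at the dependency tree of the job, e.g.:

    # shows which artifacts pull in an elasticsearch client
    mvn dependency:tree -Dincludes=org.elasticsearch.client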

Best,
Gary

On Tue, Jan 15, 2019 at 11:39 AM 徐涛  wrote:

> Hi All,
> I use the following code to try to build a RestClient
> org.elasticsearch.client.RestClient.builder(  new HttpHost(xxx,
> xxx,"http")  ).build()
> but at runtime, a NoSuchMethodError is thrown. I think the
> reason is:
> There are two RestClient classes, one is in the jar I include, the other
> one is in flink-connector-elasticsearch5, but the argument of build method
> in flink-connector-elasticsearch5 is
> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.apache.http.HttpHost.
> So I want to know why org.elasticsearch.client.RestClientBuilder is not
> shaded, so runtime class conflict could be avoided?
>
> public static RestClientBuilder
> builder(org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.apache.http.HttpHost...
> hosts) {
>     return new RestClientBuilder(hosts);
> }
>
> Best
> Henry
>


Re: Flink 1.7.1 job is stuck in running state

2019-01-18 Thread Gary Yao
Hi Piotr,

Ideally on DEBUG level.
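
For example, in conf/log4j.properties on the JM and TM hosts (assuming the
default log4j setup):

    # sketch -- raise the root log level before reproducing the issue
    log4j.rootLogger=DEBUG, file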

Best,
Gary

On Fri, Jan 18, 2019 at 3:41 PM Piotr Szczepanek 
wrote:

> Hey Gary,
> thanks for your reply.
> Before we have been using Flink version 1.5.2.
> With both version we're using Flink deployed on Yarn.
>
> Regarding log would you like to have log entries with DEBUG enabled or
> INFO would be enough?
>
> Thanks,
> Piotr
>
> pt., 18 sty 2019 o 15:14 Gary Yao  napisał(a):
>
>> Hi Piotr,
>>
>> What was the version you were using before 1.7.1?
>> How do you deploy your cluster, e.g., YARN, standalone?
>> Can you attach full TM and JM logs?
>>
>> Best,
>> Gary
>>
>> On Fri, Jan 18, 2019 at 3:05 PM Piotr Szczepanek <
>> piotr.szczepa...@gmail.com> wrote:
>>
>>> Hello,
>>> we have a scenario with Data Processing jobs that generate export
>>> files on demand. Our first approach was using ClusterClient, but recently
>>> we switched to the REST API for job submission. In the meantime we switched to
>>> flink 1.7.1 and that started to cause problems.
>>> Some of our jobs are stuck, not processing any data. Task Managers have
>>> info that Chain is switching to RUNNING, and then nothing happens.
>>> In TM's stdout logs we can see that for some reason the log is cut, e.g.:
>>>
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
>>> initialized will read a total of 615 records.
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
>>> next block
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
>>> in 63 ms. row count = 615
>>> Jan 10, 2019 4:28:33 PM WARNING:
>>> org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
>>> due to context is not a instance of TaskInputOutputContext, but is
>>> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
>>> initialized will read a total of 140 records.
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
>>> next block
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
>>> in 2 ms. row count = 140
>>> Jan 10, 2019 4:28:33 PM WARNING:
>>> org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
>>> due to context is not a instance of TaskInputOutputContext, but is
>>> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
>>> Jan 10, 2019 4:28:33 PM INFO: or
>>>
>>> As you can see, the last line is cut in the middle, and nothing happens
>>> later on.
>>> None of the counters (records/bytes sent/read) are increasing.
>>> We switched debug on for both TMs and JM, but the only thing they show
>>> is heartbeats being sent between each other.
>>> Do you have any idea what the problem could be, and how we could deal with
>>> them or at least try to investigate? Is there any timeout/config that we
>>> could try to enable?
>>>
>>


Re: Flink 1.7.1 job is stuck in running state

2019-01-18 Thread Gary Yao
Hi Piotr,

What was the version you were using before 1.7.1?
How do you deploy your cluster, e.g., YARN, standalone?
Can you attach full TM and JM logs?

Best,
Gary

On Fri, Jan 18, 2019 at 3:05 PM Piotr Szczepanek 
wrote:

> Hello,
> we have a scenario with Data Processing jobs that generate export
> files on demand. Our first approach was using ClusterClient, but recently
> we switched to the REST API for job submission. In the meantime we switched to
> flink 1.7.1 and that started to cause problems.
> Some of our jobs are stuck, not processing any data. Task Managers have
> info that Chain is switching to RUNNING, and then nothing happens.
> In TM's stdout logs we can see that for some reason the log is cut, e.g.:
>
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
> initialized will read a total of 615 records.
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
> next block
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
> in 63 ms. row count = 615
> Jan 10, 2019 4:28:33 PM WARNING:
> org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
> due to context is not a instance of TaskInputOutputContext, but is
> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
> initialized will read a total of 140 records.
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
> next block
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
> in 2 ms. row count = 140
> Jan 10, 2019 4:28:33 PM WARNING:
> org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
> due to context is not a instance of TaskInputOutputContext, but is
> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
> Jan 10, 2019 4:28:33 PM INFO: or
>
> As you can see, the last line is cut in the middle, and nothing happens later
> on.
> None of the counters (records/bytes sent/read) are increasing.
> We switched debug on for both TMs and JM, but the only thing they show is
> heartbeats being sent between each other.
> Do you have any idea what the problem could be, and how we could deal with
> them or at least try to investigate? Is there any timeout/config that we
> could try to enable?
>


Re: Get the savepointPath of a particular savepoint

2019-01-14 Thread Gary Yao
Hi,

The API still returns the location of a completed savepoint. See the example
in the Javadoc [1].
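
For illustration, a rough sketch (host/port, job id, and trigger id are
placeholders; the exact response fields are shown in the linked Javadoc):

curl http://<jobmanager>:8081/jobs/<jobid>/savepoints/<triggerid>

Once the savepoint has completed, the response contains its location, roughly:

{"status":{"id":"COMPLETED"},"operation":{"location":"hdfs:///savepoints/savepoint-..."}}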

Best,
Gary

[1]
https://github.com/apache/flink/blob/1325599153b162fc85679589cab0c2691bf398f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L90

On Mon, Jan 14, 2019 at 2:46 PM Dawid Wysakowicz 
wrote:

> Hi,
>
> The path of a savepoint is a user specified parameter, therefore it is
> not tracked by flink. It is up to the user to know where should the
> savepoint end up.
>
> As for API to check status of a savepoint you can use[1]
>
> Best,
>
> Dawid
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints-triggerid
>
> On 13/01/2019 18:47, anaray wrote:
> > As per the 1.7.0 documentation here
> > <https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jars-jarid-run>
> > To start a job from a savepoint, savepointPath is required. But it is not
> > clear from where to get this savepointPath. In earlier versions we could
> > get it from
> > /jobs/:jobid/cancel-with-savepoint/in-progress/:requestId. It gave a
> > response like
> > {
> >   "status": "success",
> >   "request-id": 1,
> >   "savepoint-path": ""
> > }
> >
> > Is there a way to get the savepointPath from an API on 1.7?
> >
> > Thanks,
> >
> >
> >
> >
>
>


Re: Building Flink from source according to vendor-specific version but causes protobuf conflict

2019-01-11 Thread Gary Yao
Hi Wei,

Did you build Flink with Maven 3.2.5 as recommended in the documentation [1]?
Also, did you use the -Pvendor-repos flag [2] to add the Cloudera repository
when building?
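
For reference, the build command would look roughly like this (the CDH
version below is only a placeholder; use the one matching your cluster):

mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.14.0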

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#build-flink
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#vendor-specific-versions

On Tue, Jan 8, 2019 at 5:17 AM Wei Sun  wrote:

> Hi,Timo
>
> Good day!
>
> Thank you for your help! This issue has been solved with the rebuilt Flink
> version. But I found that it does not work with the
> 'Apache Flink 1.7.1 only' version, even if I configure the classpath with
> export HADOOP_CLASSPATH=`hadoop classpath`. I will check it later.
> Thanks again.
>
> Best Regards
> Wei
>
> -- Original --
> *From: * "Timo Walther";;
> *Date: * Jan 8, 2019
> *To: * "user";
> *Cc: * "gary";
> *Subject: * Re: Building Flink from source according to vendor-specific
> version but causes protobuf conflict
>
> Hi Wei,
>
> did you play around with classloading options mentioned here [1]. The -d
> option might impact how classes are loaded when the job is deployed on the
> cluster.
>
> I will loop in Gary that might now more about the YARN behavior.
>
> Regards,
> Timo
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html#user-jars--classpath
>
>
> Am 07.01.19 um 10:33 schrieb Wei Sun:
>
> Hi guys,
>
> Good day.
>
> I rebuilt Flink from source and specified the vendor-specific Hadoop
> version. It works well when I just submit a streaming application without
> the '-d' (--detached) option, as follows:
> bin/flink run -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm
> 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter
> ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf
>
> But if I add the '-d' (--detached) option, an
> *org.apache.flink.client.deployment.ClusterDeploymentException* is
> thrown to the CLI, as follows:
> bin/flink run *-d* -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048
> -ytm 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter
> ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf
>
> *Exception start*
>  The program finished with the following exception:
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1126)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by:
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
> The YARN application unexpectedly switched to state FAILED during
> deployment.
> Diagnostics from YARN: Application application_1544777537685_0068 failed 2
> times due to AM Container for appattempt_1544777537685_0068_02 exited
> with  exitCode: 1
> For more detailed output, check application tracking page:
> http://103-8-200-sh-100-F07.yidian.com:8088/proxy/application_1544777537685_0068/Then,
> click on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_e03_1544777537685_0068_02_01
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
> at org.apache.hadoop.util.Shell.run(Shell.java:460)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Container exited with a non-zero exit code 1

Re: [Flink 1.7.0] always got 10s ask timeout exception when submitting job with checkpoint via REST

2019-01-10 Thread Gary Yao
Hi all,

I think increasing the value of the config option web.timeout [1][2]
(default: 10000 ms) is what you are looking for.
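
For example, in flink-conf.yaml (the value is in milliseconds; 10 minutes is
just an arbitrary example here):

web.timeout: 600000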

Best,
Gary

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java#L76
[2]
https://github.com/apache/flink/blob/a07ce7f6c88dc7d0c0d2ba55a0ab3f2283bf247c/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java#L177

On Thu, Jan 10, 2019 at 9:19 PM Aaron Levin  wrote:

> We are also experiencing this! Thanks for speaking up! It's relieving to
> know we're not alone :)
>
> We tried adding `akka.ask.timeout: 1 min` to our `flink-conf.yaml`, which
> did not seem to have any effect. I tried adding every other related akka,
> rpc, etc. timeout and still continue to encounter these errors. I believe
> they may also impact our ability to deploy (as we get a timeout when
> submitting the job programmatically). I'd love to see a solution to this if
> one exists!
>
> Best,
>
> Aaron Levin
>
> On Thu, Jan 10, 2019 at 2:58 PM Steven Wu  wrote:
>
>> We are trying out Flink 1.7.0. We always get this exception when
>> submitting a job with an external checkpoint via REST. Job parallelism is
>> 1,600; state size is probably in the range of 1-5 TB. The job actually
>> starts; only the REST API returns this failure.
>>
>> If we submit the job without an external checkpoint, everything works
>> fine.
>>
>> Has anyone else seen such a problem with 1.7? Appreciate your help!
>>
>> Thanks,
>> Steven
>>
>> org.apache.flink.runtime.rest.handler.RestHandlerException:
>> akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
>> Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$4(JarRunHandler.java:114)
>> at
>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>> at
>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> at
>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>> at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>> at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
>> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>> at
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>> at
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.util.concurrent.CompletionException:
>> akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
>> Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>> at
>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>> at
>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>> at
>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>> at
>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>> ... 21 more
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
>> Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>> at
>> 

Re: How to shut down Flink Web Dashboard in detached Yarn session?

2018-12-31 Thread Gary Yao
Hi,

You can use the YARN client to list all applications on your YARN cluster:

yarn application -list
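
If the Flink session does still show up there, you can stop it with the
application id reported by the list command:

yarn application -kill <applicationId>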

If this does not show any running applications, the Flink cluster must have
somehow terminated. If you have YARN's log aggregation enabled, you should be
able to view the Flink logs by running:

yarn logs -applicationId <applicationId>

Best,
Gary

On Fri, Dec 28, 2018 at 9:42 PM Sai Inampudi  wrote:

> Hi everyone,
>
> I recently attempted to create a Flink cluster on YARN by executing the
> following:
> ~/flink-1.5.4/bin/yarn-session.sh -n 5 -tm 2048 -s 4 -d -nm flink_yarn
>
> The resulting command was not completely successful but it did end up
> creating an Apache Flink Dashboard with 1 Task Manager, 1 Task Slot, and 1
> Job Manager.
>
> When I look at my Yarn Resource Manager, I don't see my application
> running. CLI calls for the application id also returned nothing.
>
> I would like to kill the existing web dashboard as well as the other
> lingering task manager/job manager so that I can try recreating the yarn
> session successfully.
>
> Has anyone encountered this before and has any suggestions? I looked
> through the documentation [1], which says that to stop a YARN session you
> should use the YARN utilities (yarn application -kill <applicationId>).
> However, the application id in my logs is not found in the
> Resource Manager, so it seems to have already been killed (due to the
> original yarn-session command not executing properly?).
>
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/yarn_setup.html#detached-yarn-session
>


Re: Flink issue while setting up in Kubernetes

2018-12-10 Thread Gary Yao
Hi Abhi Thakur,

We need more information to help you. What Docker images are you using? Can
you share the Kubernetes resource definitions? Can you share the complete logs
of the JM and TMs? Did you follow the steps outlined in the Flink
documentation [1]?

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html

On Mon, Dec 10, 2018 at 7:29 AM Thakur, Abhi  wrote:

> We are trying to setup a single node Kubernetes cluster.
>
> 1 Job Manager and 1 Task Manager.
>
> Before we were getting an error, and we followed this thread.
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-4-issues-w-TaskManager-connecting-to-ResourceManager-td23298.html
>
> After following the above mentioned archive , we have used the following
> commands to startup the Flink services :
>
>
>
> ${FLINK_HOME}/bin/jobmanager.sh start-foreground
>
> ${FLINK_HOME}/bin/taskmanager.sh start-foreground
>
>
>
> Previously jobmanager was being started as :
>
> ${FLINK_HOME}/bin/jobmanager.sh start-foreground  *cluster*
>
> ${FLINK_HOME}/bin/taskmanager.sh start-foreground
>
>
>
> That removed the error, and now we are getting the error shown below.
>
> We searched all the archives and hit a dead end.
>
> We have set up all ports correctly. The Flink version used is 1.6.2.
>
> Thanks in advance.
>
>
>
>
>
> 2018-12-08 06:52:38,959 WARN akka.remote.ReliableDeliverySupervisor -
> Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Disassociated]
>
>   2018-12-08 06:52:41,863 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager -
> Registering TaskManager with ResourceID 037d1c33ec0406598f2ce30472f97e65
> (akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122/user/taskmanager_0)
> at ResourceManager
>
>   2018-12-08 06:53:23,619 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - The
> heartbeat of TaskManager with id e0383ee248832f639659082c70a2f4e9 timed
> out.
>
>   2018-12-08 06:53:23,619 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager -
> Closing TaskExecutor connection e0383ee248832f639659082c70a2f4e9 because:
> The heartbeat of TaskManager with id e0383ee248832f639659082c70a2f4e9 timed
> out.
>
>   2018-12-08 06:53:48,961 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Disassociated]
>
>   2018-12-08 06:53:53,615 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by:
> [flink-taskmanager-6cf55db87b-5x9sd: Name or service not known]
>
>   2018-12-08 06:54:03,601 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by:
> [flink-taskmanager-6cf55db87b-5x9sd]
>
>   2018-12-08 06:54:13,605 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by:
> [flink-taskmanager-6cf55db87b-5x9sd: Name or service not known]
>
>   2018-12-08 06:54:23,613 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by:
> [flink-taskmanager-6cf55db87b-5x9sd: Name or service not known]
>
>   2018-12-08 06:54:33,601 WARN
> akka.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122] has failed,
> address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@flink-taskmanager-6cf55db87b-5x9sd:6122]] Caused by:
> [flink-taskmanager-6cf55db87b-5x9sd]
>
>   2018-12-08 06:54:33,619 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - The
> heartbeat of TaskManager with id 037d1c33ec0406598f2ce30472f97e65 timed
> out.
>
>   2018-12-08 06:54:33,619 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager -
> Closing TaskExecutor connection 037d1c33ec0406598f2ce30472f97e65 because:
> The heartbeat of TaskManager with id 

Re: Per job cluster doesn't shut down after the job is canceled

2018-11-20 Thread Gary Yao
Hi Paul,

Sorry for the late reply. I had a look at the attached log. I think
FLINK-10482 affects the shutdown of the "per-job cluster" after all. Here is
the respective stacktrace:

2018-11-06 10:45:17,405 ERROR
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor  - Caught
exception while executing runnable in main thread.
java.lang.IllegalArgumentException: Negative number of in progress
checkpoints
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
at
org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.(CheckpointStatsCounts.java:72)
at
org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
at
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
at
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
at
org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1247)
at
org.apache.flink.runtime.jobmaster.JobMaster.access$1600(JobMaster.java:147)
at
org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1590)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

We try to create an ArchivedExecutionGraph, which fails because we cannot
snapshot the checkpoint statistics. The subsequent code that should ultimately
shut down the cluster is not executed [1]. If you can tell us how you run into
the "Negative number of in progress checkpoints" problem, we might be able to
come up with a mitigation until FLINK-10482 is fixed.

Best,
Gary

[1]
https://github.com/apache/flink/blob/614f2162e42345da7501f8f6ea724a7e0ce65e3c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1247-L1248

On Wed, Nov 14, 2018 at 9:46 AM Paul Lam  wrote:

> Hi Gary,
>
> Thanks for your reply and sorry for the delay. The attachment is the
> jobmanager logs after invoking the cancel command.
>
> I think it might be related to the custom source, because the jobmanager
> keeps trying to trigger a checkpoint for it,
> but in fact it’s already canceled. The source implementation is using a
> running flag to denote it’s running, and the
> cancel method is simply setting the flag to false, which I think is a
> common way of implementing a custom source.
> In addition, the cluster finally shut down because I killed it with yarn
> commands.
>
> And also thank you for the pointer, I’ll keep tracking this problem.
>
> Best,
> Paul Lam
>
>
> 在 2018年11月10日,02:10,Gary Yao  写道:
>
> Hi Paul,
>
> Can you share the complete logs, or at least the logs after invoking the
> cancel command?
>
> If you want to debug it yourself, check if
> MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how
> the
> jobTerminationFuture is used.
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141
>
>
> On Wed, Nov 7, 2018 at 3:27 AM Paul Lam  wrote:
>
>> Hi,
>>
>> I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN
>> cluster doesn’t shut down after the job is canceled successfully. The only
>> errors I found in jobmanager’s log are as below (the second one appears
>> multiple times):
>>
>> ```
>>
>> 2018-11

Re: Any examples on invoke the Flink REST API post method ?

2018-11-12 Thread Gary Yao
Hi Henry,

What you see in the API documentation is a schema definition and not a sample
request. The request body should be:

{
"target-directory": "hdfs:///flinkDsl",
"cancel-job": false
}
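
For example, with curl (host/port and job id are placeholders):

curl -X POST http://<jobmanager>:8081/jobs/<jobid>/savepoints \
  -H "Content-Type: application/json" \
  -d '{"target-directory": "hdfs:///flinkDsl", "cancel-job": false}'

The response contains a request id that you can use to poll
/jobs/<jobid>/savepoints/<triggerid> for the result.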

Let me know if that helps.

Best,
Gary

On Mon, Nov 12, 2018 at 7:15 AM vino yang  wrote:

> Hi Henry,
>
> Maybe Gary can help you, ping him for you.
>
> Thanks, vino.
>
> 徐涛 wrote on Mon, Nov 12, 2018 at 12:45 PM:
>
>> Hi Experts,
>> I am trying to trigger a savepoint via the Flink REST API on version 1.6.
>> The documentation shows that I need to pass a JSON request body:
>> {
>>  "type" : "object",
>>   "id" :
>> "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody",
>>  "properties" : {
>>  "target-directory" : { "type" : "string" },
>>  "cancel-job" : { "type" : "boolean" }
>>  }
>> }
>> So I send the following JSON:
>> {
>> "type":"object",
>>
>> "id":"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody",
>> "properties":{
>> "target-directory":"hdfs:///flinkDsl",
>> "cancel-job":false
>> }
>> }
>>
>> And I use okhttp to send the request:
>> val MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8")
>> val body = RequestBody.create(MEDIA_TYPE_JSON, postBody)
>> val request = new Request.Builder()
>>   .url(url)
>>   .post(body)
>>   .build()
>> client.newCall(request).execute()
>>
>>
>> but I get an error: {"errors":["Request did not match expected format
>> SavepointTriggerRequestBody."]}
>> Would anyone give an example of how to invoke the POST REST API of Flink?
>> Thanks a lot.
>>
>> Best
>> Henry
>>
>


Re: Flink Web UI does not show specific exception messages when job submission fails

2018-11-09 Thread Gary Yao
Hi,

We only propagate the exception message but not the complete stacktrace [1].
Can you create a ticket for that?

Best,
Gary

[1]
https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java#L93

On Tue, Nov 6, 2018 at 6:50 PM Luis Gustavo Oliveira Silva <
l...@poli.ufrj.br> wrote:

> Hello,
>
> I was using Flink 1.4.2 and when submitting jobs through the Web UI, I
> could see exceptions that would help me debug jobs, such as:
>
> We're sorry, something went wrong. The server responded with:
>>
>> java.util.concurrent.CompletionException: 
>> org.apache.flink.util.FlinkException: Could not run the jar.
>>  at 
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
>>  at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
>> Source)
>>  at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>  at java.util.concurrent.FutureTask.run(Unknown Source)
>>  at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown
>>  Source)
>>  at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>>  Source)
>>  at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>  at java.lang.Thread.run(Unknown Source)
>> Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
>>  ... 9 more
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
>> main method caused an error.
>>  at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>>  at 
>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>  at 
>> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
>>  at 
>> org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
>>  at 
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
>>  ... 8 more
>> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
>> Encountered "." at line 3, column 4.
>> Was expecting one of:
>> 
>> "ORDER" ...
>> "LIMIT" ...
>> "OFFSET" ...
>> "FETCH" ...
>> "FROM" ...
>> "," ...
>> "UNION" ...
>> "INTERSECT" ...
>> "EXCEPT" ...
>> "MINUS" ...
>>
>>  at 
>> org.apache.flink.table.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:81)
>>  at 
>> org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:558)
>>  at 
>> com.stone.default.rule.Sandbox3$.delayedEndpoint$com$stone$default$rule$Sandbox3$1(Sandbox.scala:112)
>>  at 
>> com.stone.default.rule.Sandbox3$delayedInit$body.apply(Sandbox.scala:93)
>>  at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>  at 
>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>  at scala.App$$anonfun$main$1.apply(App.scala:76)
>>  at scala.App$$anonfun$main$1.apply(App.scala:76)
>>  at scala.collection.immutable.List.foreach(List.scala:392)
>>  at 
>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>  at scala.App$class.main(App.scala:76)
>>  at com.stone.default.rule.Sandbox3$.main(Sandbox.scala:93)
>>  at com.stone.default.rule.Sandbox3.main(Sandbox.scala)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>  at java.lang.reflect.Method.invoke(Unknown Source)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>>  ... 13 more
>> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." 
>> at line 3, column 4.
>> Was expecting one of:
>> 
>> "ORDER" ...
>> "LIMIT" ...
>> "OFFSET" ...
>> "FETCH" ...
>> "FROM" ...
>> "," ...
>> "UNION" ...
>> "INTERSECT" ...
>> "EXCEPT" ...
>> "MINUS" ...
>>
>>  at 
>> org.apache.calcite.sql.parser.impl.SqlParserImpl.convertException(SqlParserImpl.java:350)
>>  at 
>> org.apache.calcite.sql.parser.impl.SqlParserImpl.normalizeException(SqlParserImpl.java:131)
>>  at 
>> org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:138)
>>  at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:163)
>>  at 
>> org.apache.flink.table.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:77)
>>   

Re: Per job cluster doesn't shut down after the job is canceled

2018-11-09 Thread Gary Yao
Hi Paul,

Can you share the complete logs, or at least the logs after invoking the
cancel command?

If you want to debug it yourself, check if
MiniDispatcher#jobReachedGloballyTerminalState [1] is invoked, and see how the
jobTerminationFuture is used.

Best,
Gary

[1]
https://github.com/apache/flink/blob/091cff3299aed4bb143619324f6ec8165348d3ae/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java#L141


On Wed, Nov 7, 2018 at 3:27 AM Paul Lam  wrote:

> Hi,
>
> I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN
> cluster doesn’t shut down after the job is canceled successfully. The only
> errors I found in jobmanager’s log are as below (the second one appears
> multiple times):
>
> ```
>
> 2018-11-07 09:48:38,663 WARN  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Error while 
> notifying JobStatusListener
> java.lang.IllegalStateException: Incremented the completed number of 
> checkpoints without incrementing the in progress checkpoints before.
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
>   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
>   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
>   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> 2018-11-07 09:54:52,420 ERROR 
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - 
> Implementation error: Unhandled exception.
> java.lang.IllegalArgumentException: Negative number of in progress checkpoints
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.(CheckpointStatsCounts.java:72)
>   at 
> 

Re: Never gets into ProcessWindowFunction.process()

2018-11-09 Thread Gary Yao
Hi,

You are using event time, but are you assigning watermarks [1]? I do not see
it in the code.
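
A minimal sketch of what that could look like (this assumes your Monitoring
POJO exposes its event time via a hypothetical getEventTimestamp() returning
epoch milliseconds; adjust to your schema):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Monitoring> kinesisStream = env
    .addSource(kinesisConsumer)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Monitoring>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Monitoring mon) {
                // event time of the record in epoch milliseconds
                // (the field name is an assumption)
                return mon.getEventTimestamp();
            }
        });

Without watermarks, event-time windows never fire, which would explain why
process() is never called.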

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records

On Fri, Nov 9, 2018 at 6:58 PM Vijay Balakrishnan 
wrote:

> Hi,
> Any help is appreciated. I dug into this. *I can see the deserialized output
> log from the FlinkKinesisConsumer deserialization, but it keeps looping to
> pull from the Kinesis stream and never gets into the windowing operation's
> process() or apply().*
>
> FlinkKinesisConsumer seems to be stuck in a loop calling a Kinesis Stream
> and the deserialized output never seems to get into the apply() or
> process() method of a Windowing operation. I can see the logs of
> MonitoringMapKinesisSchema deserializing data back successfully from
> Kinesis and converting into a POJO.
>
> Code:
>
> *//Create environment*:
> StreamExecutionEnvironment env;
> if (local) {
> Configuration configuration = new Configuration();
> configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
> env = StreamExecutionEnvironment.createLocalEnvironment(1,
> configuration);
> } else {
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> }
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> *//create FlinkKinesisConsumer*
> Properties kinesisConsumerConfig = new Properties();
> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
> "AUTO");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
> "1");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
> "2000");
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
> "TRIM_HORIZON");
> FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
> kinesisTopicRead, new MonitoringMapKinesisSchema(),
> kinesisConsumerConfig); //deserialization works fine
> DataStream<Monitoring> kinesisStream = env
> .addSource(kinesisConsumer);
> KeyedStream<Monitoring, Tuple3<String, String, String>>
> enrichedComponentInstanceStream1Key = kinesisStream
> .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
> public Tuple3<String, String, String>
> getKey(Monitoring mon) throws Exception {
> return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
> }
> });
>
> WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow>
> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key
>
> .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>
> DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 =
> enrichedComponentInstanceStream1Win
> //.process(new Window5SecProcessing(gameId, FIVE_SECONDS,
> COMPONENT_INSTANCE_OPERATION))
> .process(new Window5SecProcessing());*//never gets in
> here*
> //Gets into Window5SecProcessing.open() method during initialization but
> never into the process method 
> private static class Window5SecProcessing extends
> ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {
>
> private transient String interval;
> private transient String gameId;
> private transient String keyType;
> private transient org.apache.flink.metrics.Histogram
> fiveSecHistogram;
>
> private transient ValueState<Long> total5SecCountState;
> private transient ValueStateDescriptor<Long>
> total5SecCountValueStateDescriptor;
> public Window5SecProcessing() {
>
> }
>
> public Window5SecProcessing(String gameId, String interval, String
> keyType) {
> this.gameId = gameId;
> this.interval = interval;
> this.keyType = keyType;
> }
>
> @Override
> public void clear(Context context) throws Exception {
> super.clear(context);
> KeyedStateStore keyedStateStore = context.windowState();
>
> keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
> }
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> logger.debug("Gets in here fine -Window5SecProcessing -Entered
> open - parameters:{}", parameters);
> com.codahale.metrics.Histogram fiveSecHist =
> new com.codahale.metrics.Histogram(new
> SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
> this.fiveSecHistogram = new
> DropwizardHistogramWrapper(fiveSecHist);
> total5SecCountValueStateDescriptor =
> new ValueStateDescriptor("total5SecCount",
> Long.class, 0L);
> total5SecCountState =
> getRuntimeContext().getState(total5SecCountValueStateDescriptor);
> }
>
>
> public void process(Tuple3<String, String, String> currentKey1,
> Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out)
> throws Exception {
> 

Re: Never gets into ProcessWindowFunction.process()

2018-11-09 Thread Gary Yao
Hi,

If the job is actually running and consuming from Kinesis, the log you posted
is unrelated to your problem. To understand why the process function is not
invoked, we would need to see more of your code, or you would need to provide
an executable example. The log only shows that all offered slots are occupied
by tasks of your job.

Best,
Gary

On Tue, Nov 6, 2018 at 1:10 AM Vijay Balakrishnan 
wrote:

> Hi,
> Running in IntelliJ IDE on a Mac with 4 vProcessors.
> Code compiles fine. It never gets into the Window5SecProcessing's
> process(). I am able to get data from the Kinesis Consumer and it is
> deserialized properly when I debug the code. It gets into the
> Window5SecProcessing.open() method for initialization.
>
> Not sure if I am failing with no slots available ???
> In main():
>  //trimmed a lot of code
> *FlinkKinesisConsumer kinesisConsumer =
> getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ...,
> ...);*
>
> *DataStream kinesisStream = env*
> *.addSource(kinesisConsumer)*
> *.uid(jobName + "KinesisSource");*
> *KeyedStream>
> enrichedComponentInstanceStream1Key = kinesisStream*
> *.keyBy(new KeySelector String>>() {*
> *public Tuple3
> getKey(Monitoring mon) throws Exception {*
> *return new Tuple3 String>(mon.getComponent(), mon.getInstance(), mon.getOperation());*
> *}});*
>
> *WindowedStream,
> TimeWindow> enrichedComponentInstanceStream1Win =
> enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));*
>
> *DataStream enrichedComponentInstanceStream1 =
> enrichedComponentInstanceStream1Win*
> *.process(new Window5SecProcessing(gameId, FIVE_SECONDS,
> COMPONENT_INSTANCE_OPERATION))*
> *.uid("Component Instance Operation Key Monitoring " +
> FIVE_SECONDS);*
> *enrichedComponentInstanceStream1.addSink(new
> SinkFunction() {*
> *@Override*
> *public void invoke(MonitoringGrouping mg, Context context)
> throws Exception {*
> *//TODO call ES*
> *logger.debug("In enrichedComponentInstanceStream1 Sink
> received mg:{}", mg);*
> *}*
> *});*
> *Window processing class*:
> private static class Window5SecProcessing extends
> ProcessWindowFunction String, String>, TimeWindow> {
> private transient Histogram fiveSecHist;
> private transient Histogram fiveMinHist;
> private transient org.apache.flink.metrics.Histogram
> fiveSecHistogram;
> private transient org.apache.flink.metrics.Histogram
> fiveMinHistogram;
> private transient ValueState total5SecCountState;
> private transient ValueStateDescriptor
> total5SecCountValueStateDescriptor;
>
> public Window5SecProcessing(String gameId, String interval, String
> keyType) {
> ...
> }
>
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> logger.debug("Window5SecProcessing -Entered open -
> parameters:{}", parameters);//gets here
> com.codahale.metrics.Histogram fiveSecHist =
> new com.codahale.metrics.Histogram(new
> SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
> this.fiveSecHistogram = new
> DropwizardHistogramWrapper(fiveSecHist);
> total5SecCountValueStateDescriptor =
> new ValueStateDescriptor("total5SecCount",
> Long.class, 0L);
> total5SecCountState =
> getRuntimeContext().getState(total5SecCountValueStateDescriptor);
> }
> ..
>
>* public void process(Tuple3 currentKey1,
> Context ctx, Iterable input, Collector out)
> throws Exception {*
> *logger.debug("Window5SecProcessing - Entered process
> ");//never gets here*
> *Tuple3 currentKey = (Tuple3 String, String>) currentKey1;*
> **
> *}*
>
> }
> At one point in the logs, I seem to see that there are no slots available.
> Is that the problem? If so, how can I fix it so that I can test
> locally on my Mac?
> *Log:*
> flink-akka.actor.default-dispatcher-71 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Slot Pool
> Status:
> status: connected to
> akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
> registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
> *available slots: []*
> allocated slots: [[AllocatedSlot
> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
> pending requests: []
> sharing groups: {
>  5a0ae59368145d715b3cc0d39ba6c05a 
> {
> groupId=5a0ae59368145d715b3cc0d39ba6c05a
> unresolved={}
> resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost
> 

Re: User jar is present in the flink job manager's class path

2018-10-11 Thread Gary Yao
Hi,

Could it be that you are submitting the job in attached mode, i.e., without
the -d parameter? In the "job cluster attached mode", we actually start a
Flink session cluster (and stop it again from the CLI) [1]. Therefore, in
attached mode, the config option "yarn.per-job-cluster.include-user-jar" is
effectively ignored. If you submit with -d, a "true job cluster" is started,
and the user jar should be added to the system classpath. Alternatively, if
the detached mode is not an option for you, you could add a jar with your
custom logger implementation to the flink /lib directory.
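
For example, a detached submission could look roughly like this (memory
settings, class name, and jar path are placeholders):

bin/flink run -d -m yarn-cluster -yjm 1024 -ytm 4096 -c com.example.MyJob /path/to/my-job.jar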

If the behavior in Flink 1.3 is indeed different, then I would consider this
a regression. Can you open a JIRA issue for that?

Best,
Gary

[1]
https://github.com/apache/flink/blob/d13015e23c805e1aaefdc7d8037f2fa87ea74830/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L261

On Thu, Oct 11, 2018 at 3:54 PM Timo Walther  wrote:

> Yes, you are right. I was not aware that the resolution order depends on
> the cluster deployment. I will loop in Gary (in CC), who might know
> about such a YARN setup.
>
> Regards,
> Timo
>
> Am 11.10.18 um 15:47 schrieb yinhua.dai:
> > Hi Timo,
> >
> > I didn't try to configure the classloader order; according to the
> > document, it should only be needed for yarn-session mode, right?
> >
> > I can see the ship files (-yt /path/dir/) are present in the job manager's
> > classpath, so maybe I should put my uber jar in the -yt path so that it
> > will be shipped and added to the classpath in Flink 1.5?
> >
> >
> >
>
>
>


Re: Flink 1.5.2 - excessive amount of container requests, Received new/Returning excess container "flood"

2018-10-10 Thread Gary Yao
Hi Borys,

I remember that another user reported a similar issue recently [1]; attached
to the ticket you can find his log file. If I recall correctly, we concluded
that YARN returned the containers very quickly. At the time, Flink's debug
level logs were inconclusive because we did not log the reason why the
container was returned, and the user could not provide us with the YARN logs.
In 1.5.4, we improved the logging [2]. Hence, it would be good if you could
reproduce this with debug level logging using Flink 1.5.4.

You could also try the vanilla Hadoop distribution, or Flink 1.6.

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-10104
[2] https://issues.apache.org/jira/browse/FLINK-10137

On Tue, Oct 9, 2018 at 5:55 PM Borys Gogulski 
wrote:

> Hey guys,
>
> thanks for the replies.
> 1. "Requesting new TaskExecutor" looks fine as it's exactly 32 as is jobs'
> parallelism set.
> The weird thing is that after those 32 containers requested and received we
> have this "flood" of 'Received new container/Returning excess container`
> (and as shown below it's actually doing something on YARN side)
> Where does those come from?
> 2. I felt that DEBUG will be needed, we'll see what we can do about it.
> 3. Yes, all in favor for upgrading to 1.5.4. But as Gary mentioned there
> seems to be no fixes that could heal it (I was reading release notes
> previous to posting this thread ; )).
> 4. Hadoop: 2.6.0+cdh5.14.0
>
> Here are logs for one of "excess" containers:
> 1. Flink JM
> 2018-10-09 17:35:33,493 INFO  org.apache.flink.yarn.YarnResourceManager
>
> - Received new container: container_e96_1538374332137_3071_01_2485560 -
> Remaining pending container requests: 0
> 2018-10-09 17:35:33,493 INFO  org.apache.flink.yarn.YarnResourceManager
>
> - Returning excess container container_e96_1538374332137_3071_01_2485560.
> 2. YARN
> 2018-10-09 17:35:33,283 INFO
> org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
> container_e96_1538374332137_3071_01_2485560 Container Transitioned from NEW
> to ALLOCATED
> 2018-10-09 17:35:33,283 INFO
> org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=user
>
> OPERATION=AM Allocated ContainerTARGET=SchedulerApp
> RESULT=SUCCESS  APPID=application_1538374332137_3071
> CONTAINERID=container_e96_1538374332137_3071_01_2485560
> 2018-10-09 17:35:33,283 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode:
> Assigned container container_e96_1538374332137_3071_01_2485560 of capacity
>  on host server:44142, which has 5 containers,
>  used and  available after
> allocation
> 2018-10-09 17:35:33,283 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
> assignedContainer application attempt=appattempt_1538374332137_3071_01
> container=Container: [ContainerId:
> container_e96_1538374332137_3071_01_2485560, NodeId: server:44142,
> NodeHttpAddress: server:8042, Resource: , Priority:
> 0, Token: null, ] queue=queue: capacity=0.5, absoluteCapacity=0.5,
> usedResources=, usedCapacity=1.9947916,
> absoluteUsedCapacity=0.9973958, numApps=2, numContainers=383
> clusterResource=
> 2018-10-09 17:35:33,485 INFO
> org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
> container_e96_1538374332137_3071_01_2485560 Container Transitioned from
> ALLOCATED to ACQUIRED
> 2018-10-09 17:35:38,532 INFO
> org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
> container_e96_1538374332137_3071_01_2485560 Container Transitioned from
> ACQUIRED to RELEASED
> 2018-10-09 17:35:38,532 INFO
>
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp:
> Completed container: container_e96_1538374332137_3071_01_2485560 in state:
> RELEASED event:RELEASED
> 2018-10-09 17:35:38,532 INFO
> org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=user
>
> IP=ip  OPERATION=AM Released Container TARGET=SchedulerApp
> RESULT=SUCCESS  APPID=application_1538374332137_3071
> CONTAINERID=container_e96_1538374332137_3071_01_2485560
> 2018-10-09 17:35:38,532 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode:
> Released container container_e96_1538374332137_3071_01_2485560 of capacity
>  on host server:44142, which currently has 0
> containers,  used and 
> available, release resources=true
> 2018-10-09 17:35:38,532 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
> completedContainer container=Container: [ContainerId:
> container_e96_1538374332137_3071_01_2485560, NodeId: server:44142,
> NodeHttpAddress: server:8042, Resource: , Priority:
> 0, Token: Token { kind: ContainerToken, service: ip:44142 }, ] queue=queue:
> capacity=0.5, absoluteCapacity=0.5, usedResources= vCores:96>, usedCapacity=0.5, absoluteUsedCapacity=0.25, numApps=2,
> numContainers=96 cluster=
> 2018-10-09 17:35:38,532 INFO
>
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler:
> 

Re: Job manager logs for previous YARN attempts

2018-10-10 Thread Gary Yao
Hi Pawel,

As far as I know, the application attempt is incremented if the application
master fails and a new one is brought up. Therefore, what you are seeing
should not happen. I have just deployed on AWS EMR 5.17.0 (Hadoop 2.8.4) and
killed the container running the application master; the container id was not
reused. Can you describe how to reproduce this behavior? Do you have a sample
application? Can you observe this behavior consistently? Can you share the
complete output of

yarn logs -applicationId <applicationId>?

The call to the method setKeepContainersAcrossApplicationAttempts is needed to
enable recovery of previously allocated TaskManager containers [1]. I
currently do not see how it is possible to keep the AM container across
application attempts.

> The second challenge is understanding if the job will be restored into new
> application attempts or new application attempt will just have flink running
> without any job?

The job will be restored if you have HA enabled [2][3].
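
A minimal sketch of the relevant flink-conf.yaml entries (the ZooKeeper
quorum and storage directory are placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/recovery
yarn.application-attempts: 10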

Best,
Gary

[1]
https://hortonworks.com/blog/apache-hadoop-yarn-hdp-2-2-fault-tolerance-features-long-running-services/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/jobmanager_high_availability.html#yarn-cluster-high-availability
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/yarn_setup.html#recovery-behavior-of-flink-on-yarn

On Mon, Oct 8, 2018 at 12:32 PM Pawel Bartoszek 
wrote:

> Hi,
>
> I am looking into why YARN starts a new application attempt on Flink
> 1.5.2. The challenge is getting the logs for the first attempt. After
> checking YARN I discovered that in the first attempt and the second one the
> application master (job manager) gets assigned the same container id (is
> this expected?). In this case the logs from the first attempt are overwritten?
> I found that *setKeepContainersAcrossApplicationAttempts* is enabled here
>
> The second challenge is understanding if the job will be restored into new
> application attempts or new application attempt will just have flink
> running without any job?
>
>
> Regards,
> Pawel
>
> *First attempt:*
>
> pawel_bartoszek@ip-10-4-X-X ~]$ yarn container -list
> appattempt_1538570922803_0020_01
> 18/10/08 10:16:16 INFO client.RMProxy: Connecting to ResourceManager at
> ip-10-4-X-X.eu-west-1.compute.internal/10.4.108.26:8032
> Total number of containers :1
>   Container-Id   Start Time  Finish Time
>  State HostNode Http Address
> LOG-URL
> container_1538570922803_0020_02_01 Mon Oct 08 09:47:17 + 2018
>  N/A  RUNNING
> ip-10-4-X-X.eu-west-1.compute.internal:8041
> http://ip-10-4-X-X.eu-west-1.compute.internal:8042
> http://ip-10-4-X-X.eu-west-1.compute.internal:8042/node/containerlogs/container_1538570922803_0020_02_01/pawel_bartoszek
>
> *Second attempt:*
> [pawel_bartoszek@ip-10-4-X-X ~]$ yarn container -list
> appattempt_1538570922803_0020_02
> 18/10/08 10:16:37 INFO client.RMProxy: Connecting to ResourceManager at
> ip-10-4-X-X.eu-west-1.compute.internal/10.4.X.X:8032
> Total number of containers :1
>   Container-Id   Start Time  Finish Time
>  State HostNode Http Address
> LOG-URL
> container_1538570922803_0020_02_01 Mon Oct 08 09:47:17 + 2018
>  N/A  RUNNING
> ip-10-4-X-X.eu-west-1.compute.internal:8041
> http://ip-10-4-X-X.eu-west-1.compute.internal:8042
> http://ip-10-4-X-X.eu-west-1.compute.internal:8042/node/containerlogs/container_1538570922803_0020_02_01/pawel_bartoszek
>


Re: Utilising EMR's master node

2018-10-06 Thread Gary Yao
Hi Averell,

It is up to the YARN scheduler on which hosts the containers are started.

What Flink version are you using? I assume you are using 1.4 or earlier
because you are specifying a fixed number of TMs. If you launch Flink with
-yn 2, you should only be seeing 2 TMs in total (not 4). Are you starting two
clusters?

Beginning with Flink 1.5, -yn is obsolete because resources are acquired
dynamically, and it is not well-defined in what order TM slots are exhausted
[1].

Best,
Gary

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-td23364.html

On Wed, Sep 26, 2018 at 9:25 AM Averell  wrote:

> Thank you Gary.
> Regarding your previous suggestion to change the configuration for the
> number of vcores on the EMR master node, I tried it and found one
> funny/bad behaviour, as follows:
>  * hardware configuration: master node: 4 vcores + 8GB RAM, 2x executors with
> 16 vcores + 32GB RAM each.
>  * Flink launch parameters: -yn 2 -ys 16 -ytm 4g...
> 4 TMs were created, with 2 of them used (0 free slots) and two others
> not used (16 free slots). The bad thing is that most of the time the 2 free
> TMs are on the same machine, and the two occupied ones are on the other machine.
> If I don't change the Hadoop configurations, then 4 TMs are still created, but
> the occupied ones are always on two different servers.
>
> I'm not sure whether that's EMR's issue, or YARN's or Flink's.
>
> Thanks and regards,
> Averell
>
>
>
>


Re: Flink 1.5.2 - excessive amount of container requests, Received new/Returning excess container "flood"

2018-10-06 Thread Gary Yao
Hi Borys,

To debug how many containers Flink is requesting, you can look out for the log
statement below [1]:

Requesting new TaskExecutor container with resources [...]
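
For instance, to count how many containers were actually requested in the
JM log:

grep -c "Requesting new TaskExecutor" jobmanager.log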

If you need help debugging, can you attach the full JM logs (preferably on
DEBUG level)? Would it be possible for you to test against 1.5.3 and 1.5.4?
However, I am not aware of any related issues that were fixed for 1.5.3 or
1.5.4. What is the Hadoop distribution that you are using?

Best,
Gary

[1]
https://github.com/apache/flink/blob/release-1.5.2/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java#L454

On Wed, Oct 3, 2018 at 11:36 AM Borys Gogulski 
wrote:

> Hey,
>
>
>
> We’re running Flink 1.5.2 (I know there’s 1.5.4 and 1.6.1) on YARN for
> some jobs we’re processing. It’s a “long running” container to which we’re
> submitting jobs – all jobs submitted to that container have got parallelism
> of 32 (to be precise: in this job there are 8 subtasks with parallelism 32
> and one subtask with parallelism 1), we’re running max 8 of them. TMs are
> set to have one slot only and 6GB RAM each.
> On the beginning, when using Flink 1.5.0 and 1.5.1 with the “on-demand”
> resources policy we were noticing that more containers than it’s required
> are spawned but with Flink 1.5.2 it “stabilized” – there were obviously
> some containers kept for some time after job finished (and no additional
> job was submitted to take those resources) but overhead wasn’t big so we
> were “all good”.
> And here’s the plot twist.
> For couple days now we’re witnessing situations in which spawning one job
> makes Flink request couple hundreds of TMs. Additionally in JM’s logs we
> can find dozens of lines like:
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594295 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594295.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594300 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594300.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594303 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594303.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594304 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594304.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594334 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594334.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594337 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594337.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594152 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594152.
>
> 2018-10-03 11:08:27,186 INFO
> org.apache.flink.yarn.YarnResourceManager - Received
> new container: container_e96_1538374332137_0793_01_594410 - Remaining
> pending container requests: 0
>
> 2018-10-03 11:08:27,187 INFO
> org.apache.flink.yarn.YarnResourceManager - Returning
> excess container container_e96_1538374332137_0793_01_594410.
>
> Only change made last week seems to be adding 5 new nodes to YARN Cluster.
> Any ideas why it’s requesting so many containers? Any ideas why there’s
> this “Received/Returning” flood? Right now one job was started 

Re: Utilising EMR's master node

2018-09-26 Thread Gary Yao
Hi Averell,

There is no general answer to your question. If you are running more TMs, you
get better isolation between different Flink jobs because one TM is backed by
one JVM [1]. However, every TM brings additional overhead (heartbeating,
running more threads, etc.) [1]. It also depends on the maximum heap memory
requirements of your operators. Data skew and non-parallel operators can
cause uneven memory requirements.

For capacity planning, you can also take a look at Robert Metzger's slides
[2].

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html#task-slots-and-resources
[2]
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-robert-metzger-keep-it-going-how-to-reliably-and-efficiently-operate-apache-flink

On Thu, Sep 20, 2018 at 1:13 AM Averell  wrote:

> Hi Gary,
> Thanks for your help.
>
> Regarding TM configurations, in terms of performance, when my 2 servers have
> 16 vcores each, should I have 2 TMs with 16GB mem, 16 task slots each, or 8
> TMs with 4GB mem and 4 task slots each?
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: "405 HTTP method POST is not supported by this URL" is returned when POST a REST url on Flink on yarn

2018-09-26 Thread Gary Yao
Hi Henry,

The URL below looks like the one from the YARN proxy (note that "proxy"
appears in the URL):


http://storage4.test.lan:8089/proxy/application_1532081306755_0124/jobs/269608367e0f548f30d98aa4efa2211e/savepoints

You can use

yarn application -status <application-id>

to find the host and port of the application master (AM host & RPC Port).
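
For example, a rough sketch using your application id (the AM host and port in
the curl call are placeholders you have to take from the status report):

    yarn application -status application_1532081306755_0124
    # suppose the report shows "AM Host : <am-host>" and "RPC Port : <am-port>";
    # the savepoint can then be triggered directly against the AM:
    curl -X POST "http://<am-host>:<am-port>/jobs/269608367e0f548f30d98aa4efa2211e/savepoints" \
         -H "Content-Type: application/json" \
         -d '{ "target-directory" : "hdfs://flinkDsl/xxx", "cancel-job" : "true" }'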

Best,
Gary

On Wed, Sep 26, 2018 at 3:23 AM 徐涛  wrote:

> Hi Till,
> Actually I do send to request to the application master:
> "
> http://storage4.test.lan:8089/proxy/application_1532081306755_0124/jobs/269608367e0f548f30d98aa4efa2211e/savepoints
> ”
> with post body
> { "target-directory" : "hdfs://flinkDsl/xxx", "cancel-job" : "true” }
> If I use the following GET url, everything is OK
> "
> http://storage4.test.lan:8089/proxy/application_1532081306755_0124/jobs/269608367e0f548f30d98aa4efa2211e/checkpoints
> "
>
> Best
> Henry
>
>
> 在 2018年9月26日,上午5:32,Till Rohrmann  写道:
>
> Hi Henry,
>
> I think when running Flink on Yarn, then you must not go through the Yarn
> proxy. Instead you should directly send the post request to the node on
> which the application master runs. When starting a Flink Yarn session via
> yarn-session.sh, then the web interface URL is printed to stdout, for
> example.
>
> Cheers,
> Till
>
> On Tue, Sep 25, 2018 at 9:43 AM 徐涛  wrote:
>
>> Hi All,
>> I am trying to POST a RESTful url and want to generate a savepoint, the
>> Flink version is 1.6.0.
>> When I executed the POST in local, everything is OK, but when I POST the
>> url on a Flink on YARN application. The following error is returned:
>> “405 HTTP method POST is not supported by this URL”, I guess it is caused
>> by YARN limitation. (
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/REST-API-quot-broken-quot-on-YARN-because-POST-is-not-allowed-via-YARN-proxy-td19329.html
>> )
>> But does it have a workaround now?
>>
>> Best
>> Henry
>>
>
>


Re: Utilising EMR's master node

2018-09-18 Thread Gary Yao
Hi Averell,

Flink compares the number of user-selected vcores to the vcores configured in
the yarn-site.xml of the submitting node, i.e., in your case the master node.
If there are not enough configured vcores, the client throws an exception.
This behavior is not ideal and I found an old JIRA ticket for it [1]. We could
either remove this check, or – as the original ticket suggests – reuse the
logic from "yarn-session.sh -q" to determine if there is enough capacity in
the cluster.

As a workaround, you can set in the yarn-site.xml

yarn.nodemanager.resource.cpu-vcores

to 16, or alternatively run multiple smaller TaskManagers on each node [2].
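
For illustration only (treat this as a sketch, not a recommendation):

    # option 1: on the node you submit from (the master), set
    #   yarn.nodemanager.resource.cpu-vcores to 16 in /etc/hadoop/conf/yarn-site.xml;
    #   only the client-side check reads this value there
    # option 2: cap the vcores Flink requests per container, independent of the
    #   slot count, as the error message itself suggests
    echo 'yarn.containers.vcores: 4' >> conf/flink-conf.yaml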

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-5542
[2]
https://github.com/apache/flink/blob/09abba37c7d760236c2ba002fa4a3aac11c2641b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L288

On Tue, Sep 18, 2018 at 4:43 AM, Averell  wrote:

> Thank you Gary.
>
> Regarding the option to use a smaller server for the master node, when
> starting a flink job, I would get an error like the following;
>
> /Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> *The number of virtual cores per node were configured with 16 but Yarn only
> has 4 virtual cores available*. Please note that the number of virtual
> cores
> is set to the number of task slots by default unless configured in the
> Flink
> config with 'yarn.containers.vcores.'/
>
> To get around that error, I need to start the job from one of the core
> nodes. Should that be the expected behaviour?
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Utilising EMR's master node

2018-09-17 Thread Gary Yao
Hi Averell,

According to the AWS documentation [1], the master node only runs the YARN
ResourceManager and the HDFS NameNode. Containers can only be launched on
nodes that are running the YARN NodeManager [2]. Therefore, if you want TMs or
JMs to be launched on your EMR master node, you have to start the NodeManager
process there, but I do not know how well this is supported by AWS EMR.

You can choose a smaller server for the master node but keep in mind that
it is
running the HDFS NameNode as well. The hardware requirements will therefore
partially depend on the HDFS workload.

Best,
Gary

[1]
https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-instances.html
[2]
https://hadoop.apache.org/docs/r2.8.0/hadoop-yarn/hadoop-yarn-site/NodeManager.html

On Mon, Sep 17, 2018 at 5:22 AM, Averell  wrote:

> Hello everyone,
>
> I'm trying to run Flink on AWS EMR following the guides from  Flink doc
>  master/ops/deployment/yarn_setup.html#run-a-single-flink-
> job-on-hadoop-yarn>
> and from  AWS
> 
>
> , and it looks like the EMR master is never used, neither for JM nor TM.
> "bin/yarn-session.sh -q" only shows the core nodes. We are only running
> Flink on that EMR, so it is wasting of resources.
>
> So, is there any way to use the master node for the job, at least for the
> JM
> only?
>
> If that is not possible, should I have different hardware configurations
> between the master node and core nodes (smaller server for the master)?
>
> Thanks and best regards,
> Averell
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Create a file in parquet format

2018-09-11 Thread Gary Yao
Hi Jose,

You can find an example here:


https://github.com/apache/flink/blob/1a94c2094b8045a717a92e232f9891b23120e0f2/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java#L58

Best,
Gary

On Tue, Sep 11, 2018 at 11:59 AM, jose farfan  wrote:

> Hi
>
> I am working in a task. The purpose is to create a sink in Parquet format.
> Then, I am using the "Streaming Flink Sink", but I cannot complete the
> task.
>
> Do you know any example in github, blog, that I can use to complete the
> task.
>
> Many Thx
> BR
> Jose
>


Re: How to get Cluster metrics in FLIP-6 mode

2018-09-11 Thread Gary Yao
Hi Tony,

You are right that these metrics are missing. There is already a ticket for
that [1]. At the moment you can obtain this information from the REST API
(/overview) [2].
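
For example, a minimal sketch, assuming the REST endpoint of your standalone
cluster listens on the default port 8081:

    curl -s http://localhost:8081/overview
    # abridged, illustrative response:
    # {"taskmanagers":2,"slots-total":32,"slots-available":30,"jobs-running":1,...}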

Since FLIP-6, the JM is no longer responsible for these metrics but for
backwards compatibility we can leave them in the JM scope for now.

Best,
Gary


[1] https://issues.apache.org/jira/browse/FLINK-10135
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/rest_api.html#available-requests

On Tue, Sep 11, 2018 at 12:19 PM, Tony Wei  wrote:

> Hi,
>
> I found that these metrics[1] disappeared in my JM's prometheus reporter
> when I used FLIP-6 to
>  deploy standalone cluster. (flink 1.5.3 release)
>
> Cluster metrics (scope: JobManager, type: Gauge):
>   - numRegisteredTaskManagers: The number of registered taskmanagers.
>   - numRunningJobs: The number of running jobs.
>   - taskSlotsAvailable: The number of available task slots.
>   - taskSlotsTotal: The total number of task slots.
>
> I guessed maybe the JM is no longer responsible for these metrics, but I
> still need these metrics on my dashboard. Does anyone know how to let my
> metric reporter get these metrics? Or did I miss something?
> Thank you.
>
> Best Regards,
> Tony Wei
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/monitoring/metrics.html#cluster
>
>


Re: JAXB Classloading errors when using PMML Library (AWS EMR Flink 1.4.2)

2018-09-11 Thread Gary Yao
Hi,

Do you also have pmml-model-moxy as a dependency in your job? Using mvn
dependency:tree, I do not see that pmml-evaluator has a compile time
dependency on jaxb-api. The jaxb-api dependency actually comes from
pmml-model-moxy. The exclusion should therefore be defined on pmml-model-moxy.

You can also try "parent-first" ClassLoader resolution order [1].
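
For example (a sketch only; this changes class loading for all jobs on the
cluster and requires a restart):

    echo 'classloader.resolve-order: parent-first' >> conf/flink-conf.yaml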

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html#inverted-class-loading-and-classloader-resolution-order


On Tue, Sep 4, 2018 at 3:24 AM, Sameer W  wrote:

> Hi,
>
> I am using PMML dependency as below to execute ML models at prediction
> time within a Flink Map operator
>
> <dependency>
>   <groupId>org.jpmml</groupId>
>   <artifactId>pmml-evaluator</artifactId>
>   <version>1.4.3</version>
>   <exclusions>
>     <exclusion>
>       <groupId>javax.xml.bind</groupId>
>       <artifactId>jaxb-api</artifactId>
>     </exclusion>
>     <exclusion>
>       <groupId>org.glassfish.jaxb</groupId>
>       <artifactId>jaxb-runtime</artifactId>
>     </exclusion>
>     <exclusion>
>       <groupId>com.google.guava</groupId>
>       <artifactId>guava</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
>
> Environment is EMR, OpenJDK 1.8 and Flink 1.4.2. My programs run fine in
> my Eclipse Development environment. However when we deploy on the cluster
> we get Classloading exceptions which are primarily due to the PMML classes
> loaded via the Flink Classloader while the JAXB classes are loaded by the
> boot classloader. Also the problem seems like the version of the jaxb
> classes referenced within the PMML library is different from the ones
> loaded by the open JDK.
>
> For example I keep getting this type of error. I have also listed another
> error after this which is linked to not being able to use reflection and
> unsafe library to set private instances within the PMML class instance
> using JAXB Unmarshaller.  -
> java.lang.LinkageError: loader constraint violation: when resolving
> interface method "javax.xml.bind.Unmarshaller.unmarshal(Ljavax/xml/
> transform/Source;)Ljava/lang/Object;" the class loader (instance of
> org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
> of the current class, 
> com/comcast/mlarche/featurecreationflows/xreerrors/MyJAXBUtil,
> and the class loader (instance of ) for the method's defining
> class, javax/xml/bind/Unmarshaller, have different Class objects for the
> type javax/xml/transform/Source used in the signature
> at com.comcast.mlarche.featurecreationflows.xreerrors.MyJAXBUtil.
> unmarshal(MyJAXBUtil.java:52)
> at com.comcast.mlarche.featurecreationflows.xreerrors.MyJAXBUtil.
> unmarshalPMML(MyJAXBUtil.java:38)
> at com.comcast.mlarche.featurecreationflows.
> xreerrors.PMMLModelExecution.getMiningModelEvaluator(
> PMMLModelExecution.java:67)
> at com.comcast.mlarche.featurecreationflows.
> xreerrors.PMMLModelExecution.predict(PMMLModelExecution.java:126)
> at com.comcast.mlarche.featurecreationflows.xreerrors.
> XreErrorModelsPredictionServiceService.predict(
> XreErrorModelsPredictionServiceService.java:61)
> at com.comcast.mlarche.featurecreationflows.xreerrors.
> XreErrorModelsPredictionServiceService.predictSystemRefresh(
> XreErrorModelsPredictionServiceService.java:44)
> at com.comcast.mlarche.featurecreationflows.xreerrors.
> XREErrorsModelExecutionMap.flatMap(XREErrorsModelExecutionMap.java:46)
> at com.comcast.mlarche.featurecreationflows.xreerrors.
> XREErrorsModelExecutionMap.flatMap(XREErrorsModelExecutionMap.java:17)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.
> processElement(StreamFlatMap.java:50)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:524)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:504)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:830)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:808)
> at org.apache.flink.streaming.api.operators.
> TimestampedCollector.collect(TimestampedCollector.java:51)
> at com.comcast.mlarche.featurecreationflows.xreerrors.
> XreErrorsModelsApplyFunction.apply(XreErrorsModelsApplyFunction.java:65)
> at com.comcast.mlarche.featurecreationflows.xreerrors.
> XreErrorsModelsApplyFunction.apply(XreErrorsModelsApplyFunction.java:20)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableWindowFunction.process(InternalIterableWindowFunction
> .java:44)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableWindowFunction.process(InternalIterableWindowFunction
> .java:32)
> at org.apache.flink.streaming.runtime.operators.windowing.
> EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:357)
> at org.apache.flink.streaming.runtime.operators.windowing.
> 

Re: Question about akka configuration for FLIP-6

2018-09-10 Thread Gary Yao
Hi Tison,

These can still be set, but judging from the documentation [1], they have
never been very relevant in Flink:

Heartbeat interval for Akka's transport failure detector. Since Flink
uses
TCP, the detector is not necessary. Therefore, the detector is disabled
by
setting the interval to a very high value. In case you should need the
transport failure detector, set the interval to some reasonable value.
The
interval value requires a time-unit specifier (ms/s/min/h/d).

If you have a good reason to use the transport failure detector, I would be
interested in it.

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#akka-transport-heartbeat-interval

On Mon, Sep 10, 2018 at 8:20 AM, 陈梓立  wrote:

> Hi Gray,
>
> Thanks for your useful information! Here I wonder if the following configs
> are still valid in FLIP-6 mode.
>
> 1. akka.transport.heartbeat.interval
> 2. akka.transport.heartbeat.pause
>
> It seems they are different from HeartbeatServices and possibly still
> valid.
>
> Best,
> tison.
>
>
>> On Mon, Sep 10, 2018 at 1:50 PM, Gary Yao  wrote:
>
>> I should add that in FLIP-6 mode we are not relying on Akka's DeathWatch
>> but
>> because Flink's RPC framework uses Akka, you are still able to configure
>> the
>> other Akka config options [1].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-
>> release-1.5/ops/config.html#distributed-coordination-via-akka
>>
>> On Mon, Sep 10, 2018 at 7:38 AM, Gary Yao  wrote:
>>
>>> Hi Tony,
>>>
>>> You are right that with FLIP-6 Akka is abstracted away. If you want
>>> custom
>>> heartbeat settings, you can configure the options below [1]:
>>>
>>> heartbeat.interval
>>> heartbeat.timeout
>>>
>>> The config option taskmanager.exit-on-fatal-akka-error is also not
>>> relevant
>>> anymore. The closest I can think of is taskmanager.registration.timeout
>>> [2].
>>>
>>> Best,
>>> Gary
>>>
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-
>>> release-1.5/ops/config.html#heartbeat-manager
>>> [2] https://ci.apache.org/projects/flink/flink-docs-
>>> release-1.5/ops/config.html#taskmanager-registration-timeout
>>>
>>> On Mon, Sep 10, 2018 at 4:24 AM, Tony Wei 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm going to migrate my flink cluster from 1.4.0 to 1.5.3, and I have
>>>> been trying to map config file
>>>> to the latest version. I used to use these three configuration. Are
>>>> they still needed in FLIP-6 mode?
>>>> Moreover, is any akka config still needed in FLIP-6 mode? Since I had an
>>>> impression that FLIP-6
>>>> tried to get rid of akka and use its own rpc interface. Please correct
>>>> me if I misunderstood. Thanks.
>>>>
>>>> akka.watch.heartbeat.interval
>>>> akka.watch.heartbeat.pause
>>>> taskmanager.exit-on-fatal-akka-error
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>
>>>
>>


Re: Question about akka configuration for FLIP-6

2018-09-09 Thread Gary Yao
I should add that in FLIP-6 mode we are not relying on Akka's DeathWatch but
because Flink's RPC framework uses Akka, you are still able to configure the
other Akka config options [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#distributed-coordination-via-akka

On Mon, Sep 10, 2018 at 7:38 AM, Gary Yao  wrote:

> Hi Tony,
>
> You are right that with FLIP-6 Akka is abstracted away. If you want custom
> heartbeat settings, you can configure the options below [1]:
>
> heartbeat.interval
> heartbeat.timeout
>
> The config option taskmanager.exit-on-fatal-akka-error is also not
> relevant
> anymore. The closest I can think of is taskmanager.registration.timeout [2].
>
> Best,
> Gary
>
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/ops/config.html#heartbeat-manager
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/ops/config.html#taskmanager-registration-timeout
>
> On Mon, Sep 10, 2018 at 4:24 AM, Tony Wei  wrote:
>
>> Hi,
>>
>> I'm going to migrate my flink cluster from 1.4.0 to 1.5.3, and I have
>> been trying to map config file
>> to the latest version. I used to use these three configuration. Are they
>> still needed in FLIP-6 mode?
>> Moreover, is any akka config still needed in FLIP-6 mode? Since I had an
>> impression that FLIP-6
>> tried to get rid of akka and use its own rpc interface. Please correct me
>> if I misunderstood. Thanks.
>>
>> akka.watch.heartbeat.interval
>> akka.watch.heartbeat.pause
>> taskmanager.exit-on-fatal-akka-error
>>
>> Best Regards,
>> Tony Wei
>>
>
>


Re: Question about akka configuration for FLIP-6

2018-09-09 Thread Gary Yao
Hi Tony,

You are right that with FLIP-6 Akka is abstracted away. If you want custom
heartbeat settings, you can configure the options below [1]:

heartbeat.interval
heartbeat.timeout
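
For example, a minimal sketch (the values are only illustrative and are in
milliseconds; both keys go into conf/flink-conf.yaml):

    printf 'heartbeat.interval: 10000\nheartbeat.timeout: 120000\n' >> conf/flink-conf.yaml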

The config option taskmanager.exit-on-fatal-akka-error is also not relevant
anymore. The closest I can think of is taskmanager.registration.timeout [2].

Best,
Gary


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#heartbeat-manager
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#taskmanager-registration-timeout

On Mon, Sep 10, 2018 at 4:24 AM, Tony Wei  wrote:

> Hi,
>
> I'm going to migrate my flink cluster from 1.4.0 to 1.5.3, and I have been
> trying to map config file
> to the latest version. I used to use these three configuration. Are they
> still needed in FLIP-6 mode?
> Moreover, is any akka config still needed in FLIP-6 mode? Since I had an
> impression that FLIP-6
> tried to get rid of akka and use its own rpc interface. Please correct me
> if I misunderstood. Thanks.
>
> akka.watch.heartbeat.interval
> akka.watch.heartbeat.pause
> taskmanager.exit-on-fatal-akka-error
>
> Best Regards,
> Tony Wei
>


Re: Cancel flink job occur exception

2018-09-08 Thread Gary Yao
Hi all,

The question is being handled on the dev mailing list [1].

Best,
Gary

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Cancel-flink-job-occur-exception-td24056.html

On Tue, Sep 4, 2018 at 2:21 PM, rileyli(李瑞亮)  wrote:

> Hi all,
> I submit a Flink job in yarn-cluster mode and cancel the job with the
> savepoint option immediately after the job status changes to deployed.
> Sometimes I get this error:
>
> org.apache.flink.util.FlinkException: Could not cancel job .
> at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(
> CliFrontend.java:585)
> at org.apache.flink.client.cli.CliFrontend.runClusterAction(
> CliFrontend.java:960)
> at org.apache.flink.client.cli.CliFrontend.cancel(
> CliFrontend.java:577)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(
> CliFrontend.java:1034)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
> at java.util.concurrent.CompletableFuture.reportGet(
> CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(
> CompletableFuture.java:1895)
> at org.apache.flink.client.program.rest.RestClusterClient.
> cancelWithSavepoint(RestClusterClient.java:398)
> at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(
> CliFrontend.java:583)
> ... 6 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
> at org.apache.flink.runtime.concurrent.FutureUtils.lambda$
> retryOperationWithDelay$5(FutureUtils.java:213)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(
> CompletableFuture.java:760)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(
> CompletableFuture.java:736)
> ... 1 more
> Caused by: java.util.concurrent.CompletionException:
> java.net.ConnectException: Connect refuse: xxx/xxx.xxx.xxx.xxx:xxx
> at java.util.concurrent.CompletableFuture.encodeThrowable(
> CompletableFuture.java:292)
> at java.util.concurrent.CompletableFuture.completeThrowable(
> CompletableFuture.java:308)
> at java.util.concurrent.CompletableFuture.uniCompose(
> CompletableFuture.java:943)
> at java.util.concurrent.CompletableFuture$UniCompose.
> tryFire(CompletableFuture.java:926)
> ... 16 more
> Caused by: java.net.ConnectException: Connect refuse:
> xxx/xxx.xxx.xxx.xxx:xxx
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(
> SocketChannelImpl.java:717)
> at org.apache.flink.shaded.netty4.io.netty.channel.
> socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.
> AbstractNioChannel$AbstractNioUnsafe.finishConnect(
> AbstractNioChannel.java:281)
> ... 7 more
>
> I checked the jobmanager log and found no error. The savepoint is correctly
> saved in HDFS. The YARN application status changed to FINISHED and the
> FinalStatus changed to KILLED.
> I think this issue occurs because the RestClusterClient cannot find the
> jobmanager address after the JobManager (AM) has shut down.
> My Flink version is 1.5.3.
> Could anyone help me resolve this issue? Thanks!
>
> Best Regard!
>


Re: Setting Flink Monitoring API Port on YARN Cluster

2018-09-07 Thread Gary Yao
Hi Austin,

The config options rest.port, jobmanager.web.port, etc. are intentionally
ignored on YARN. The port should be chosen randomly to avoid conflicts with
other containers [1]. I do not see a way to set a fixed port at the moment,
but there is a related ticket for that [2]. The Flink CLI determines the
hostname and port from the YARN ApplicationReport [3][4] – you can do the
same.
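
For example, a rough sketch (the application id and host/port below are simply
the ones that appear in your log):

    yarn application -status application_1536250520330_0007
    # the report's "AM Host" and "RPC Port" are the REST host and port, e.g.
    # ip-10-2-3-25.ec2.internal:38683 in your log, so:
    curl http://ip-10-2-3-25.ec2.internal:38683/overview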

Best,
Gary

[1]
https://github.com/apache/flink/blob/d036417985d3e2b1ca63909007db9710e842abf4/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java#L103

[2] https://issues.apache.org/jira/browse/FLINK-5758

[3]
https://github.com/apache/flink/blob/d036417985d3e2b1ca63909007db9710e842abf4/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L387

[4]
https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/yarn/api/records/ApplicationReport.html#getRpcPort()

On Fri, Sep 7, 2018 at 12:33 AM, Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi everyone,
>
> I'm running a YARN session on a cluster with one master and one core and
> would like to use the Monitoring API programmatically to submit jobs. I
> have found that the configuration variables are read but ignored when
> starting the session - it seems to choose a random port each run.
>
> Here's a snippet from the startup logs:
>
> 2018-09-06 21:44:38,763 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: env.yarn.conf.dir,
> /etc/hadoop/conf
> 2018-09-06 21:44:38,764 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: env.hadoop.conf.dir,
> /etc/hadoop/conf
> 2018-09-06 21:44:38,765 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: rest.port, 44477
> 2018-09-06 21:44:38,765 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: jobmanager.web.port, 44477
> 2018-09-06 21:44:38,765 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   - Loading configuration property: high-availability.jobmanager.port,
> 44477
> 2018-09-06 21:44:38,775 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli
>- Found Yarn properties file under
> /tmp/.yarn-properties-hadoop.
> 2018-09-06 21:44:39,615 WARN  org.apache.hadoop.util.NativeCodeLoader
>- Unable to load native-hadoop library for your
> platform... using builtin-java classes where applicable
> 2018-09-06 21:44:39,799 INFO  
> org.apache.flink.runtime.security.modules.HadoopModule
>   - Hadoop user set to hadoop (auth:SIMPLE)
> 2018-09-06 21:44:40,045 INFO  org.apache.hadoop.yarn.client.RMProxy
>- Connecting to ResourceManager at
> ip-10-2-3-71.ec2.internal/10.2.3.71:8032
> 2018-09-06 21:44:40,312 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - Cluster specification: ClusterSpecification{masterMemoryMB=1024,
> taskManagerMemoryMB=4096, numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-09-06 21:44:43,564 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - Submitting application master application_1536250520330_0007
> 2018-09-06 21:44:43,802 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
>- Submitted application application_1536250520330_0007
> 2018-09-06 21:44:43,802 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - Waiting for the cluster to be allocated
> 2018-09-06 21:44:43,804 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - Deploying cluster, current state ACCEPTED
> 2018-09-06 21:44:48,326 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - YARN application has been deployed successfully.
> 2018-09-06 21:44:48,326 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor
>  - The Flink YARN client has been started in detached mode. In
> order to stop Flink on YARN, use the following command or a YARN web
> interface to stop it:
> yarn application -kill application_1536250520330_0007
> Please also note that the temporary files of the YARN session in the home
> directory will not be removed.
> 2018-09-06 21:44:48,821 INFO  org.apache.flink.runtime.rest.RestClient
>   - Rest client endpoint started.
> Flink JobManager is now running on ip-10-2-3-25.ec2.internal:38683 with
> leader id ----.
> JobManager Web Interface: http://ip-10-2-3-25.ec2.internal:38683
>
>
> I'm setting both the rest.port and jobmanager.web.port, but both are
> ignored. Has anyone seen this before?
>
> Thanks!
>


Re: flink list and flink run commands timeout

2018-09-05 Thread Gary Yao
Hi Jason,

From the stacktrace it seems that you are using the 1.4.0 client to list jobs
on a 1.5.x cluster. This will not work. You have to use the 1.5.x client.
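
For example (the install path is only a placeholder for wherever the matching
1.5.x distribution lives on the machine you run the CLI from):

    /opt/flink-1.5.3/bin/flink list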

Best,
Gary

On Wed, Sep 5, 2018 at 5:35 PM, Jason Kania  wrote:

> Hello,
>
> Thanks for the response. I had already tried setting the log level to
> debug in log4j-cli.properties, logback-console.xml, and 
> log4j-console.properties
> but no additional relevant information comes out. On the server, all that
> comes out are zookeeper ping responses:
>
> 2018-09-05 15:16:56,786 DEBUG org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for
> sessionid: 0x3659b60bcb50076 after 1ms
>
> The client log indicates only the following (but we are not using hadoop):
>
> 2018-09-05 15:19:53,339 WARN  org.apache.flink.client.cli.CliFrontend
>- Could not load CLI class org.apache.flink.yarn.cli.
> FlinkYarnSessionCli.
> java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(
> CliFrontend.java:1208)
> at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(
> CliFrontend.java:1164)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.
> java:1090)
> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.
> Configuration
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 5 more
>
>
> and
>
> 2018-09-05 15:19:53,881 ERROR org.apache.flink.shaded.
> curator.org.apache.curator.ConnectionState  - Authentication failed
>
>
> despite the zookeeper being configured as 'open' and latest logs showing
> data being read from zookeeper.
>
> 2018-09-05 15:19:54,274 DEBUG org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Reading reply
> sessionid:0x265a12437df0074, packet:: clientPath:null serverPath:null
> finished:false header:: 1,3  replyHeader:: 1,47244656277,0  request::
> '/flink,F  response:: s{47244656196,47244656196,
> 1536110417531,1536110417531,0,1,0,0,0,1,47244656197}
>
>
> Much like the basic log output, the detailed trace shows no additional
> information, just a gap after waiting for the response:
>
> 2018-09-05 15:19:54,313 INFO  org.apache.flink.client.cli.CliFrontend
>- Waiting for response...
> 2018-09-05 15:20:07,635 DEBUG org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for
> sessionid: 0x265a12437df0074 after 1ms
> 2018-09-05 15:20:20,976 DEBUG org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for
> sessionid: 0x265a12437df0074 after 1ms
> 2018-09-05 15:20:24,311 INFO  org.apache.flink.runtime.rest.RestClient
>   - Shutting down rest endpoint.
> 2018-09-05 15:20:24,317 INFO  org.apache.flink.runtime.rest.RestClient
>   - Rest endpoint shutdown complete.
> 2018-09-05 15:20:24,318 INFO  org.apache.flink.runtime.leaderretrieval.
> ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService
> /leader/rest_server_lock.
> 2018-09-05 15:20:24,320 INFO  org.apache.flink.runtime.leaderretrieval.
> ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService
> /leader/dispatcher_lock.
> 2018-09-05 15:20:24,320 DEBUG org.apache.flink.shaded.
> curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - Closing
> 2018-09-05 15:20:24,321 INFO  org.apache.flink.shaded.
> curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  -
> backgroundOperationsLoop exiting
> 2018-09-05 15:20:24,322 DEBUG org.apache.flink.shaded.
> curator.org.apache.curator.CuratorZookeeperClient  - Closing
> 2018-09-05 15:20:24,322 DEBUG org.apache.flink.shaded.
> curator.org.apache.curator.ConnectionState  - Closing
> 2018-09-05 15:20:24,323 DEBUG org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ZooKeeper  - Closing session:
> 0x265a12437df0074
> 2018-09-05 15:20:24,323 DEBUG org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Closing client for session:
> 0x265a12437df0074
> 2018-09-05 15:20:24,329 DEBUG org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Reading reply
> sessionid:0x265a12437df0074, packet:: clientPath:null serverPath:null
> finished:false header:: 11,-11  replyHeader:: 11,47244656278,0  request::
> null response:: null
> 2018-09-05 15:20:24,329 DEBUG org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ClientCnxn  - Disconnecting client for
> session: 0x265a12437df0074
> 2018-09-05 15:20:24,330 INFO  org.apache.flink.shaded.
> zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 

Re: AskTimeoutException when canceling job with savepoint on flink 1.6.0

2018-09-05 Thread Gary Yao
Hi Jelmer,

I saw that you have already found the JIRA issue tracking this problem [1],
but I will still answer on the mailing list for transparency.

The timeout for "cancel with savepoint" should be RpcUtils.INF_TIMEOUT.
Unfortunately Flink is currently not respecting this timeout. A pull request
is already available, and is expected to be merged within the next days [2].
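
For reference, this is about the client-side command below (job id and target
directory are placeholders):

    flink cancel -s hdfs:///flink/savepoints <job-id>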

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-10193
[2] https://github.com/apache/flink/pull/6601

On Thu, Sep 6, 2018 at 4:24 AM, vino yang  wrote:

> Hi Jelmer,
>
> Here's a similar question, and you can refer to the discussion options.[1]
>
> [1]: http://mail-archives.apache.org/mod_mbox/flink-user/
> 201808.mbox/%3CCAMJEyBa9zJX_huqTLxDCu87hpHRVRzXZoYJpQxzXDk
> q2h_k...@mail.gmail.com%3E
>
> Hi Till and Chesnay,
>
> Recently, several users have encountered this problem in the past month.
> Maybe the community should give priority to the stability of this part or
> list the guidelines in the official document FAQ?
>
> Thanks, vino.
>
> jelmer  于2018年9月5日周三 下午8:48写道:
>
>> I am trying to upgrade a job from flink 1.4.2 to 1.6.0
>>
>> When we do a deploy we cancel the job with a savepoint then deploy the
>> new version of the job from that savepoint. Because our jobs tend to have a
>> lot of state it often takes multiple minutes for our savepoints to
>> complete.
>>
>> On flink 1.4.2 we set *akka.client.timeout* to a high value to make sure
>> the request did not timeout
>>
>> However on flink 1.6.0 I get an *AskTimeoutException*  and increasing
>> *akka.client.timeout* only works if i apply it to the running flink
>> process.
>> Applying it to just the flink client does nothing.
>>
>> I am reluctant to configure this on the container itself because afaik it
>> applies to everything inside of flink's internal actor system not just to
>> creating savepoints.
>>
>> What is the correct way to use cancel with savepoint for jobs with lots
>> of state in flink 1.6.0 ?
>>
>> I Attached the error.
>>
>>
>>
>>


Re: Flink on Yarn, restart job will not destroy original task manager

2018-09-03 Thread Gary Yao
Hi James,

Local recovery is disabled by default. You do not need to configure anything
in addition.

Did you run into problems again or does it work now? If you are still
experiencing tasks spreading out, can you configure logging on DEBUG level and
share the jobmanager logs with us?

Best,
Gary

On Tue, Sep 4, 2018 at 5:42 AM, James (Jian Wu) [FDS Data Platform] <
james...@coupang.com> wrote:

> Hi Gary:
>
>
>
> From 1.5/1.6 document:
>
>
>
> Configuring task-local recovery
>
> Task-local recovery is *deactivated by default* and can be activated
> through Flink’s configuration with the key state.backend.local-recovery as
> specified in CheckpointingOptions.LOCAL_RECOVERY. The value for this
> setting can either be *true* to enable or *false*(default) to disable
> local recovery.
>
>
>
> By default, local recovery is deactive. In 1.5.0, I’ve not enable local
> recovery.
>
>
>
> So whether I need manual disable local recovery via flink.conf?
>
>
>
> Regards
>
>
>
> James
>
>
>
> *From: *"James (Jian Wu) [FDS Data Platform]" 
> *Date: *Monday, September 3, 2018 at 4:13 PM
> *To: *Gary Yao 
>
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Flink on Yarn, restart job will not destroy original task
> manager
>
>
>
> My Flink version is 1.5, I will rebuild new version flink
>
>
>
> Regards
>
>
>
> James
>
>
>
> *From: *Gary Yao 
> *Date: *Monday, September 3, 2018 at 3:57 PM
> *To: *"James (Jian Wu) [FDS Data Platform]" 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Flink on Yarn, restart job will not destroy original task
> manager
>
>
>
> Hi James,
>
> What version of Flink are you running? In 1.5.0, tasks can spread out due to
> changes that were introduced to support "local recovery". There is a
> mitigation in 1.5.1 that prevents tasks from spreading out, but local
> recovery must be disabled [2].
>
> Best,
> Gary
>
> [1] https://issues.apache.org/jira/browse/FLINK-9635
> [2] https://issues.apache.org/jira/browse/FLINK-9634
>
>
>
> On Mon, Sep 3, 2018 at 9:20 AM, James (Jian Wu) [FDS Data Platform] <
> james...@coupang.com> wrote:
>
> Hi:
>
>
>
>   I launch flink application on yarn with 5 task manager, every task
> manager has 5 slots with such script
>
>
>
> #!/bin/sh
>
> CLASSNAME=$1
>
> JARNAME=$2
>
> ARUGMENTS=$3
>
>
>
> export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"
>
> /usr/bin/flink run -m yarn-cluster --parallelism 15  -yn 5 -ys 3 -yjm 8192
> -ytm 8192  -ynm flink-order-detection -yD 
> env.java.opts.jobmanager='-Dmill.env.active=aws'
> -yD env.java.opts.taskmanager='-Dmill.env.active=aws'  -c $CLASSNAME   \
>
> $JARNAME $ARUGMENTS
>
>
>
>
>
> The original flink app occupy 5 containers and 15 vcores, run for 3+ days,
> one of task manage killed by yarn because of memory leak and job manager
> start new task managers. Currently my flink app running normally on yarn,
>  but occupy 10 containers, 28 vcores. (Application Master shows my flink
> job running for 75 hours, click into running job in flink web ui, it shows
> my job running for 28hours because of restart)
>
>
>
> In my opinion, job manager will attempt to start the failed task manager,
> and in the final app still use 5 containers and 15 vcores, why after
> restart job by yarn will occupy double resource.
>
>
>
> Any one can give me some suggestion?
>
>
>
> Regards
>
>
>
> James
>
>
>


Re: Flink on Yarn, restart job will not destroy original task manager

2018-09-03 Thread Gary Yao
Hi James,

What version of Flink are you running? In 1.5.0, tasks can spread out due to
changes that were introduced to support "local recovery". There is a
mitigation in 1.5.1 that prevents tasks from spreading out, but local recovery must be
disabled [2].

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-9635
[2] https://issues.apache.org/jira/browse/FLINK-9634
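
In case local recovery was enabled explicitly, a minimal sketch to switch it
off again (conf/flink-conf.yaml on all nodes, cluster restart required):

    echo 'state.backend.local-recovery: false' >> conf/flink-conf.yaml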

On Mon, Sep 3, 2018 at 9:20 AM, James (Jian Wu) [FDS Data Platform] <
james...@coupang.com> wrote:

> Hi:
>
>
>
>   I launch flink application on yarn with 5 task manager, every task
> manager has 5 slots with such script
>
>
>
> #!/bin/sh
>
> CLASSNAME=$1
>
> JARNAME=$2
>
> ARUGMENTS=$3
>
>
>
> export JVM_ARGS="${JVM_ARGS} -Dmill.env.active=aws"
>
> /usr/bin/flink run -m yarn-cluster --parallelism 15  -yn 5 -ys 3 -yjm 8192
> -ytm 8192  -ynm flink-order-detection -yD 
> env.java.opts.jobmanager='-Dmill.env.active=aws'
> -yD env.java.opts.taskmanager='-Dmill.env.active=aws'  -c $CLASSNAME   \
>
> $JARNAME $ARUGMENTS
>
>
>
>
>
> The original flink app occupy 5 containers and 15 vcores, run for 3+ days,
> one of task manage killed by yarn because of memory leak and job manager
> start new task managers. Currently my flink app running normally on yarn,
>  but occupy 10 containers, 28 vcores. (Application Master shows my flink
> job running for 75 hours, click into running job in flink web ui, it shows
> my job running for 28hours because of restart)
>
>
>
> In my opinion, job manager will attempt to start the failed task manager,
> and in the final app still use 5 containers and 15 vcores, why after
> restart job by yarn will occupy double resource.
>
>
>
> Any one can give me some suggestion?
>
>
>
> Regards
>
>
>
> James
>


Re: akka.ask.timeout setting not honored

2018-08-31 Thread Gary Yao
Hi Greg,

Unfortunately the environment information [1] is not logged. Can you set the
log level for all Flink packages to DEBUG?

Do you install Flink yourself on EMR, or do you use the pre-installed one?
Can you show us the command with which you start the cluster/submit the job?

I do not know if it is related but I found these warnings in your second
log file:

2018-08-31 19:14:32 WARN  org.apache.flink.configuration.Configuration
- Configuration cannot evaluate value 300s as a long integer number
2018-08-31 19:14:32 WARN  org.apache.flink.configuration.Configuration
- Configuration cannot evaluate value 300s as a long integer number

Best,
Gary

[1]
https://github.com/apache/flink/blob/9ae5009b6a82248bfae99dac088c1f6e285aa70f/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java#L281

On Fri, Aug 31, 2018 at 9:18 PM, Greg Finch  wrote:

> Well ... that didn't take long.  The next time I tried, I got the Akka
> timeout again.  Attached are the logs from the last attempt.  They're very
> similar to the other logs I sent.
>
> On Fri, Aug 31, 2018 at 2:04 PM Greg Finch  wrote:
>
>> Thanks Gary.  Attached is the jobmanager log.  You are correct that this
>> is running on YARN.  I changed web.timeout as you suggested - that seems to
>> be working the few times I tested it.  This problem comes and goes though -
>> sometimes it starts before it times out.  I'll keep the web.timeout setting
>> and reply again if the problem comes up again.  Thanks again for your quick
>> response!
>>
>> On Fri, Aug 31, 2018 at 1:38 PM Gary Yao  wrote:
>>
>>> Hi Greg,
>>>
>>> Can you describe the steps to reproduce the problem, or can you attach
>>> the
>>> full jobmanager logs? Because JobExecutionResultHandler appears in your
>>> log, I
>>> assume that you are starting a job cluster on YARN. Without seeing the
>>> complete logs, I cannot be sure what exactly happens. For now, you can
>>> try
>>> setting the config option web.timeout to a higher value.
>>>
>>> Best,
>>> Gary
>>>
>>> On Fri, Aug 31, 2018 at 8:01 PM, Greg Finch 
>>> wrote:
>>>
>>>> I'm having a problem with akka timeout when starting my cluster.  The
>>>> error is "Ask timed out after 1 ms.".  I have changed the
>>>> akka.ask.timeout config setting to be 30 ms, but it still times out and
>>>> fails after 10 seconds.  I confirmed that the config is properly set by
>>>> both checking the Job Manager configuration tab (it shows 30 ms) as
>>>> well logging the output of AkkaUtils.getTimeout(configuration) which
>>>> also shows 30ms.  It seems something is not honoring that configuration
>>>> value.
>>>>
>>>> I did find a different thread that discussed the fact that the
>>>> LocalStreamEnvironment will not honor this setting, but that is not my
>>>> case.  I am running on a cluster (AWS EMR) using the regular
>>>> StreamExecutionEnvironment.  This is Flink 1.5.2.
>>>>
>>>> Any ideas?
>>>>
>>>> ~
>>>>
>>>> 2018-08-31 17:37:55 INFO  
>>>> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl  - Received new 
>>>> token for : ip-10-213-139-66.ec2.internal:8041
>>>> 2018-08-31 17:37:55 INFO  
>>>> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl  - Received new 
>>>> token for : ip-10-213-136-25.ec2.internal:8041
>>>> 2018-08-31 17:38:34 ERROR 
>>>> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
>>>> Implementation error: Unhandled exception.
>>>> akka.pattern.AskTimeoutException: Ask timed out on 
>>>> [Actor[akka://flink/user/dispatcher#-219618710]] after [1 ms]. 
>>>> Sender[null] sent message of type 
>>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>>>at 
>>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>>>>at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>>>>at 
>>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>>>at 
>>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>>at 
>>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>>>at 
>>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevol

Re: akka.ask.timeout setting not honored

2018-08-31 Thread Gary Yao
Hi Greg,

Can you describe the steps to reproduce the problem, or can you attach the
full jobmanager logs? Because JobExecutionResultHandler appears in your
log, I
assume that you are starting a job cluster on YARN. Without seeing the
complete logs, I cannot be sure what exactly happens. For now, you can try
setting the config option web.timeout to a higher value.
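
For example (the value is only an illustration; web.timeout is specified in
milliseconds):

    echo 'web.timeout: 300000' >> conf/flink-conf.yaml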

Best,
Gary

On Fri, Aug 31, 2018 at 8:01 PM, Greg Finch  wrote:

> I'm having a problem with akka timeout when starting my cluster.  The
> error is "Ask timed out after 1 ms.".  I have changed the
> akka.ask.timeout config setting to be 30 ms, but it still times out and
> fails after 10 seconds.  I confirmed that the config is properly set by
> both checking the Job Manager configuration tab (it shows 30 ms) as
> well logging the output of AkkaUtils.getTimeout(configuration) which also
> shows 30ms.  It seems something is not honoring that configuration
> value.
>
> I did find a different thread that discussed the fact that the
> LocalStreamEnvironment will not honor this setting, but that is not my
> case.  I am running on a cluster (AWS EMR) using the regular
> StreamExecutionEnvironment.  This is Flink 1.5.2.
>
> Any ideas?
>
> ~
>
> 2018-08-31 17:37:55 INFO  
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl  - Received new token 
> for : ip-10-213-139-66.ec2.internal:8041
> 2018-08-31 17:37:55 INFO  
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl  - Received new token 
> for : ip-10-213-136-25.ec2.internal:8041
> 2018-08-31 17:38:34 ERROR 
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
> Implementation error: Unhandled exception.
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher#-219618710]] after [1 ms]. 
> Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>   at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>   at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>   at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>   at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>   at java.lang.Thread.run(Thread.java:748)
> 2018-08-31 17:38:41 INFO  
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl  - Waiting for 
> application to be successfully unregistered.
> 2018-08-31 17:38:41 INFO  
> o.a.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Interrupted 
> while waiting for queue
> java.lang.InterruptedException: null
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
>   at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>   at 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:323)
> 2018-08-31 17:38:42 WARN  akka.remote.ReliableDeliverySupervisor 
> flink-akka.remote.default-remote-dispatcher-81 - Association with remote 
> system [akka.tcp://flink@ip-10-213-142-102.ec2.internal:42027] has failed, 
> address is now gated for [50] ms. Reason: [Disassociated]
>
>
>


Re: getting error while running flink job on yarn cluster

2018-08-21 Thread Gary Yao
Hi Yubraj Singh,

Can you try submitting with HADOOP_CLASSPATH=`hadoop classpath` set? [1]
For example:
  HADOOP_CLASSPATH=`hadoop classpath` bin/flink run [...]

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/hadoop.html#configuring-flink-with-hadoop-classpaths

On Tue, Aug 21, 2018 at 4:23 PM, yuvraj singh <19yuvrajsing...@gmail.com>
wrote:

> Hi ,
>
> i am getting a error while running a flink job on yarn cluster , its
> running fine when i run it on flink standalone cluster
>
> java.lang.NoClassDefFoundError: com/sun/jersey/core/util/
> FeaturesAndProperties
>
> at java.lang.ClassLoader.defineClass1(Native Method)
>
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(
> TimelineClient.java:55)
>
> at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.
> createTimelineClient(YarnClientImpl.java:181)
>
> at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.
> serviceInit(YarnClientImpl.java:168)
>
> at org.apache.hadoop.service.AbstractService.init(
> AbstractService.java:163)
>
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(
> FlinkYarnSessionCli.java:966)
>
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(
> FlinkYarnSessionCli.java:269)
>
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(
> FlinkYarnSessionCli.java:444)
>
> at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(
> FlinkYarnSessionCli.java:92)
>
> at org.apache.flink.client.cli.CliFrontend.runProgram(
> CliFrontend.java:225)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>
> at org.apache.flink.client.cli.CliFrontend.parseParameters(
> CliFrontend.java:1025)
>
> at org.apache.flink.client.cli.CliFrontend.lambda$main$9(
> CliFrontend.java:1101)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1754)
>
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(
> HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
>
> Caused by: java.lang.ClassNotFoundException: com.sun.jersey.core.util.
> FeaturesAndProperties
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> ... 29 more
>
>
> please help me
>
>
> thanks
>
> Yubraj singh
>

