Re: Testing RichAsyncFunction with TestHarness

2020-03-30 Thread Gary Yao
>
> Additionally, even though I added all the necessary dependencies defined in
> [1], I cannot see the ProcessFunctionTestHarnesses class.
>

That class was added in Flink 1.10 [1].

[1]
https://github.com/apache/flink/blame/f765ad09ae2b2aa478c887b988e11e92a8b730bd/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ProcessFunctionTestHarnesses.java
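For reference, with 1.10 in place the helper can be used roughly like this (a
minimal sketch adapted from the testing docs; PassThroughProcessFunction is a
trivial stand-in for your own function):

import java.util.Collections;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.util.Collector;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

public class PassThroughProcessFunctionTest {

    // Trivial function under test: forwards every element unchanged.
    static class PassThroughProcessFunction extends ProcessFunction<Integer, Integer> {
        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
            out.collect(value);
        }
    }

    @Test
    public void testPassThrough() throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                ProcessFunctionTestHarnesses.forProcessFunction(new PassThroughProcessFunction());
        harness.open();
        harness.processElement(1, 10L); // value, timestamp
        assertEquals(Collections.singletonList(1), harness.extractOutputValues());
    }
}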

On Fri, Mar 27, 2020 at 10:13 PM KristoffSC 
wrote:

> Hi,
> I'm trying to test my RichAsyncFunction implementation with
> OneInputStreamOperatorTestHarness based on [1]. I'm using Flink 1.9.2.
>
> My test setup is:
>
> this.processFunction = new MyRichAsyncFunction();
> this.testHarness = new OneInputStreamOperatorTestHarness<>(
>         new AsyncWaitOperator<>(processFunction, 2000, 1, OutputMode.ORDERED));
>
> this.testHarness.open();
>
> I get the exception below when calling this.testHarness.open():
>
> java.lang.NullPointerException
>     at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:142)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:287)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:275)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:393)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:300)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:308)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:483)
>
>
> I would appreciate help with this one.
>
> Additionally, even though I added all the necessary dependencies defined in
> [1], I cannot see the ProcessFunctionTestHarnesses class.
>
> Thanks.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Fwd: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-30 Thread Tzu-Li (Gordon) Tai
It seems like Seth's reply didn't make it to the mailing lists somehow.
Forwarding his reply below:

-- Forwarded message -
From: Seth Wiesman 
Date: Thu, Mar 26, 2020 at 5:16 AM
Subject: Re: Lack of KeyedBroadcastStateBootstrapFunction
To: Dawid Wysakowicz 
Cc: , Tzu-Li (Gordon) Tai 


As Dawid mentioned, you can implement your own operator using the transform
method to do this yourself. Unfortunately, that is fairly low level and
would require you to understand some amount of Flink internals.

The real problem is that the state processor api does not support two input
operators. We originally skipped that because there were a number of open
questions about how best to do it and it wasn't clear that it would be a
necessary feature. Typically, flink users use two input operators to do
some sort of join. And when bootstrapping state, you typically only want to
pre-fill one side of that join. KeyedBroadcastState is clearly a good
counter-argument to that.

I've opened a ticket for the feature if you would like to comment there.

https://issues.apache.org/jira/browse/FLINK-16784

On Tue, Mar 24, 2020 at 9:17 AM Dawid Wysakowicz 
wrote:

> Hi,
>
> I am not very familiar with the State Processor API, but from a brief look
> at it, I think you are right. I think the State Processor API does not
> support mixing different kinds of states in a single operator for now. At
> least not in a nice way. Probably you could implement the
> KeyedBroadcastStateBootstrapFunction yourself and use it with
> KeyedOperatorTransformation#transform(org.apache.flink.state.api.SavepointWriterOperatorFactory).
> I understand this is probably not the easiest task.
>
> I am not aware if there are plans to support that out of the box, but I
> cc'ed Gordon and Seth who if I remember correctly worked on that API. I
> hope they might give you some more insights.
>
> Best,
>
> Dawid
>  On 23/03/2020 17:36, Mark Niehe wrote:
>
> Hey all,
>
> I have another question about the State Processor API. I can't seem to
> find a way to create a KeyedBroadcastStateBootstrapFunction operator. The
> two options currently available to bootstrap a savepoint with state are
> KeyedStateBootstrapFunction and BroadcastStateBootstrapFunction. Because
> these are the only two options, it's not possible to bootstrap both keyed
> and broadcast state for the same operator. Are there any plans to add that
> functionality or did I miss it entirely when going through the API docs?
>
> Thanks,
> --
> 
> Mark Niehe ·  Software Engineer
> Integrations
>   ·
> Blog   ·  
> We're
> Hiring! 
>
>


Re: Windows on SinkFunctions

2020-03-30 Thread Robert Metzger
Hey,

In your original email, you wrote:

> Because if I have multiple sinks and only for one of them I need a
> window, the second solution might be problematic.


You can also send the data of an operator to multiple sinks:

Source --> MyComputationProcessFunction --> DataBatcher --> BatchedSink
                                        \
                                         \--> StreamingSink

(also here if it renders weirdly in email:
https://gist.github.com/rmetzger/9ef311c2926a82fdba2b2b7af9ad65d6 )

In this example "MyComputationProcessFunction" is sending the data to two
downstream operators: DataBatcher and StreamingSink.
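Wired up, that could look roughly like this (a sketch; the operator and sink
names are the placeholders from the diagram, and DataBatcher is sketched
further below):

DataStream<String> computed = source.process(new MyComputationProcessFunction());

// Branch 1: batch the elements before writing them out.
computed
        .keyBy(v -> v)               // illustrative key
        .process(new DataBatcher())  // see the sketch below
        .addSink(new BatchedSink());

// Branch 2: write each element out as it arrives.
computed.addSink(new StreamingSink());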

For the "DataBatcher", you can also consider building something yourself
with ProcessFunction. I would just collect the data in a List, and emit it
once the list reached 500 elements, or a custom 5 minute trigger has
triggered.
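A minimal sketch of such a batching function (assuming a keyed stream of
String elements; all names are illustrative):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DataBatcher extends KeyedProcessFunction<String, String, List<String>> {

    private static final int MAX_BATCH_SIZE = 500;
    private static final long FLUSH_INTERVAL_MS = 5 * 60 * 1000L;

    private transient ListState<String> buffer;      // buffered elements per key
    private transient ValueState<Long> pendingTimer; // pending flush timer, if any

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
        pendingTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pendingTimer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<List<String>> out)
            throws Exception {
        buffer.add(value);
        if (pendingTimer.value() == null) {
            // Schedule a flush in case the batch never fills up.
            long timer = ctx.timerService().currentProcessingTime() + FLUSH_INTERVAL_MS;
            ctx.timerService().registerProcessingTimeTimer(timer);
            pendingTimer.update(timer);
        }
        // Note: a separate counter state would avoid materializing the list on
        // every element; kept simple here.
        List<String> batch = new ArrayList<>();
        buffer.get().forEach(batch::add);
        if (batch.size() >= MAX_BATCH_SIZE) {
            flush(batch, ctx, out);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<String>> out)
            throws Exception {
        List<String> batch = new ArrayList<>();
        buffer.get().forEach(batch::add);
        if (!batch.isEmpty()) {
            flush(batch, ctx, out);
        }
    }

    private void flush(List<String> batch, Context ctx, Collector<List<String>> out)
            throws Exception {
        out.collect(batch);
        buffer.clear();
        Long timer = pendingTimer.value();
        if (timer != null) {
            // Deleting an already-fired timer is harmless.
            ctx.timerService().deleteProcessingTimeTimer(timer);
        }
        pendingTimer.clear();
    }
}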


On Sun, Mar 29, 2020 at 12:07 PM Sidney Feiner 
wrote:

> Thanks!
> What am I supposed to put in the apply/process function for the sink to be
> invoked on a List of items?
>
> *Sidney Feiner* */* Data Platform Developer
> M: +972.528197720 */* Skype: sidney.feiner.startapp
>
> [image: emailsignature]
>
> --
> *From:* tison 
> *Sent:* Sunday, March 22, 2020 4:19 PM
> *To:* Sidney Feiner 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Windows on SinkFunctions
>
> Hi Sidney,
>
> For this case, you can simply write
>
> stream
>   ...
>   .window()
>   .apply()
>   .addSink()
>
> Operator chaining will fuse these operators into one, so you don't have
> to worry about the efficiency.
>
> Best,
> tison.
>
>
> On Sun, Mar 22, 2020 at 10:03 PM, Sidney Feiner  wrote:
>
> Hey,
> I wanted to know if it's possible to define a SinkFunction as a
> WindowFunction as well.
> For example, I would like the sink to be invoked every 5 minutes or once
> 500 events have reached the sink.
> Is there a way to do this inside the sink implementation? Or do I have to
> create the windows prior in the pipeline?
> Because if I have multiple sinks and only for one of them I need a
> window, the second solution might be problematic.
>
> Thanks :)
>
> *Sidney Feiner* */* Data Platform Developer
> M: +972.528197720 */* Skype: sidney.feiner.startapp
>
> [image: emailsignature]
>
>


Re: Issue with single job yarn flink cluster HA

2020-03-30 Thread Yang Wang
Hi Dinesh,

First, I think the error message you provided is not a problem. It just
indicates that the leader election is still ongoing. When it finishes, the new
leader will start a new dispatcher to provide the web UI and REST service.

From your jobmanager logs ("Connection refused: host1/ipaddress1:28681"), we
can tell that the old jobmanager has failed. When a new jobmanager starts, the
old jobmanager still holds the leader latch lock, so Flink tries to connect to
it. After a few retries, since the old jobmanager's ZooKeeper client does not
renew the leader latch, the new jobmanager is elected and becomes the active
leader. That is simply how the leader election works.

In a nutshell, the root cause is that the old jobmanager crashed but does not
lose leadership immediately. This is by-design behavior.

If you really want to make the recovery faster, I think you could decrease
"high-availability.zookeeper.client.connection-timeout" and
"high-availability.zookeeper.client.session-timeout". Please keep in mind that
too small a value will also cause unexpected failovers on network problems.
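For example, in flink-conf.yaml (the values here are only illustrative; if I
remember correctly, the defaults are 15000 ms and 60000 ms respectively):

# Smaller timeouts mean faster leader failover, but too-small values can
# cause spurious failovers on transient network problems.
high-availability.zookeeper.client.connection-timeout: 10000
high-availability.zookeeper.client.session-timeout: 30000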


Best,
Yang

On Wed, Mar 25, 2020 at 4:20 PM, Dinesh J  wrote:

> Hi Andrey,
> Yes. Sometimes the job does not restart after the current leader fails.
> Below is the message displayed when trying to reach the application master
> URL via the YARN UI; the message remains the same even after the YARN job
> has been running for 2 days.
> During this time, even the current YARN application attempt does not fail,
> and no containers are launched for the jobmanager and taskmanager.
>
> *{"errors":["Service temporarily unavailable due to an ongoing leader
> election. Please refresh."]}*
>
> Thanks,
> Dinesh
>
> On Tue, Mar 24, 2020 at 6:45 PM Andrey Zagrebin 
> wrote:
>
>> Hi Dinesh,
>>
>> If the current leader crashes (e.g. due to network failures), then these
>> messages do not look like a problem during the leader re-election.
>> They look to me like just warnings from whatever caused the failover.
>>
>> Do you observe any problem with your application? Does the failover not
>> work, e.g. no leader is elected or a job is not restarted after the current
>> leader failure?
>>
>> Best,
>> Andrey
>>
>> On Sun, Mar 22, 2020 at 11:14 AM Dinesh J  wrote:
>>
>>> Attaching the job manager log for reference.
>>>
>>> 2020-03-22 11:39:02,693 WARN  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@host1:28681/user/dispatcher.
>>> 2020-03-22 11:39:02,724 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
>>> 2020-03-22 11:39:02,724 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
>>> 2020-03-22 11:39:02,791 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
>>> 2020-03-22 11:39:02,792 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
>>> 2020-03-22 11:39:02,861 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
>>> 2020-03-22 11:39:02,861 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
>>> 2020-03-22 11:39:02,931 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
>>> 2020-03-22 11:39:02,931 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
>>> 2020-03-22 11:39:03,001 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddr

Re: Log file environment variable 'log.file' is not set.

2020-03-30 Thread Robert Metzger
Hey,
which Flink version are you using?

Where exactly are you seeing the "Log file environment variable 'log.file'
is not set." message? Can you post some context around it? (Is this shown
on the command line? What are the arguments? Is it shown in a file?)

Usually, the "log.file" property is used to pass the name of the log file
into the log4j configuration. If this property is not set, I have to assume
that you are using modified or custom scripts, or you are executing Flink
in an environment that fails to set the property.
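For reference, Flink's default conf/log4j.properties wires the property in
roughly like this (an excerpt from memory; it may differ across versions):

# 'log.file' is substituted into the file appender's target path:
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.layout=org.apache.log4j.PatternLayout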

When running Flink on YARN, the JobManager logs are stored on the machine
running the JobManager. The logs accessible through "yarn logs" are the
same as you would see in the JM interface.

Best,
Robert



On Sun, Mar 29, 2020 at 11:22 PM Vitaliy Semochkin 
wrote:

> Hello Yun,
>
> I see this error reported by:
> *org.apache.flink.runtime.webmonitor.WebMonitorUtils*  - *JobManager log
> files are unavailable in the web dashboard. Log file location not found in
> environment variable 'log.file' or configuration key 'Key: 'web.log.path' ,
> default: null (fallback keys: [{key=jobmanager.web.log.path,
> isDeprecated=true}])'.*
>
> I wonder where the JobManager log files are stored when running on a YARN
> cluster?
> Are these logs the same as those I get via yarn logs -applicationId?
>
> Regards,
> Vitaliy
>
>
>
> On Sun, Mar 29, 2020 at 8:24 PM Yun Tang  wrote:
>
>> Hi Vitaliy
>>
>> The 'log.file' property would be configured if you have uploaded
>> 'logback.xml' or 'log4j.properties' [1].
>> The file would contain the logs of the jobmanager or taskmanager, which is
>> decided by the component itself. And as you can see, this is only a local
>> file path; I am afraid it cannot understand HDFS paths.
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/ae3b0ff80b93a83a358ab474060473863d2c30d6/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java#L420
>>
>> Best
>> Yun Tang
>> --
>> *From:* Vitaliy Semochkin 
>> *Sent:* Sunday, March 29, 2020 4:32
>> *To:* user 
>> *Subject:* Log file environment variable 'log.file' is not set.
>>
>> Hi,
>>
>> When I launch a Flink application cluster, I keep getting the message
>> "Log file environment variable 'log.file' is not set."
>>
>> I use console logging via log4j
>> and I read logs via yarn logs -applicationId 
>>
>> What's the purpose of the log.file property?
>> What will this file contain, and on which host should I search for the log?
>> Does this property understand HDFS paths?
>>
>> Regards,
>> Vitaliy
>>
>


Re: End to End Latency Tracking in flink

2020-03-30 Thread Oscar Westra van Holthe - Kind
On Mon, 30 Mar 2020 at 05:08, Lu Niu  wrote:

> $current_processing - $event_time works for event time. How about
> processing time? Is there a good way to measure the latency?
>

To measure latency you'll need some way to determine the time spent between
the start and end of your pipeline.

To measure latency when using processing time, you'll need to partially use
ingestion time. That is, you'll need to add the 'current' processing time
as soon as messages are ingested.

With it, you can then use the $current_processing - $ingest_time solution
that was already mentioned.
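A minimal sketch of that idea (a fragment: env is assumed to be a
StreamExecutionEnvironment, and MySource/MySink are placeholder
implementations; anonymous classes keep the tuple type information):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// Stamp each record with the 'current' processing time at ingestion.
DataStream<Tuple2<String, Long>> stamped = env
        .addSource(new MySource())
        .map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) {
                return Tuple2.of(value, System.currentTimeMillis()); // $ingest_time
            }
        });

// ... the actual pipeline operates on the tuple's f0 field ...

// Just before the sink: $current_processing - $ingest_time.
stamped
        .map(new MapFunction<Tuple2<String, Long>, String>() {
            @Override
            public String map(Tuple2<String, Long> t) {
                long latencyMs = System.currentTimeMillis() - t.f1;
                // report latencyMs via a metric or log here
                return t.f0;
            }
        })
        .addSink(new MySink());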

Kind regards,
Oscar

-- 
Oscar Westra van Holthe - Kind


Re: Flink YARN app terminated before the client receives the result

2020-03-30 Thread Aljoscha Krettek
I think we have to take a step back here. For per-job (YARN) mode, the
general problem is that there are two systems that can do shutdown (and
other things) and two clients. There is YARN and there is Flink, and
Flink is, in a way, running inside YARN. The solution, I think, is that
cancellation for YARN mode should go through YARN, not through Flink.
Then there can be no races or other issues with the cluster shutting
down before it has a chance to send a response.


Btw, the same goes for "attached mode" where a client waits for job 
completion. IMO, this should also go through YARN and not the Flink REST 
client.


What do you think?

Best,
Aljoscha

On 20.03.20 15:15, Till Rohrmann wrote:

Yes you are right that `thenAcceptAsync` only breaks the control flow but
it does not guarantee that the `RestServer` has actually sent the response
to the client. Maybe we also need something similar to FLINK-10309 [1]. The
problem I see with this approach is that it makes all RestHandlers stateful.

[1] https://issues.apache.org/jira/browse/FLINK-10309

Cheers,
Till

On Fri, Mar 20, 2020 at 2:26 PM DONG, Weike  wrote:


Hi Tison & Till,

Changing *thenAccept* into *thenAcceptAsync* in the
MiniDispatcher#cancelJob does not help to solve the problem in my
environment. However, I have found that adding a *Thread.sleep(2000)* before
the return of JobCancellationHandler#handleRequest solved the problem (at
least the symptom goes away). As this is only a dirty hack, I will try to
get a more decent solution to this problem.

Sincerely,
Weike

On Tue, Mar 17, 2020 at 11:11 PM tison  wrote:


JIRA created as https://jira.apache.org/jira/browse/FLINK-16637

Best,
tison.


On Tue, Mar 17, 2020 at 5:57 PM, Till Rohrmann  wrote:


@Tison, could you create an issue to track the problem? Please also link
the uploaded log file for further debugging.

I think the reason why it worked in Flink 1.9 could have been that we
had an async callback in the longer chain which broke the flow of execution
and allowed the response to be sent. This is no longer the case. As an easy
fix one could change thenAccept into thenAcceptAsync in the
MiniDispatcher#cancelJob. But this only treats the symptoms. Maybe we should
think about allowing not only the StatusHandler to close asynchronously. At
the moment we say that all other handlers shut down immediately (see
AbstractHandler#closeHandlerAsync). But the problem with this change would
be that all handlers would become stateful because they would need to
remember whether a request is currently ongoing or not.

Cheers,
Till

On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike 
wrote:


Hi Tison & Till and all,

I have uploaded the client, taskmanager and jobmanager logs to Gist
(https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and I
can reproduce this bug every time when trying to cancel Flink 1.10 jobs on
YARN.

Besides, in earlier Flink versions like 1.9, the REST API for *cancelling a
job with a savepoint* sometimes threw exceptions to the client side due to
early shutdown of the server, even though the log showed the savepoint had
completed successfully. With the newly introduced *stop* API that bug
disappeared; however, the *cancel* API seems to be buggy now.

Best,
Weike

On Tue, Mar 17, 2020 at 10:17 AM tison  wrote:


edit: previously, after the cancellation we had a longer call chain to
#jobReachedGloballyTerminalState, which does the job archiving & graceful JM
shutdown, which might take some time, so that ...

Best,
tison.


On Tue, Mar 17, 2020 at 10:13 AM, tison  wrote:


Hi Weike & Till,

I agree with Till, and it is also my analysis. However, it seems that even
without FLINK-15116, it is still possible that we complete the cancel future
but the cluster gets shut down before it has properly delivered the response.

One strange thing is that this behavior is almost always reproducible; it
should be just one possible ordering, not the only one. Maybe previously we
first had to cancel the job, which has a long call chain, so that we happened
to have enough time to deliver the response.

But the resolution looks like introducing some synchronization/finalization
logic that clears these outstanding futures with best effort before the
cluster (RestServer) goes down.

Best,
tison.


On Tue, Mar 17, 2020 at 4:12 AM, Till Rohrmann  wrote:


Hi Weike,

could you share the complete logs with us? Attachments are being filtered
out by the Apache mail server, but it works if you upload the logs somewhere
(e.g. https://gist.github.com/) and then share the link with us. Ideally you
run the cluster with DEBUG log settings.

I assume that you are running Flink 1.10, right?

My suspicion is that this behaviour has been introduced with FLINK-15116
[1]. It looks as if we complete the shutdown future in
MiniDispatcher#cancelJob before we return the response to the
RestClusterClient. My guess is that this triggers the shutdown of the
RestServer which then is not able to serve the response to the client. I'm
pulling in Aljoscha and Tison 

flink 1.10 support LONG as watermark?

2020-03-30 Thread jingjing bai
Hi flinkers!

I am trying to upgrade our production deployment from 1.9, which is our
current version, to 1.10.
In our case the event_time is a Long, and in our internal version we had
implemented support for a Long type as a watermark, which differs from the
official 1.10 version.
In 1.10, Flink added its own watermark definition, so I dropped our
implementation, and now I run into this problem.

What should I do in the new version, so that, if possible, I don't have to
migrate our internal implementation to the new version?


Re: End to End Latency Tracking in flink

2020-03-30 Thread Guanghui Zhang
Hi,
At the Flink source connector, you can emit a $source_current_time -
$event_time metric.
In the meantime, at the Flink sink connector, you can emit a
$sink_current_time - $event_time metric.
Then ($sink_current_time - $event_time) - ($source_current_time -
$event_time) = $sink_current_time - $source_current_time gives you the
end-to-end latency.

On Mon, Mar 30, 2020 at 5:15 PM, Oscar Westra van Holthe - Kind  wrote:

> On Mon, 30 Mar 2020 at 05:08, Lu Niu  wrote:
>
>> $current_processing - $event_time works for event time. How about
>> processing time? Is there a good way to measure the latency?
>>
>
> To measure latency you'll need some way to determine the time spent
> between the start and end of your pipeline.
>
> To measure latency when using processing time, you'll need to partially
> use ingestion time. That is, you'll need to add the 'current' processing
> time as soon as messages are ingested.
>
> With it, you can then use the $current_processing - $ingest_time solution
> that was already mentioned.
>
> Kind regards,
> Oscar
>
> --
> Oscar Westra van Holthe - Kind
>


Re: flink 1.10 support LONG as watermark?

2020-03-30 Thread Jark Wu
Hi Jingjing,

The event time field must be of TIMESTAMP(3) type. You can convert your Long
value into TIMESTAMP(3) using a user-defined function.
I'm sorry that Flink doesn't provide a built-in function for this purpose
yet, but it will have one soon.

For example:
CREATE TABLE myTable (
 log_ts bigint,
 event_time AS my_func(log_ts),
 WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND
) WITH (
 ...
);

Here my_func is a UDF which converts BIGINT into TIMESTAMP(3).
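A minimal sketch of such a UDF (assuming log_ts holds epoch milliseconds; with
this setup a ScalarFunction returning java.sql.Timestamp maps to
TIMESTAMP(3)):

import java.sql.Timestamp;
import org.apache.flink.table.functions.ScalarFunction;

public class LongToTimestamp extends ScalarFunction {
    // Converts epoch milliseconds (BIGINT) into TIMESTAMP(3).
    public Timestamp eval(Long epochMillis) {
        return epochMillis == null ? null : new Timestamp(epochMillis);
    }
}

// Register it under the name used in the DDL, e.g.:
// tableEnv.registerFunction("my_func", new LongToTimestamp());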

Best,
Jark

On Mon, 30 Mar 2020 at 18:16, jingjing bai 
wrote:

>
> Hi flinkers!
>
> I am trying to upgrade our production deployment from 1.9, which is our
> current version, to 1.10.
> In our case the event_time is a Long, and in our internal version we had
> implemented support for a Long type as a watermark, which differs from the
> official 1.10 version.
> In 1.10, Flink added its own watermark definition, so I dropped our
> implementation, and now I run into this problem.
>
> What should I do in the new version, so that, if possible, I don't have to
> migrate our internal implementation to the new version?
>
>
>


Run several jobs in parallel in same EMR cluster?

2020-03-30 Thread Antonio Martínez Carratalá
Hello

I'm running Flink over Amazon EMR and I'm trying to send several different
batch jobs to the cluster after creating it.

This is my cluster creation code:

StepConfig copyJarStep = new StepConfig()
    .withName("copy-jar-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName +
            "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));

List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(copyJarStep);

Application flink = new Application().withName("Flink");

Configuration flinkConfiguration = new Configuration()
    .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "2048m")
    .addPropertiesEntry("taskmanager.heap.size", "2048m");

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("cluster-" + executionKey)
    .withReleaseLabel("emr-5.26.0")
    .withApplications(flink)
    .withConfigurations(flinkConfiguration)
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withLogUri(getWorkPath() + "logs")
    .withInstances(new JobFlowInstancesConfig()
        .withEc2SubnetId("subnetid")
        .withInstanceCount(2) // 1 for task manager + 1 for job manager
        .withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m4.large")
        .withSlaveInstanceType("m4.large"))
    .withSteps(stepConfigs);

RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);


And this is how I add the jobs:
-
StepConfig runJobStep = new StepConfig()
    .withName("run-job-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "flink run -m yarn-cluster"
            + " --parallelism " + parallelism
            + " --class " + jobClass.getCanonicalName()
            + " /home/hadoop/flink-jobs.jar "
            + jobArguments));

AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
    .withJobFlowId(clusterId)
    .withSteps(runJobStep);

AddJobFlowStepsResult result =
    amazonClient.getEmrClient().addJobFlowSteps(request);
-

And these are my jobs:

- Job1 - parallelism 1
- Job2 - parallelism 1
- Job3 - parallelism 2

I'm using m4.large machines as slaves, so I have 2 cores each, and I was
expecting Job1 and Job2 to run in parallel and then Job3 when Job1 and Job2
finish. But what I see is that Job2 waits (Pending status) for Job1 to finish
before starting. I see only one task manager created for Job1; when it
finishes, another one is created for Job2, and then 2 are created for Job3.

Since I have 2 cores available, why is Job2 not running on the other core
instead of waiting? Is there any way to configure this?

Thanks


Re: flink 1.10 support LONG as watermark?

2020-03-30 Thread jingjing bai
Hi Jark!

Is there a FLIP for this? I would be very glad to learn from the idea.


Best,
jing

On Mon, Mar 30, 2020 at 6:52 PM, Jark Wu  wrote:

> Hi Jingjing,
>
> The event time field must be of TIMESTAMP(3) type. You can convert your Long
> value into TIMESTAMP(3) using a user-defined function.
> I'm sorry that Flink doesn't provide a built-in function for this purpose
> yet, but it will have one soon.
>
> For example:
> CREATE TABLE myTable (
>  log_ts bigint,
>  event_time AS my_func(log_ts),
>  WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND
> ) WITH (
>  ...
> );
>
> Here my_func is a UDF which converts BIGINT into TIMESTAMP(3).
>
> Best,
> Jark
>
> On Mon, 30 Mar 2020 at 18:16, jingjing bai 
> wrote:
>
>>
>> Hi flinkers!
>>
>> I am trying to upgrade our production deployment from 1.9, which is our
>> current version, to 1.10.
>> In our case the event_time is a Long, and in our internal version we had
>> implemented support for a Long type as a watermark, which differs from the
>> official 1.10 version.
>> In 1.10, Flink added its own watermark definition, so I dropped our
>> implementation, and now I run into this problem.
>>
>> What should I do in the new version, so that, if possible, I don't have to
>> migrate our internal implementation to the new version?
>>
>>
>>


[ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Jeff Zhang
Hi Folks,

I am very excited to announce that the integration work of Flink on Apache
Zeppelin notebooks is completed. You can now run Flink jobs via the DataStream
API, Table API, SQL, and PyFlink in an Apache Zeppelin notebook. Download
it here: http://zeppelin.apache.org/download.html

Here are some highlights of this work:

1. Support for 3 kinds of execution modes: local, remote, yarn
2. Support for multiple languages in one Flink session: Scala, Python, SQL
3. Support for the Hive connector (reading from and writing to Hive)
4. Dependency management
5. UDF support (Scala, PyFlink)
6. Support for both batch SQL and streaming SQL

For more details and usage instructions, you can refer to the following 4
blogs:

1) Get started: https://link.medium.com/oppqD6dIg5
2) Batch: https://link.medium.com/3qumbwRIg5
3) Streaming: https://link.medium.com/RBHa2lTIg5
4) Advanced usage: https://link.medium.com/CAekyoXIg5

You are welcome to use Flink on Zeppelin; feedback and comments are appreciated.

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Till Rohrmann
This is great news Jeff! Thanks a lot for sharing it with the community.
Looking forward to trying Flink on Zeppelin out :-)

Cheers,
Till

On Mon, Mar 30, 2020 at 2:47 PM Jeff Zhang  wrote:

> Hi Folks,
>
> I am very excited to announce that the integration work of Flink on Apache
> Zeppelin notebooks is completed. You can now run Flink jobs via the
> DataStream API, Table API, SQL, and PyFlink in an Apache Zeppelin notebook.
> Download it here: http://zeppelin.apache.org/download.html
>
> Here are some highlights of this work:
>
> 1. Support for 3 kinds of execution modes: local, remote, yarn
> 2. Support for multiple languages in one Flink session: Scala, Python, SQL
> 3. Support for the Hive connector (reading from and writing to Hive)
> 4. Dependency management
> 5. UDF support (Scala, PyFlink)
> 6. Support for both batch SQL and streaming SQL
>
> For more details and usage instructions, you can refer to the following 4
> blogs:
>
> 1) Get started: https://link.medium.com/oppqD6dIg5
> 2) Batch: https://link.medium.com/3qumbwRIg5
> 3) Streaming: https://link.medium.com/RBHa2lTIg5
> 4) Advanced usage: https://link.medium.com/CAekyoXIg5
>
> You are welcome to use Flink on Zeppelin; feedback and comments are appreciated.
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Run several jobs in parallel in same EMR cluster?

2020-03-30 Thread Gary Yao
Can you try to set the config option taskmanager.numberOfTaskSlots to 2? By
default the TMs only offer one slot [1], independent of the number of CPU
cores.

Best,
Gary

[1]
https://github.com/apache/flink/blob/da3082764117841d885f41c645961f8993a331a0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L197-L199
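In your EMR setup, that could look roughly like this (a sketch based on the
flink-conf classification from your cluster creation code):

Configuration flinkConfiguration = new Configuration()
    .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "2048m")
    .addPropertiesEntry("taskmanager.heap.size", "2048m")
    // Two slots per TM, matching the 2 cores of an m4.large instance.
    .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");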

On Mon, Mar 30, 2020 at 1:22 PM Antonio Martínez Carratalá <
amarti...@alto-analytics.com> wrote:

> Hello
>
> I'm running Flink over Amazon EMR and I'm trying to send several different
> batch jobs to the cluster after creating it.
>
> This is my cluster creation code:
> 
> StepConfig copyJarStep = new StepConfig()
>     .withName("copy-jar-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>         .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName +
>             "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));
>
> List<StepConfig> stepConfigs = new ArrayList<>();
> stepConfigs.add(copyJarStep);
>
> Application flink = new Application().withName("Flink");
>
> Configuration flinkConfiguration = new Configuration()
>     .withClassification("flink-conf")
>     .addPropertiesEntry("jobmanager.heap.size", "2048m")
>     .addPropertiesEntry("taskmanager.heap.size", "2048m");
>
> RunJobFlowRequest request = new RunJobFlowRequest()
>     .withName("cluster-" + executionKey)
>     .withReleaseLabel("emr-5.26.0")
>     .withApplications(flink)
>     .withConfigurations(flinkConfiguration)
>     .withServiceRole("EMR_DefaultRole")
>     .withJobFlowRole("EMR_EC2_DefaultRole")
>     .withLogUri(getWorkPath() + "logs")
>     .withInstances(new JobFlowInstancesConfig()
>         .withEc2SubnetId("subnetid")
>         .withInstanceCount(2) // 1 for task manager + 1 for job manager
>         .withKeepJobFlowAliveWhenNoSteps(true)
>         .withMasterInstanceType("m4.large")
>         .withSlaveInstanceType("m4.large"))
>     .withSteps(stepConfigs);
>
> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
>
> 
>
> And this is how I add the jobs:
>
> -
> StepConfig runJobStep = new StepConfig()
>     .withName("run-job-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>         .withArgs("bash", "-c", "flink run -m yarn-cluster"
>             + " --parallelism " + parallelism
>             + " --class " + jobClass.getCanonicalName()
>             + " /home/hadoop/flink-jobs.jar "
>             + jobArguments));
>
> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
>     .withJobFlowId(clusterId)
>     .withSteps(runJobStep);
>
> AddJobFlowStepsResult result =
>     amazonClient.getEmrClient().addJobFlowSteps(request);
>
> -
>
> And these are my jobs:
>
> - Job1 - parallelism 1
> - Job2 - parallelism 1
> - Job3 - parallelism 2
>
> I'm using m4.large machines as slaves, so I have 2 cores each, and I was
> expecting Job1 and Job2 to run in parallel and then Job3 when Job1 and Job2
> finish. But what I see is that Job2 waits (Pending status) for Job1 to
> finish before starting. I see only one task manager created for Job1; when
> it finishes, another one is created for Job2, and then 2 are created for
> Job3.
>
> Since I have 2 cores available, why is Job2 not running on the other core
> instead of waiting? Is there any way to configure this?
>
> Thanks
>
>
>
>


Re: Log file environment variable 'log.file' is not set.

2020-03-30 Thread Vitaliy Semochkin
Hello Robert,
Thank you for quick response!
Indeed, the logs say the Hadoop version is 2.4.1; this is probably because of
https://github.com/apache/flink/blob/b17a597dec80e590db2beedda446aa3cae9920dd/pom.xml#L96
How can I make 1.10 work with my current Hadoop version?

Regarding Flink reporting 1.7.0 in the logs:
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting
YarnJobClusterEntrypoint (Version: 1.7.0
while I'm using 1.10, and this is an application cluster (everything is
bundled and we don't have a session cluster running).
Here are the whole dependencies list:
mvn dependency:tree | grep flink | cut -d'-' -f2-
 org.apache.flink:flink-yarn_2.11:jar:1.10.0:runtime
 org.apache.flink:flink-clients_2.11:jar:1.10.0:compile
 org.apache.flink:flink-optimizer_2.11:jar:1.10.0:compile
 org.apache.flink:flink-shaded-hadoop-2:jar:2.4.1-9.0:runtime
 org.apache.flink:force-shading:jar:1.10.0:compile
 org.apache.flink:flink-runtime_2.11:jar:1.10.0:runtime
 org.apache.flink:flink-core:jar:1.10.0:compile
 org.apache.flink:flink-annotations:jar:1.10.0:compile
 org.apache.flink:flink-metrics-core:jar:1.10.0:compile
 org.apache.flink:flink-java:jar:1.10.0:compile
 org.apache.flink:flink-queryable-state-client-java:jar:1.10.0:runtime
 org.apache.flink:flink-hadoop-fs:jar:1.10.0:runtime
 org.apache.flink:flink-shaded-netty:jar:4.1.39.Final-9.0:compile
 org.apache.flink:flink-shaded-guava:jar:18.0-9.0:compile
 org.apache.flink:flink-shaded-asm-7:jar:7.1-9.0:compile
 org.apache.flink:flink-shaded-jackson:jar:2.10.1-9.0:compile
 org.apache.flink:flink-jdbc_2.11:jar:1.10.0:compile
 org.apache.flink:flink-hbase_2.11:jar:1.10.0:compile
 org.apache.flink:flink-runtime-web_2.11:jar:1.10.0:compile
As you can see, all Flink-related libs are 1.10.

Can you please tell me which class in Flink identifies the version (I'll try
to debug it locally)?

Regards,
Vitaliy


On Mon, Mar 30, 2020 at 5:10 PM Robert Metzger  wrote:

> Hey Vitaliy,
> is it okay for you if we keep the discussion on the list, so that others
> can chime in to help, and that Google can index the conversation, in case
> somebody else has a similar problem?
>
> I just checked, and Flink on YARN in Flink 1.10 does set the
> property correctly. Maybe in Flink 1.7, accessing the logs in the web ui
> was not yet supported.
>
> You said in your email, that you are using Flink 1.10, however, your logs
> state that you are running Flink 1.7.0.
> It also seems that you have the Hadoop 2.4.1 dependencies of Flink, but
> your Hadoop environment is Hadoop 2.7.3. I believe this error is caused by
> that version mismatch:
>
> Caused by: java.lang.IllegalAccessError: tried to access method
>> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
>> from class
>> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
>
>
>
>
> On Mon, Mar 30, 2020 at 1:58 PM Vitaliy Semochkin 
> wrote:
>
>> Hello Robert,
>> > Where exactly are you seeing the "Log file environment variable
>> > 'log.file' is not set." message?
>> I get this message when I check yarn logs.
>>
>> > Can you post some context around it? (Is this shown on the command
>> > line? What are the arguments? Is it shown in a file?)
>> I'm creating an application cluster from a Java application using Flink
>> 1.10. I didn't have this issue with version 1.8.1;
>> however, when I upgraded, the configuration changed slightly, e.g.
>> ClusterSpecification's taskManagerMemoryMB is now ignored and should be set
>> via the Flink Configuration
>> (its value is still validated, but no longer used after that).
>> My main issue is that the application seems to fail to start properly; it
>> seems that the JobMaster fails to connect to the ResourceManager, but I
>> can't figure out why.
>> The yarn log is attached.
>>
>> I would appreciate it if you could tell me in which direction I should dig.
>>
>> Regards,
>> Vitaliy
>>
>>
>> On Mon, Mar 30, 2020 at 12:00 PM Robert Metzger 
>> wrote:
>>
>>> Hey,
>>> which Flink version are you using?
>>>
>>> Where exactly are you seeing the "Log file environment variable
>>> 'log.file' is not set." message? Can you post some context around it? (Is
>>> this shown on the command line? What are the arguments? Is it shown in a
>>> file?)
>>>
>>> Usually, the "log.file" property is used to pass the name of the log
>>> file into the log4j configuration. If this property is not set, I have to
>>> assume that you are using modified or custom scripts, or you are executing
>>> Flink in an environment that fails to set the property.
>>>
>>> When running Flink on YARN, the JobManager logs are stored on the
>>> machine running the JobManager. The logs accessible through "yarn logs" are
>>> the same as you would see in the JM interface.
>>>
>>> Best,
>>> Robert
>>>
>>>
>>>
>>> On Sun, Mar 29, 2020 at 11:22 PM Vitaliy Semochkin 
>>> wrote:
>>>
 Hello Yun,

 I see this error reported by:
 *org.apache.flink.runtime.webmonitor.WebMonitorUtils*  - *JobManager
 log files 

flink-shaded-hadoop2 for flink 1.10

2020-03-30 Thread Vitaliy Semochkin
Hi,

I cannot find flink-shaded-hadoop2 for Flink 1.10 in the Maven repositories.
According to Maven Central
https://search.maven.org/artifact/org.apache.flink/flink-shaded-hadoop
the latest released version was 1.8.3.

Is it going to be released soon, should one build it oneself, or am I
searching in the wrong place?

Regards,
Vitaliy


Re: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-30 Thread Mark Niehe
Hi Gordon and Seth,

Thanks for the explanation and for opening the ticket. I'll add some details
to the ticket to explain what we're trying to do, which will hopefully add
some context.

-- 

Mark Niehe ·  Software Engineer
Integrations
  ·  Blog
  ·  We're
Hiring! 

On Mon, Mar 30, 2020 at 1:04 AM Tzu-Li (Gordon) Tai 
wrote:

> It seems like Seth's reply didn't make it to the mailing lists somehow.
> Forwarding his reply below:
>
> -- Forwarded message -
> From: Seth Wiesman 
> Date: Thu, Mar 26, 2020 at 5:16 AM
> Subject: Re: Lack of KeyedBroadcastStateBootstrapFunction
> To: Dawid Wysakowicz 
> Cc: , Tzu-Li (Gordon) Tai 
>
>
> As Dawid mentioned, you can implement your own operator using the
> transform method to do this yourself. Unfortunately, that is fairly low
> level and would require you to understand some amount of Flink internals.
>
> The real problem is that the state processor api does not support two
> input operators. We originally skipped that because there were a number of
> open questions about how best to do it and it wasn't clear that it would be
> a necessary feature. Typically, flink users use two input operators to do
> some sort of join. And when bootstrapping state, you typically only want to
> pre-fill one side of that join. KeyedBroadcastState is clearly a good
> counter-argument to that.
>
> I've opened a ticket for the feature if you would like to comment there.
>
> https://issues.apache.org/jira/browse/FLINK-16784
>
> On Tue, Mar 24, 2020 at 9:17 AM Dawid Wysakowicz 
> wrote:
>
>> Hi,
>>
>> I am not very familiar with the State Processor API, but from a brief
>> look at it, I think you are right. I think the State Processor API does not
>> support mixing different kinds of states in a single operator for now. At
>> least not in a nice way. Probably you could implement the
>> KeyedBroadcastStateBootstrapFunction yourself and use it with
>> KeyedOperatorTransformation#transform(org.apache.flink.state.api.SavepointWriterOperatorFactory).
>> I understand this is probably not the easiest task.
>>
>> I am not aware if there are plans to support that out of the box, but I
>> cc'ed Gordon and Seth who if I remember correctly worked on that API. I
>> hope they might give you some more insights.
>>
>> Best,
>>
>> Dawid
>>  On 23/03/2020 17:36, Mark Niehe wrote:
>>
>> Hey all,
>>
>> I have another question about the State Processor API. I can't seem to
>> find a way to create a KeyedBroadcastStateBootstrapFunction operator. The
>> two options currently available to bootstrap a savepoint with state are
>> KeyedStateBootstrapFunction and BroadcastStateBootstrapFunction. Because
>> these are the only two options, it's not possible to bootstrap both keyed
>> and broadcast state for the same operator. Are there any plans to add that
>> functionality or did I miss it entirely when going through the API docs?
>>
>> Thanks,
>> --
>> 
>> Mark Niehe ·  Software Engineer
>> Integrations
>>   ·
>> Blog 
>>   ·  We're Hiring!
>> 
>>
>>


Re: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-30 Thread Tzu-Li (Gordon) Tai
Thanks! Looking forward to that.

On Tue, Mar 31, 2020 at 1:02 AM Mark Niehe  wrote:

> Hi Gordon and Seth,
>
> Thanks for the explanation and for opening the ticket. I'll add some details
> to the ticket to explain what we're trying to do, which will hopefully add
> some context.
>
> --
> 
> Mark Niehe ·  Software Engineer
> Integrations
>   ·
> Blog   ·  
> We're
> Hiring! 
>
> On Mon, Mar 30, 2020 at 1:04 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> It seems like Seth's reply didn't make it to the mailing lists somehow.
>> Forwarding his reply below:
>>
>> -- Forwarded message -
>> From: Seth Wiesman 
>> Date: Thu, Mar 26, 2020 at 5:16 AM
>> Subject: Re: Lack of KeyedBroadcastStateBootstrapFunction
>> To: Dawid Wysakowicz 
>> Cc: , Tzu-Li (Gordon) Tai 
>>
>>
>> As Dawid mentioned, you can implement your own operator using the
>> transform method to do this yourself. Unfortunately, that is fairly low
>> level and would require you to understand some amount of Flink internals.
>>
>> The real problem is that the state processor api does not support two
>> input operators. We originally skipped that because there were a number of
>> open questions about how best to do it and it wasn't clear that it would be
>> a necessary feature. Typically, flink users use two input operators to do
>> some sort of join. And when bootstrapping state, you typically only want to
>> pre-fill one side of that join. KeyedBroadcastState is clearly a good
>> counter-argument to that.
>>
>> I've opened a ticket for the feature if you would like to comment there.
>>
>> https://issues.apache.org/jira/browse/FLINK-16784
>>
>> On Tue, Mar 24, 2020 at 9:17 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi,
>>>
>>> I am not very familiar with the State Processor API, but from a brief
>>> look at it, I think you are right. I think the State Processor API does not
>>> support mixing different kinds of states in a single operator for now. At
>>> least not in a nice way. Probably you could implement the
>>> KeyedBroadcastStateBootstrapFunction yourself and use it with
>>> KeyedOperatorTransformation#transform(org.apache.flink.state.api.SavepointWriterOperatorFactory).
>>> I understand this is probably not the easiest task.
>>>
>>> I am not aware if there are plans to support that out of the box, but I
>>> cc'ed Gordon and Seth who if I remember correctly worked on that API. I
>>> hope they might give you some more insights.
>>>
>>> Best,
>>>
>>> Dawid
>>>  On 23/03/2020 17:36, Mark Niehe wrote:
>>>
>>> Hey all,
>>>
>>> I have another question about the State Processor API. I can't seem to
>>> find a way to create a KeyedBroadcastStateBootstrapFunction operator. The
>>> two options currently available to bootstrap a savepoint with state are
>>> KeyedStateBootstrapFunction and BroadcastStateBootstrapFunction. Because
>>> these are the only two options, it's not possible to bootstrap both keyed
>>> and broadcast state for the same operator. Are there any plans to add that
>>> functionality or did I miss it entirely when going through the API docs?
>>>
>>> Thanks,
>>> --
>>> 
>>> Mark Niehe ·  Software Engineer
>>> Integrations
>>>   ·
>>> Blog 
>>>   ·  We're Hiring!
>>> 
>>>
>>>


Re: flink-shaded-hadoop2 for flink 1.10

2020-03-30 Thread Sivaprasanna
Hi Vitaliy,

Check for "flink-shaded-hadoop-2". It has dependencies with various hadoop
versions.
https://search.maven.org/artifact/org.apache.flink/flink-shaded-hadoop-2

On Mon, Mar 30, 2020 at 10:13 PM Vitaliy Semochkin 
wrote:

> Hi,
>
> I cannot find flink-shaded-hadoop2 for Flink 1.10 in the Maven repositories.
> According to Maven Central
> https://search.maven.org/artifact/org.apache.flink/flink-shaded-hadoop
> the latest released version was 1.8.3.
>
> Is it going to be released soon, should one build it oneself, or am I
> searching in the wrong place?
>
> Regards,
> Vitaliy
>


Re: Issue with single job yarn flink cluster HA

2020-03-30 Thread Dinesh J
Hi Yang,
Thanks for the clarification and suggestion. But my problem was that
recovery never happens, and "leader election ongoing" is the message
displayed forever.
Do you think increasing akka.ask.timeout and akka.tcp.timeout will help in
the case of a heavily loaded cluster, as this issue happens mainly during
heavy load in the cluster?

Best,
Dinesh

On Mon, Mar 30, 2020 at 2:29 PM Yang Wang  wrote:

> Hi Dinesh,
>
> First, I think the error message you provided is not a problem. It just
> indicates that the leader election is still ongoing. When it finishes, the
> new leader will start a new dispatcher to provide the web UI and REST
> service.
>
> From your jobmanager logs ("Connection refused: host1/ipaddress1:28681"),
> we can tell that the old jobmanager has failed. When a new jobmanager
> starts, the old jobmanager still holds the leader latch lock, so Flink
> tries to connect to it. After a few retries, since the old jobmanager's
> ZooKeeper client does not renew the leader latch, the new jobmanager is
> elected and becomes the active leader. That is simply how the leader
> election works.
>
> In a nutshell, the root cause is that the old jobmanager crashed but does
> not lose leadership immediately. This is by-design behavior.
>
> If you really want to make the recovery faster, I think you could decrease
> "high-availability.zookeeper.client.connection-timeout" and
> "high-availability.zookeeper.client.session-timeout". Please keep in mind
> that too small a value will also cause unexpected failovers on network
> problems.
>
>
> Best,
> Yang
>
> On Wed, Mar 25, 2020 at 4:20 PM, Dinesh J  wrote:
>
>> Hi Andrey,
>> Yes. Sometimes the job does not restart after the current leader fails.
>> Below is the message displayed when trying to reach the application
>> master URL via the YARN UI; the message remains the same even after the
>> YARN job has been running for 2 days.
>> During this time, even the current YARN application attempt does not
>> fail, and no containers are launched for the jobmanager and taskmanager.
>>
>> *{"errors":["Service temporarily unavailable due to an ongoing leader
>> election. Please refresh."]}*
>>
>> Thanks,
>> Dinesh
>>
>> On Tue, Mar 24, 2020 at 6:45 PM Andrey Zagrebin 
>> wrote:
>>
>>> Hi Dinesh,
>>>
>>> If the current leader crashes (e.g. due to network failures), then
>>> these messages do not look like a problem during the leader
>>> re-election.
>>> They look to me like just warnings from whatever caused the failover.
>>>
>>> Do you observe any problem with your application? Does the failover not
>>> work, e.g. no leader is elected or a job is not restarted after the current
>>> leader failure?
>>>
>>> Best,
>>> Andrey
>>>
>>> On Sun, Mar 22, 2020 at 11:14 AM Dinesh J  wrote:
>>>
 Attaching the job manager log for reference.

2020-03-22 11:39:02,693 WARN  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever - Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@host1:28681/user/dispatcher.
2020-03-22 11:39:02,724 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:02,724 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:02,791 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:02,792 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:02,861 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddress1:28681
2020-03-22 11:39:02,861 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@host1:28681] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]] Caused by: [Connection refused: host1/ipaddress1:28681]
2020-03-22 11:39:02,931 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: host1/ipaddre

How to enforce ACLs on Flink JobManager/ApplicationMaster URL on Yarn

2020-03-30 Thread Ethan Li
Hi Team,

I am evaluating Flink on YARN. I can submit a Flink job to a secured YARN
cluster and the job runs correctly. But the Flink jobmanager UI seems
accessible to everyone. Is there any way in Flink or YARN to secure it with
ACLs?

Thanks,
Ethan


Re: How to enforce ACLs on Flink JobManager/ApplicationMaster URL on Yarn

2020-03-30 Thread Aaron Langford
I'd be curious to see how others have done this, but our setup restricts
network access to machines in the YARN cluster to a jump box. Access to
Flink job manager is limited to whoever can ssh to that box, and that is
controlled with an Ansible playbook. Additionally, we have a list of users
specific to the cluster who can ssh to the machines in the YARN cluster
(also managed with Ansible). So the allowed users are the intersection of
the jump server ACL and the YARN cluster ACL. Web access happens by using a
local socks proxy along with the Foxy Proxy browser plugin. It's definitely
pretty crude and doesn't scale super well as more teams need varying access
policies to different YARN clusters/jobs, but it is satisfying our needs
for now. One big simplifying assumption is that we don't support shared
clusters. Amazon's EMR service allows teams to spin up clusters really
easily, so we can get away with saying that the machine network rules can
map to the actual access control rules any given job/team might need.

Aaron

On Mon, Mar 30, 2020 at 12:53 PM Ethan Li  wrote:

> Hi Team,
>
> I am evaluating Flink on YARN. I can submit a Flink job to a secured YARN
> cluster and the job runs correctly. But the Flink jobmanager UI seems
> accessible to everyone. Is there any way in Flink or YARN to secure it with
> ACLs?
>
> Thanks,
> Ethan
>


Re: flink-shaded-hadoop2 for flink 1.10

2020-03-30 Thread Vitaliy Semochkin
Thank you very much Sivaprasanna!
It worked!

PS: Does anyone know what the difference is between flink-shaded-hadoop2 and
flink-shaded-hadoop-2?

Regards,
Vitaliy

On Mon, Mar 30, 2020 at 8:21 PM Sivaprasanna 
wrote:

> Hi Vitaliy,
>
> Check for "flink-shaded-hadoop-2". It has dependencies with various hadoop
> versions.
> https://search.maven.org/artifact/org.apache.flink/flink-shaded-hadoop-2
>
> On Mon, Mar 30, 2020 at 10:13 PM Vitaliy Semochkin 
> wrote:
>
>> Hi,
>>
>> I can not find flink-shaded-hadoop2 for flink 1.10 in maven repositories.
>> According to maven central
>> https://search.maven.org/artifact/org.apache.flink/flink-shaded-hadoop
>> The latest released version was was 1.8.3
>>
>> Is it going to be leased soon or one should build it for himself or i'm
>> searching in the wrong place?
>>
>> Regards,
>> Vitaliy
>>
>


Re: How to enforce ACLs on Flink JobManager/ApplicationMaster URL on Yarn

2020-03-30 Thread Ethan Li
Thanks for sharing! Aaron. Your comment is very helpful.

Our end goal is to support multi-tenancy and also share the yarn cluster
with MapReduce, Spark and other jobs. We probably need something else.

---

I wonder if there is any built-in functionality in Flink or YARN that
already supports ACLs on the JobManager/AM.

I also noticed that Flink doesn't have a way to set up ACLs for YARN
containers so that only the submitter or a YARN admin can view the container
logs. A related PR (https://github.com/apache/flink/pull/8760) was closed due
to inactivity. How do people deal with container ACLs? Do most Flink-on-YARN
devs/users use it without security? Or do we have to implement our own
solution outside of Flink/YARN? Please advise if anyone has any ideas about
this. Thanks very much!

Best,
Ethan



On Mon, Mar 30, 2020 at 4:13 PM Aaron Langford 
wrote:

> I'd be curious to see how others have done this, but our setup restricts
> network access to machines in the YARN cluster to a jump box. Access to
> Flink job manager is limited to whoever can ssh to that box, and that is
> controlled with an Ansible playbook. Additionally, we have a list of users
> specific to the cluster who can ssh to the machines in the YARN cluster
> (also managed with Ansible). So the allowed users are the intersection of
> the jump server ACL and the YARN cluster ACL. Web access happens by using a
> local socks proxy along with the Foxy Proxy browser plugin. It's definitely
> pretty crude and doesn't scale super well as more teams need varying access
> policies to different YARN clusters/jobs, but it is satisfying our needs
> for now. One big simplifying assumption is that we don't support shared
> clusters. Amazon's EMR service allows teams to spin up clusters really
> easily, so we can get away with saying that the machine network rules can
> map to the actual access control rules any given job/team might need.
>
> Aaron
>
> On Mon, Mar 30, 2020 at 12:53 PM Ethan Li 
> wrote:
>
>> Hi Team,
>>
>> I am evaluating Flink on yarn. I can submit a flink job to a secured Yarn
>> cluster and the job can run correctly. But flink jobmanager UI seems
>> accessibly by everyone. Is there anyway in Flink or Yarn to secure it with
>> ACLs?
>>
>> Thanks,
>> Ethan
>>
>


Re: flink-shaded-hadoop2 for flink 1.10

2020-03-30 Thread Chesnay Schepler
flink-shaded-hadoop2 was released as part of Flink until 1.8 (hence why 
it followed the Flink version scheme), after which it was renamed to 
flink-shaded-hadoop-2 and is now being released separately from Flink as 
part of flink-shaded (a project that bundles various dependencies to be 
used by Flink).


As of right now there are hardly any practical differences between the two.
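If you need it on the classpath, the coordinates now look roughly like this
(the version is only an example; the scheme is
<hadoop-version>-<flink-shaded-version>, e.g. the 2.4.1-9.0 seen in the
dependency tree earlier in this thread):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2</artifactId>
    <version>2.4.1-9.0</version>
</dependency>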

On 30/03/2020 23:31, Vitaliy Semochkin wrote:

Thank you very much Sivaprasanna!
It worked!

PS: Does anyone know what the difference is between flink-shaded-hadoop2
and flink-shaded-hadoop-2?


Regards,
Vitaliy

On Mon, Mar 30, 2020 at 8:21 PM Sivaprasanna 
wrote:

    Hi Vitaliy,

    Check for "flink-shaded-hadoop-2". It has dependencies with
    various hadoop versions.
    https://search.maven.org/artifact/org.apache.flink/flink-shaded-hadoop-2

    On Mon, Mar 30, 2020 at 10:13 PM Vitaliy Semochkin 
    wrote:

Hi,

        I cannot find flink-shaded-hadoop2 for Flink 1.10 in the Maven
        repositories.
        According to Maven Central
        https://search.maven.org/artifact/org.apache.flink/flink-shaded-hadoop
        the latest released version was 1.8.3.

        Is it going to be released soon, should one build it oneself,
        or am I searching in the wrong place?

Regards,
Vitaliy





Complex graph-based sessionization (potential use for stateful functions)

2020-03-30 Thread Krzysztof Zarzycki
Hi! An interesting problem to solve ahead :)
I need to implement a streaming sessionization algorithm (splitting a stream
of events into groups of correlated events). It's pretty non-standard, as we
DON'T have a key like a user id which separates the stream into substreams
that we just need to chunk based on time.
Instead, and simplifying a lot, our events bear tuples that I compare to
graph edges, e.g.:
event 1: A -> B
event 2: B -> C
event 3: D -> E
event 4: D -> F
event 5: G -> F
I need to group them into subgroups reachable by following these edges from
some specific nodes. E.g. here:
{ A->B, B->C}
{ D->E, D->F}
{ G->F }
(note: I need to group the events, which are represented by edges here, not
the nodes).
As far as I understand, to solve this problem I need to leverage the feedback
loops/iterations feature in Flink (generally, I believe I need to apply a
Bulk Synchronous Parallel approach).

Has anyone seen this kind of sessionization implemented in the wild?
Would you suggest implementing such an algorithm using *stateful functions*?
(AFAIK, they use feedback loops underneath.) Can you suggest how these would
be used here?
I know there are some problems with checkpointing when using iterations;
does this mean the implementation may experience data loss on stops?

Side comment: I'm not sure which graph algorithm derivative needs to be
applied here, but the candidate is transitive closure.
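To make the intended grouping concrete: assuming groups are defined by
reachability from root nodes (nodes with no incoming edge), which matches
the example above, here is a minimal batch-style sketch in plain Java,
without Flink. It only illustrates the grouping semantics, not the
streaming/iteration implementation:

    import java.util.*;

    public class EdgeGrouping {
        public static void main(String[] args) {
            String[][] events = {{"A","B"}, {"B","C"}, {"D","E"}, {"D","F"}, {"G","F"}};
            Map<String, List<String[]>> out = new HashMap<>(); // node -> outgoing edges
            Set<String> hasIncoming = new HashSet<>();
            for (String[] e : events) {
                out.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e);
                hasIncoming.add(e[1]);
            }
            // Roots are nodes that never appear as an edge target.
            for (String root : out.keySet()) {
                if (hasIncoming.contains(root)) continue;
                List<String> group = new ArrayList<>();
                Deque<String> stack = new ArrayDeque<>(List.of(root));
                Set<String> seen = new HashSet<>();
                while (!stack.isEmpty()) {
                    String n = stack.pop();
                    if (!seen.add(n)) continue;
                    for (String[] e : out.getOrDefault(n, List.of())) {
                        group.add(e[0] + "->" + e[1]);
                        stack.push(e[1]);
                    }
                }
                System.out.println(root + ": " + group);
            }
            // Prints (in some order): A: [A->B, B->C], D: [D->E, D->F], G: [G->F]
        }
    }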

Thanks for joining the discussion!
Krzysztof


some subtask taking too long

2020-03-30 Thread Fanbin Bu
Hi,

I'm running Flink 1.9 on EMR using the Flink SQL Blink planner, reading
from and writing to JDBC input/output. My SQL is just a LISTAGG over a
window for the last 7 days. However, I notice that there are one or two
subtasks that take too long to finish. A similar issue is described in this
thread:
http://mail-archives.apache.org/mod_mbox/flink-user/201901.mbox/%3CCAEv5b0yD+0WBXgAnfT0b=ZqLC8rPE9_izzE3g+9Vxw8oK9w2=a...@mail.gmail.com%3E

Any idea on how to debug this?

Thanks
Fanbin


Re: Issue with single job yarn flink cluster HA

2020-03-30 Thread Yang Wang
I think your problem is not about the akka timeout. Increasing the timeout
could help in a heavily loaded cluster, especially when the network is not
very good. However, that is not your case now.

I am not sure about the "never recovers" part. Do you mean the "Connection
refused" logs keep going and there are no other logs? How long does it stay
in "leader election ongoing"? Usually, it takes at most 60s: if the old
jobmanager crashed, it will lose the leadership after the zookeeper session
timeout. So when the new jobmanager can never be granted leadership, it may
be because of some problem with zookeeper.

Maybe you need to share the complete jobmanager logs so that we can see what
is happening in the jobmanager.
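If you do end up tuning the ZooKeeper timeouts mentioned further down this
thread, they can be set in flink-conf.yaml or programmatically. A minimal
hedged sketch using Flink's Configuration API (key names as quoted below;
the values are illustrative and lower than the defaults of 60000/15000 ms):

    import org.apache.flink.configuration.Configuration;

    public class HaTimeoutsSketch {
        public static Configuration haTimeouts() {
            Configuration conf = new Configuration();
            // Smaller values speed up failover but risk spurious leader
            // loss on transient network problems.
            conf.setString("high-availability.zookeeper.client.session-timeout", "30000");
            conf.setString("high-availability.zookeeper.client.connection-timeout", "10000");
            return conf;
        }
    }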


Best,
Yang


Dinesh J wrote on Tue, Mar 31, 2020 at 3:46 AM:

> Hi Yang,
> Thanks for the clarification and suggestion. But my problem is that
> recovery never happens, and "leader election ongoing" is the message
> displayed forever.
> Do you think increasing akka.ask.timeout and akka.tcp.timeout will help in
> the case of a heavily loaded cluster, as this issue happens mainly during
> heavy load on the cluster?
>
> Best,
> Dinesh
>
> On Mon, Mar 30, 2020 at 2:29 PM Yang Wang  wrote:
>
>> Hi Dinesh,
>>
>> First, I think the error message you provided is not a problem. It
>> just indicates that the leader election is still ongoing. When it
>> finishes, the new leader will start a new dispatcher to provide the
>> web UI and REST service.
>>
>> From your jobmanager logs ("Connection refused: host1/ipaddress1:28681"),
>> we can tell that the old jobmanager has failed. When a new jobmanager
>> starts, the old jobmanager still holds the lock on the leader latch, so
>> Flink tries to connect to it. After a few tries, since the old
>> jobmanager's zookeeper client does not renew the leader latch, the new
>> jobmanager is elected and becomes the active leader. That is just how
>> leader election works.
>>
>> In a nutshell, the root cause is that the old jobmanager crashed and does
>> not lose leadership immediately. This is by-design behavior.
>>
>> If you really want to make the recovery faster, I think you could
>> decrease "high-availability.zookeeper.client.connection-timeout"
>> and "high-availability.zookeeper.client.session-timeout". Please keep in
>> mind that too small a value will also cause unexpected failovers because
>> of network problems.
>>
>>
>> Best,
>> Yang
>>
>> Dinesh J wrote on Wed, Mar 25, 2020 at 4:20 PM:
>>
 Hi Andrey,
 Yes. Sometimes the job does not restart after the current leader
 failure.
 Below is the message displayed when trying to reach the application
 master URL via the YARN UI, and the message remains the same even if the
 YARN job has been running for 2 days.
 During this time, even the current YARN application attempt does not
 fail, and no containers are launched for the jobmanager and taskmanager.

 *{"errors":["Service temporarily unavailable due to an ongoing leader
 election. Please refresh."]}*

 Thanks,
 Dinesh
>>>
>>> On Tue, Mar 24, 2020 at 6:45 PM Andrey Zagrebin 
>>> wrote:
>>>
 Hi Dinesh,

 If the current leader crashes (e.g. due to network failures), then
 getting these messages does not look like a problem during the leader
 re-election.
 They look to me just like warnings related to the failover.

 Do you observe any problem with your application? Does the failover not
 work, e.g. no leader is elected or a job is not restarted after the current
 leader failure?

 Best,
 Andrey

 On Sun, Mar 22, 2020 at 11:14 AM Dinesh J  wrote:

> Attaching the job manager log for reference.
>
> 2020-03-22 11:39:02,693 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever  -
> Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@host1:28681/user/dispatcher.
> 2020-03-22 11:39:02,724 WARN
>  akka.remote.transport.netty.NettyTransport- Remote
> connection to [null] failed with java.net.ConnectException: Connection
> refused: host1/ipaddress1:28681
> 2020-03-22 11:39:02,724 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system
> [akka.tcp://flink@host1:28681] has failed, address is now gated for
> [50] ms. Reason: [Association failed with [akka.tcp://flink@host1:28681]]
> Caused by: [Connection refused: host1/ipaddress1:28681]
> 2020-03-22 11:39:02,791 WARN
>  akka.remote.transport.netty.NettyTransport- Remote
> connection to [null] failed with java.net.ConnectException: Connection
> refused: host1/ipaddress1:28681
> 2020-03-22 11:39:02,792 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system
> [akka.tcp://flink@host1:28681] has failed, address is now gated for
> [50] ms. Reason: [Association faile

Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Dian Fu
Hi Jeff,

Thanks for the great work and for sharing it with the community! Very
impressive; I will try it out.

Regards,
Dian

> On Mar 30, 2020, at 9:16 PM, Till Rohrmann wrote:
> 
> This is great news, Jeff! Thanks a lot for sharing it with the community.
> Looking forward to trying Flink on Zeppelin out :-)
> 
> Cheers,
> Till
> 
> On Mon, Mar 30, 2020 at 2:47 PM Jeff Zhang wrote:
> Hi Folks,
> 
> I am very excited to announce that the integration work of Flink on Apache
> Zeppelin notebook is complete. You can now run Flink jobs via the DataStream
> API, Table API, SQL, and PyFlink in an Apache Zeppelin notebook. Download it
> here: http://zeppelin.apache.org/download.html
> 
> Here are some highlights of this work:
> 
> 1. Supports 3 kinds of execution modes: local, remote, yarn
> 2. Supports multiple languages in one Flink session: Scala, Python, SQL
> 3. Supports the Hive connector (reading from and writing to Hive)
> 4. Dependency management
> 5. UDF support (Scala, PyFlink)
> 6. Supports both batch SQL and streaming SQL
> 
> For more details and usage instructions, you can refer to the following 4
> blog posts:
> 
> 1) Get started: https://link.medium.com/oppqD6dIg5
> 2) Batch: https://link.medium.com/3qumbwRIg5
> 3) Streaming: https://link.medium.com/RBHa2lTIg5
> 4) Advanced usage: https://link.medium.com/CAekyoXIg5
> 
> You are welcome to use Flink on Zeppelin and give feedback and comments.
> 
> -- 
> Best Regards
> 
> Jeff Zhang



Re: Issue with single job yarn flink cluster HA

2020-03-30 Thread Dinesh J
Hi Yang,
I am attaching one full jobmanager log for a job that I reran today. This is
a job that tries to restore from a savepoint.
The same error message, "leader election ongoing", is displayed, and it
stays the same even after 30 minutes. If I leave the job without a yarn
kill, it stays the same forever.
Based on your suggestions so far, I guess it might be some zookeeper
problem. If that is the case, what can I look out for in zookeeper to figure
out the issue?
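One thing worth checking is whether a stale leader-latch znode from the
crashed jobmanager is still present. A hedged sketch using the plain
ZooKeeper client (the connect string and the latch path are assumptions;
the path is derived from high-availability.zookeeper.path.root, the
cluster id, and high-availability.zookeeper.path.latch, so adjust it to
your HA configuration):

    import java.util.List;
    import org.apache.zookeeper.ZooKeeper;

    public class LeaderLatchProbe {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("zk-host:2181", 60_000, event -> {});
            // Hypothetical path, e.g. /flink/<application-id>/leaderlatch
            String latchPath = "/flink/application_1234_0001/leaderlatch";
            List<String> latches = zk.getChildren(latchPath, false);
            // A latch owned by a dead jobmanager that never disappears points
            // at a ZooKeeper session problem rather than a Flink problem.
            System.out.println(latches);
            zk.close();
        }
    }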

Thanks,
Dinesh


On Tue, Mar 31, 2020 at 7:42 AM Yang Wang  wrote:

> I think your problem is not about the akka timeout. Increasing the timeout
> could help in a heavily loaded cluster, especially when the network is not
> very good. However, that is not your case now.
>
> I am not sure about the "never recovers" part. Do you mean the "Connection
> refused" logs keep going and there are no other logs? How long does it stay
> in "leader election ongoing"? Usually, it takes at most 60s: if the old
> jobmanager crashed, it will lose the leadership after the zookeeper session
> timeout. So when the new jobmanager can never be granted leadership, it may
> be because of some problem with zookeeper.
>
> Maybe you need to share the complete jobmanager logs so that we can see
> what is happening in the jobmanager.
>
>
> Best,
> Yang
>
>
> Dinesh J wrote on Tue, Mar 31, 2020 at 3:46 AM:
>
>> Hi Yang,
>> Thanks for the clarification and suggestion. But my problem is that
>> recovery never happens, and "leader election ongoing" is the message
>> displayed forever.
>> Do you think increasing akka.ask.timeout and akka.tcp.timeout will help
>> in the case of a heavily loaded cluster, as this issue happens mainly
>> during heavy load on the cluster?
>>
>> Best,
>> Dinesh
>>
>> On Mon, Mar 30, 2020 at 2:29 PM Yang Wang  wrote:
>>
>>> Hi Dinesh,
>>>
>>> First, I think the error message you provided is not a problem. It
>>> just indicates that the leader election is still ongoing. When it
>>> finishes, the new leader will start a new dispatcher to provide the
>>> web UI and REST service.
>>>
>>> From your jobmanager logs ("Connection refused: host1/ipaddress1:28681"),
>>> we can tell that the old jobmanager has failed. When a new jobmanager
>>> starts, the old jobmanager still holds the lock on the leader latch, so
>>> Flink tries to connect to it. After a few tries, since the old
>>> jobmanager's zookeeper client does not renew the leader latch, the new
>>> jobmanager is elected and becomes the active leader. That is just how
>>> leader election works.
>>>
>>> In a nutshell, the root cause is that the old jobmanager crashed and does
>>> not lose leadership immediately. This is by-design behavior.
>>>
>>> If you really want to make the recovery faster, I think you could
>>> decrease "high-availability.zookeeper.client.connection-timeout"
>>> and "high-availability.zookeeper.client.session-timeout". Please keep in
>>> mind that too small a value will also cause unexpected failovers because
>>> of network problems.
>>>
>>>
>>> Best,
>>> Yang
>>>
 Dinesh J wrote on Wed, Mar 25, 2020 at 4:20 PM:
>>>
 Hi Andrey,
 Yes. Sometimes the job does not restart after the current leader
 failure.
 Below is the message displayed when trying to reach the application
 master URL via the YARN UI, and the message remains the same even if the
 YARN job has been running for 2 days.
 During this time, even the current YARN application attempt does not
 fail, and no containers are launched for the jobmanager and taskmanager.

 *{"errors":["Service temporarily unavailable due to an ongoing leader
 election. Please refresh."]}*

 Thanks,
 Dinesh

 On Tue, Mar 24, 2020 at 6:45 PM Andrey Zagrebin 
 wrote:

> Hi Dinesh,
>
> If the current leader crashes (e.g. due to network failures), then
> getting these messages does not look like a problem during the leader
> re-election.
> They look to me just like warnings related to the failover.
>
> Do you observe any problem with your application? Does the failover
> not work, e.g. no leader is elected or a job is not restarted after the
> current leader failure?
>
> Best,
> Andrey
>
> On Sun, Mar 22, 2020 at 11:14 AM Dinesh J 
> wrote:
>
>> Attaching the job manager log for reference.
>>
>> 2020-03-22 11:39:02,693 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever  
>> -
>> Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@host1:28681/user/dispatcher.
>> 2020-03-22 11:39:02,724 WARN
>>  akka.remote.transport.netty.NettyTransport- Remote
>> connection to [null] failed with java.net.ConnectException: Connection
>> refused: host1/ipaddress1:28681
>> 2020-03-22 11:39:02,724 WARN  akka.remote.ReliableDeliverySupervisor
>>- Association with remote system
>> [a

Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Zhijiang

Thanks for your continuous efforts in the Flink ecosystem, Jeff!
Glad to see the progressive achievement. I wish more users would try it out in practice.

Best,
Zhijiang



--
From: Dian Fu
Send Time: 2020 Mar. 31 (Tue.) 10:15
To: Jeff Zhang
Cc: user; dev
Subject: Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

Hi Jeff,

Thanks for the great work and for sharing it with the community! Very
impressive; I will try it out.

Regards,
Dian

On Mar 30, 2020, at 9:16 PM, Till Rohrmann wrote:
This is great news, Jeff! Thanks a lot for sharing it with the community.
Looking forward to trying Flink on Zeppelin out :-)

Cheers,
Till
On Mon, Mar 30, 2020 at 2:47 PM Jeff Zhang  wrote:
Hi Folks,

I am very excited to announce that the integration work of Flink on Apache
Zeppelin notebook is complete. You can now run Flink jobs via the DataStream
API, Table API, SQL, and PyFlink in an Apache Zeppelin notebook. Download it
here: http://zeppelin.apache.org/download.html

Here are some highlights of this work:

1. Supports 3 kinds of execution modes: local, remote, yarn
2. Supports multiple languages in one Flink session: Scala, Python, SQL
3. Supports the Hive connector (reading from and writing to Hive)
4. Dependency management
5. UDF support (Scala, PyFlink)
6. Supports both batch SQL and streaming SQL

For more details and usage instructions, you can refer to the following 4
blog posts:

1) Get started: https://link.medium.com/oppqD6dIg5
2) Batch: https://link.medium.com/3qumbwRIg5
3) Streaming: https://link.medium.com/RBHa2lTIg5
4) Advanced usage: https://link.medium.com/CAekyoXIg5

You are welcome to use Flink on Zeppelin and give feedback and comments.

-- 
Best Regards

Jeff Zhang



Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Jingsong Li
Thanks very much, Jeff; that is very impressive.

Zeppelin is a very convenient development platform.

Best,
Jingsong Lee

On Tue, Mar 31, 2020 at 11:58 AM Zhijiang 
wrote:

>
> Thanks for your continuous efforts in the Flink ecosystem, Jeff!
> Glad to see the progressive achievement. I wish more users would try it
> out in practice.
>
> Best,
> Zhijiang
>
>
> --
> From: Dian Fu
> Send Time: 2020 Mar. 31 (Tue.) 10:15
> To: Jeff Zhang
> Cc: user; dev
> Subject: Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)
>
> Hi Jeff,
>
> Thanks for the great work and for sharing it with the community! Very
> impressive; I will try it out.
>
> Regards,
> Dian
>
> On Mar 30, 2020, at 9:16 PM, Till Rohrmann wrote:
>
> This is great news, Jeff! Thanks a lot for sharing it with the community.
> Looking forward to trying Flink on Zeppelin out :-)
>
> Cheers,
> Till
>
> On Mon, Mar 30, 2020 at 2:47 PM Jeff Zhang  wrote:
> Hi Folks,
>
> I am very excited to announce that the integration work of Flink on Apache
> Zeppelin notebook is complete. You can now run Flink jobs via the DataStream
> API, Table API, SQL, and PyFlink in an Apache Zeppelin notebook. Download it
> here: http://zeppelin.apache.org/download.html
>
> Here are some highlights of this work:
>
> 1. Supports 3 kinds of execution modes: local, remote, yarn
> 2. Supports multiple languages in one Flink session: Scala, Python, SQL
> 3. Supports the Hive connector (reading from and writing to Hive)
> 4. Dependency management
> 5. UDF support (Scala, PyFlink)
> 6. Supports both batch SQL and streaming SQL
>
> For more details and usage instructions, you can refer to the following 4
> blog posts:
>
> 1) Get started: https://link.medium.com/oppqD6dIg5
> 2) Batch: https://link.medium.com/3qumbwRIg5
> 3) Streaming: https://link.medium.com/RBHa2lTIg5
> 4) Advanced usage: https://link.medium.com/CAekyoXIg5
>
> You are welcome to use Flink on Zeppelin and give feedback and comments.
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best, Jingsong Lee


Re: Log file environment variable 'log.file' is not set.

2020-03-30 Thread Robert Metzger
Hey Vitaliy,

Check this documentation on how to use Flink with Hadoop:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html
For your setup, I would recommend referencing the Hadoop jars from your
Hadoop vendor by setting

export HADOOP_CLASSPATH=`hadoop classpath`

Is it possible that the files on your cluster are Flink 1.7.0 files, while
your Flink job maven project has Flink 1.10 dependencies?
On your server, what version do the flink jar files in lib/ have?
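A quick way to check which version is actually on the classpath is a
minimal hedged sketch using EnvironmentInformation, the class behind the
"Starting YarnJobClusterEntrypoint (Version: ...)" log line:

    import org.apache.flink.runtime.util.EnvironmentInformation;

    public class PrintFlinkVersion {
        public static void main(String[] args) {
            // Prints the Flink version of whatever flink-runtime jar is loaded.
            System.out.println(EnvironmentInformation.getVersion());
        }
    }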

If you are launching Flink like this...

./bin/flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m ./my/project.jar

... it will use the files in lib/ for starting Flink.

Best,
Robert


On Mon, Mar 30, 2020 at 5:39 PM Vitaliy Semochkin 
wrote:

> Hello Robert,
> Thank you for the quick response!
> Indeed, the logs say the Hadoop version is 2.4.1; this is probably because of
> https://github.com/apache/flink/blob/b17a597dec80e590db2beedda446aa3cae9920dd/pom.xml#L96
> How can I make 1.10 work with my current Hadoop version?
>
> Regarding Flink reporting 1.7.0 in the logs
> ("org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting
> YarnJobClusterEntrypoint (Version: 1.7.0"):
> I'm using 1.10, and this is an application cluster (everything is
> bundled and we don't have a session cluster running).
> Here is the whole dependency list:
> mvn dependency:tree | grep flink | cut -d'-' -f2-
>  org.apache.flink:flink-yarn_2.11:jar:1.10.0:runtime
>  org.apache.flink:flink-clients_2.11:jar:1.10.0:compile
>  org.apache.flink:flink-optimizer_2.11:jar:1.10.0:compile
>  org.apache.flink:flink-shaded-hadoop-2:jar:2.4.1-9.0:runtime
>  org.apache.flink:force-shading:jar:1.10.0:compile
>  org.apache.flink:flink-runtime_2.11:jar:1.10.0:runtime
>  org.apache.flink:flink-core:jar:1.10.0:compile
>  org.apache.flink:flink-annotations:jar:1.10.0:compile
>  org.apache.flink:flink-metrics-core:jar:1.10.0:compile
>  org.apache.flink:flink-java:jar:1.10.0:compile
>  org.apache.flink:flink-queryable-state-client-java:jar:1.10.0:runtime
>  org.apache.flink:flink-hadoop-fs:jar:1.10.0:runtime
>  org.apache.flink:flink-shaded-netty:jar:4.1.39.Final-9.0:compile
>  org.apache.flink:flink-shaded-guava:jar:18.0-9.0:compile
>  org.apache.flink:flink-shaded-asm-7:jar:7.1-9.0:compile
>  org.apache.flink:flink-shaded-jackson:jar:2.10.1-9.0:compile
>  org.apache.flink:flink-jdbc_2.11:jar:1.10.0:compile
>  org.apache.flink:flink-hbase_2.11:jar:1.10.0:compile
>  org.apache.flink:flink-runtime-web_2.11:jar:1.10.0:compile
> As you can see, all Flink-related libs are 1.10.
>
> Can you please tell me which class in Flink identifies the version (I'll
> try to debug it locally)?
>
> Regards,
> Vitaliy
>
>
> On Mon, Mar 30, 2020 at 5:10 PM Robert Metzger 
> wrote:
>
>> Hey Vitaliy,
>> is it okay for you if we keep the discussion on the list, so that others
>> can chime in to help and so that Google can index the conversation, in
>> case somebody else has a similar problem?
>>
>> I just checked, and Flink on YARN in Flink 1.10 does set the
>> property correctly. Maybe in Flink 1.7, accessing the logs in the web UI
>> was not yet supported.
>>
>> You said in your email, that you are using Flink 1.10, however, your logs
>> state that you are running Flink 1.7.0.
>> It also seems that you have the Hadoop 2.4.1 dependencies of Flink, but
>> your Hadoop environment is Hadoop 2.7.3. I believe this error is caused by
>> that version mismatch:
>>
>> Caused by: java.lang.IllegalAccessError: tried to access method
>>> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
>>> from class
>>> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
>>
>>
>>
>>
>> On Mon, Mar 30, 2020 at 1:58 PM Vitaliy Semochkin 
>> wrote:
>>
>>> Hello Robert,
>>> >Where exactly are you seeing the "Log file environment variable
>>> 'log.file' is not set." message?
>>> I get this message when I check the YARN logs.
>>>
>>> >Can you post some context around it? (is this shown from the command
>>> line? what are the arguments? is it shown in a file?
>>> I'm creating an application cluster from a Java application using Flink
>>> 1.10. I didn't have this issue with version 1.8.1;
>>> however, when I upgraded, the configuration changed slightly, e.g.
>>> ClusterSpecification's taskManagerMemoryMB is now ignored and should be set
>>> via the Flink Configuration
>>> (its value is still validated, but no longer used after that).
>>> My main issue is that the application seems to fail to start properly; it
>>> seems that the JobMaster fails to connect to the ResourceManager, but I
>>> can't figure out why.
>>> The YARN log is attached.
>>>
>>> I'll appreciate it if you tell me which direction I should dig in.
>>>
>>> Regards,
>>> Vitaliy
>>>
>>>
>>> On Mon, Mar 30, 2020 at 12:00 PM Robert Metzger 
>>> wrote:
>>>
 Hey,
 which Flink version are you using?

 Where exactly are you seeing the "Log file environment variable
 'log.file' is not set."