Re: How to install Flink + YARN?

2019-11-19 Thread Ana
Hi,

I was able to run Flink on YARN by installing YARN and Flink separately.

Thank you.

Ana

On Wed, Nov 20, 2019 at 10:42 AM Pankaj Chand 
wrote:

> Hello,
>
> I want to run Flink on YARN upon a cluster of nodes. From the
> documentation, I was not able to fully understand how to go about it. Some
> of the archived answers are a bit old and had pending JIRA issues, so I
> thought I would ask.
>
> Am I first supposed to install YARN separately, and then download the
> Flink file and Hadoop pre-bundle? Or does the Hadoop-prebundle that I put
> into Flink's /lib folder provide the entire YARN installation?
>
> Is there any download that bundles a complete *installation of Flink +
> installation of YARN*?
>
> Thank you,
>
> Pankaj
>


Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-24 Thread Ana
Hi Lei,

To add, you may use the Hadoop ResourceManager REST API:
https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html.
I'm also running a Flink application on YARN and use this API for that
purpose. If you find another way or a better solution, please let me
know!
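
A rough, untested sketch of that kind of query (Scala, standard library only; the ResourceManager address below is just a placeholder for your cluster's RM web address):

import scala.io.Source

object ListYarnApps {
  def main(args: Array[String]): Unit = {
    // Placeholder ResourceManager web address; adjust to your cluster.
    val rm = "http://resourcemanager:8088"
    // Lists running YARN applications as JSON; Flink sessions/jobs appear here with their application id.
    val apps = Source.fromURL(s"$rm/ws/v1/cluster/apps?states=RUNNING").mkString
    println(apps)
  }
}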

Regards,
Ana

On Fri, Nov 22, 2019 at 10:58 AM vino yang  wrote:

> Hi Lei,
>
> It would be better to use Flink's RESTful API to fetch the information of
> the running jobs[1].
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-1
>
> Best,
> Vino
>
> Lei Nie  wrote on Fri, Nov 22, 2019 at 4:14 AM:
>
>> I looked at the code, and
>> StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID is
>> generating a random ID unrelated to the actual ID used.
>>
>> Is there any way to fetch the real ID at runtime?
>> Use case: fetch most recent checkpoint from stable storage for
>> automated restarts. Most recent checkpoint has form
>> ".../checkpoints/flink_app_id/chk-123"
>>
>> On Thu, Nov 21, 2019 at 11:28 AM Lei Nie  wrote:
>> >
>> > This does not get the correct id:
>> > StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID =
>> > eea5abc21dd8743a4090f4a3a660f9e8
>> > Actual job ID (from webUI): 1357d21be640b6a3b8a86a063f4bba8a
>> >
>> >
>> >
>> > On Thu, Nov 7, 2019 at 6:56 PM vino yang  wrote:
>> > >
>> > > Hi Lei Nie,
>> > >
>> > > You can use
>> `StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID` to get the
>> job id.
>> > >
>> > > Best,
>> > > Vino
>> > >
>> > > Lei Nie  wrote on Fri, Nov 8, 2019 at 8:38 AM:
>> > >>
>> > >> Hello,
>> > >> I am currently executing streaming jobs via
>> StreamExecutionEnvironment. Is it possible to retrieve the Flink job
>> ID/YARN ID within the context of a job? I'd like to be able to
>> automatically register the job such that monitoring jobs can run (REST api
>> requires for example job id).
>> > >>
>> > >> Thanks
>>
>
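
For completeness, the Flink REST endpoint vino mentions can be queried the same way. A minimal sketch (the JobManager address http://jobmanager:8081 is only a placeholder; on YARN it is the application master's web endpoint):

import scala.io.Source

object ListFlinkJobs {
  def main(args: Array[String]): Unit = {
    // Placeholder JobManager REST address; adjust to your deployment.
    // GET /jobs returns e.g. {"jobs":[{"id":"1357d21be640b6a3b8a86a063f4bba8a","status":"RUNNING"}]}
    val jobs = Source.fromURL("http://jobmanager:8081/jobs").mkString
    println(jobs)
  }
}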


Flink 1.12 Release Preview Meetup

2020-12-04 Thread Ana Vasiliuk
Hi everyone!

We'll be hosting a meetup on the upcoming Apache Flink 1.12 release on
December 9.
Join the Ask-Me-Anything (AMA) session with Aljoscha Krettek, Timo Walther,
Stephan Ewen, Arvid Heise and Robert Metzger.

The AMA will be live-streamed on YouTube (link TBA in the meetup groups
below). After that, there will be a short hangout on Spatial Chat for those
who want to continue the chat.

*7 PM - 8:30 PM CET:*
https://www.meetup.com/Apache-Flink-Meetup/events/274983531/
*10 AM PT - 11:30 AM PT/ 1 PM - 2:30 PM ET*:
https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/274984913/
<https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/274984913/>

Please add and upvote questions before the event at
https://app.sli.do/event/jz8sny4d.
Event code: #43177

Hope to see you there!

Cheers,
Ana
-- 

*Ana Vasiliuk *| Community Marketing Manager


<https://www.ververica.com/>


Follow us @VervericaData <https://twitter.com/ververicadata>

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng


Flink 1.12 Ask-Me-Anything Meetup

2020-12-09 Thread Ana Vasiliuk
Hi everyone,

The Flink 1.12 AMA is happening today at 10 am Pacific Time/ 1 pm Eastern
Time/ 7 pm Central European Time. Tune in directly at
https://youtu.be/u8jPgXoNDXA for a discussion on the upcoming release and
new features with Aljoscha Krettek, Stephan Ewen, Arvid Heise, Robert
Metzger, and Timo Walther.

Post your questions at https://app.sli.do/event/jz8sny4d (code #43177).

After the AMA, join us for some social time here:
https://spatial.chat/s/release-meetup.

Hope to see you there!

Ana



[HEADS UP] Flink Community Survey

2021-03-16 Thread Ana Vasiliuk
Hi everyone!

We're constantly working to improve the Flink community experience and need
your help! Please take 2 min to share your thoughts with us via a short
survey [1].

Your feedback will help us understand where we stand on communication
between community members, what activities you prefer or would like to see
more of, and in general, identify opportunities for improvement. The survey
only takes 2 minutes (possibly less) and is anonymous.

Thanks a lot!

Ana

[1] https://form.typeform.com/to/M5JyHILk


[HEADS UP] Flink Community Survey closes Tue, March 30

2021-03-25 Thread Ana Vasiliuk
Hi all,

Thanks to everyone who has already left feedback on the community
experience in the Community Survey!

The survey is open until *Tuesday, March 30th*, so if you haven't done so
yet, please take 2 minutes (maybe less!) to fill it out below. Your opinion
is very helpful for us to better understand your satisfaction with the
current community activities and learn where we can improve.

Thanks a lot!

Ana
[1] https://form.typeform.com/to/M5JyHILk


New dates for Flink Forward Global Virtual 2020

2020-05-28 Thread Ana Vasiliuk
Hi everyone,

Flink Forward Global Virtual 2020 is now a 4-day conference, featuring two
training days on October 19 & 20! The organizers have decided to extend the
training program for this event to ensure that you get the most out of your
time with our team of Flink experts.

*New dates:*
Apache Flink Training - October 19 - 20
Flink Forward keynotes and breakout sessions - October 21 - 22

The conference days will be free to attend and there will be a limited
number of paid training tickets available soon. Please reserve your spot at
http://flink-forward.org/global-2020.

More information to follow, including pricing and further details of the
training agenda. If you have any questions, please feel free to reach out
to the organizing team via *he...@flink-forward.org*.

The *Call for Presentations* is also open, so if you want to share your
real-world use cases and best practices with an international audience of
Flink enthusiasts, don't forget to submit your talk by *June 19* for a
chance to be included in the program!

Submit your talk at
https://www.flink-forward.org/global-2020/call-for-presentations.

Hope to see you virtually in October!
Ana



Problem to show logs in task managers

2015-12-17 Thread Ana M. Martinez
Hi flink community,

I am trying to show log messages using log4j.
It works fine overall, except for the messages I want to show in an inner class
that implements org.apache.flink.api.common.aggregators.ConvergenceCriterion.
I am very new to this, but it seems I'm having trouble showing the messages
logged in the isConverged function, presumably because it runs on the task managers.
The log messages in the outer class (before the map-reduce operations), for example,
are shown properly.

I am also interested in providing my own log4j.properties file. I am using
./bin/flink run -m yarn-cluster on Amazon clusters.

Thanks,
Ana

Re: Problem to show logs in task managers

2015-12-18 Thread Ana M. Martinez
Hi Till,

Many thanks for your quick response.

I have modified the WordCountExample to reproduce my problem in a simple
example.

I run the code below with the following command:
./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c mypackage.WordCountExample ../flinklink.jar

And if I check the log file, I see all logger messages except the one in the
flatMap function of the inner LineSplitter class, which is actually the one I
am most interested in.

Is that expected behaviour?

Thanks,
Ana


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);

        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");

        List<Integer> integerList;

        public TestClass(List<Integer> integerList) {
            this.integerList = integerList;
            loggerTestClass.info("Logger in TestClass");
        }
    }
}



On 17 Dec 2015, at 16:08, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Ana,

you can simply modify the `log4j.properties` file in the `conf` directory. It 
should be automatically included in the Yarn application.

Concerning your logging problem, it might be that you have set the logging 
level too high. Could you share the code with us?

Cheers,
Till





Re: Problem to show logs in task managers

2016-01-04 Thread Ana M. Martinez
Hi Till,

Sorry for the delay (Xmas break). I have activated log aggregation in
flink-conf.yaml with yarn.log-aggregation-enable: true (as I can't find a
yarn-site.xml).
But the command yarn logs -applicationId application_1451903796996_0008 gives
me the following output:

INFO client.RMProxy: Connecting to ResourceManager at xxx
/var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008 does not exist.
Log aggregation has not completed or is not enabled

I've tried to restart the Flink JobManager and TaskManagers as follows:
./bin/yarn-session.sh -n 1 -tm 2048 -s 4
and then, with a detached screen, run my application with ./bin/flink run -m yarn-cluster ...

I am not sure whether my problem is that I am not setting the log-aggregation-enable
property correctly, or that I am not restarting the Flink JobManager and TaskManagers
as I should… Any ideas?

Thanks,
Ana

On 18 Dec 2015, at 16:29, Till Rohrmann <trohrm...@apache.org> wrote:

In which log file exactly are you looking for the logging statements? And on
what machine? You have to look on the machines on which the YARN containers were
started. Alternatively, if you have log aggregation activated, then you can
simply retrieve the log files via yarn logs.

Cheers,
Till



Re: Problem to show logs in task managers

2016-01-06 Thread Ana M. Martinez
Hi Till,

I am afraid it still does not work.

I am following the steps indicated in the documentation (for YARN configuration
and logging with slf4j):

1) Enable log aggregation in yarn-site:
https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files

2) Include loggers as indicated here (see the WordCountExample earlier in this thread):
https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html

But I still cannot get the log messages emitted from the map functions. Am I missing
something?

Thanks,
Ana

On 04 Jan 2016, at 14:00, Till Rohrmann <trohrm...@apache.org> wrote:


I think the YARN application has to be finished in order for the logs to be
accessible.

Judging from your commands, you're starting a long-running YARN application
running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4. This cluster won't
be used, though, because you're executing your job with ./bin/flink run -m
yarn-cluster, which will start another YARN application that is only alive as
long as the Flink job is executed. If you want to run your job on the long-running
YARN application, then you simply have to omit -m yarn-cluster.

Cheers,
Till

​


Re: Problem to show logs in task managers

2016-01-08 Thread Ana M. Martinez
 - Logger in TestClass
13:31:16,346 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - class eu.amidst.flinklink.examples.WordCountExample$TestClass is not a valid 
POJO type
13:31:16,376 INFO  org.apache.flink.client.CliFrontend  
 - Program execution finished
13:31:16,384 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
 - Shutting down remote daemon.
13:31:16,386 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
 - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,408 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
 - Remoting shut down.
13:31:16,431 INFO  org.apache.flink.client.CliFrontend  
 - Shutting down YARN cluster
13:31:16,431 INFO  org.apache.flink.yarn.FlinkYarnCluster   
 - Sending shutdown request to the Application Master
13:31:16,432 INFO  org.apache.flink.yarn.ApplicationClient  
 - Sending StopYarnSession request to ApplicationMaster.
13:31:16,568 INFO  org.apache.flink.yarn.ApplicationClient  
 - Remote JobManager has been stopped successfully. Stopping local application 
client
13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient  
 - Stopped Application client.
13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient  
 - Disconnect from JobManager 
Actor[akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782].
13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
 - Shutting down remote daemon.
13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
 - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,584 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
 - Remoting shut down.
13:31:16,595 INFO  org.apache.flink.yarn.FlinkYarnCluster   
 - Deleting files in 
hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005
13:31:16,596 INFO  org.apache.flink.yarn.FlinkYarnCluster   
 - Application application_1452250761414_0005 finished with state FINISHED and 
final state SUCCEEDED at 1452259876445
13:31:16,747 INFO  org.apache.flink.yarn.FlinkYarnCluster   
 - YARN Client is shutting down


You can see the log messages from the WordCountExample and TestClass classes.
But I have problems showing the logger message (INFO) in the LineSplitter class.
Presumably this is because it is executed on the CORE nodes and not on the
MASTER node (it all runs well on my local computer).

Any tips?
Ana


On 06 Jan 2016, at 15:58, Ana M. Martinez 
mailto:a...@cs.aau.dk>> wrote:

Hi Till,

I am afraid it does not work in any case.

I am following the steps you indicate on your websites (for yarn configuration 
and loggers with slf4j):

1) Enable log aggregation in yarn-site:
https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files

2) Include Loggers as indicated here (see WordCountExample below):
https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html

But I cannot get the log messages that run in the map functions. Am I missing 
something?

Thanks,
Ana

On 04 Jan 2016, at 14:00, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:


I think the YARN application has to be finished in order for the logs to be 
accessible.

Judging from you commands, you’re starting a long running YARN application 
running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4. This cluster won’t 
be used though, because you’re executing your job with ./bin/flink run -m 
yarn-cluster which will start another YARN application which is only alive as 
long as the Flink job is executed. If you want to run your job on the long 
running YARN application, then you simply have to omit -m yarn-cluster.

Cheers,
Till

​

On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez 
mailto:a...@cs.aau.dk>> wrote:
Hi Till,

Sorry for the delay (Xmas break). I have activated log aggregation on 
flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find a 
yarn-site.xml).
But the command yarn logs -applicationId application_1451903796996_0008 gives 
me the following output:

INFO client.RMProxy: Connecting to ResourceManager at xxx
/var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008does not 
exist.
Log aggregation has not completed or is not enabled

I’ve tried to restart the Flink JobManager and TaskManagers as follows:
./bin/yarn-session.sh -n 1 -tm 2048 -s 4
and then with a detached screen, run my application with ./bin/flink run -m 
yarn-cluster ...

I am not sure if my problem is that I am not setting the log-aggregation-enable 
property well or I am not restarting the Flink JobManager and TaskManagers as I 
should… Any idea?

Thanks,
Ana

O

Re: Problem to show logs in task managers

2016-01-11 Thread Ana M. Martinez
Hi Till,

Thanks for that! I can see the "Logger in LineSplitter.flatMap" output if I
retrieve the task manager logs manually (under
/var/log/hadoop-yarn/containers/application_X/…). However, that solution is not
ideal when, for instance, I am using 32 machines for my map-reduce operations.

I would like to know why YARN's log aggregation is not working. Can you tell me
how to check whether there are still some YARN containers running after the Flink
job has finished? I have tried:
hadoop job -list
but I cannot see any jobs there, although I am not sure that means there are no
containers running...

Thanks,
Ana

On 08 Jan 2016, at 16:24, Till Rohrmann <trohrm...@apache.org> wrote:


You're right that the log statements of the LineSplitter are in the logs of the
cluster nodes, because that's where the LineSplitter code is executed. In
contrast, you create a TestClass on the client when you submit the program.
Therefore, you see the logging statement "Logger in TestClass" on the command
line or in the CLI log file.

So I would assume that the problem is YARN's log aggregation. Either your
configuration is not correct or there are still some YARN containers running
after the Flink job has finished. YARN will only show you the logs after all
containers are terminated. Maybe you could check that. Alternatively, you can
try to retrieve the taskmanager logs manually by going to the machine where
your YARN container was executed. Then under hadoop/logs/userlogs you should
find the logs somewhere.

Cheers,
Till

​

On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez <a...@cs.aau.dk> wrote:
Thanks for the tip Robert! It was a good idea to rule out other possible 
causes, but I am afraid that is not the problem. If we stick to the 
WordCountExample (for simplicity), the Exception is thrown if placed into the 
flatMap function.

I am going to try to re-write my problem and all the settings below:

When I try to aggregate all logs:
 $yarn logs -applicationId application_1452250761414_0005

the following message is retrieved:
16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at
ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
/var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005 does not exist.
Log aggregation has not completed or is not enabled.

(I tried the same command a few minutes later and got the same message, so might
it be that log aggregation is not properly enabled?)

I am going to carefully enumerate all the steps I have followed (and settings) 
to see if someone can identify why the Logger messages from CORE nodes (in an 
Amazon cluster) are not shown.

1) Enable yarn.log-aggregation-enable property to true in 
/etc/alternatives/hadoop-conf/yarn-site.xml.

2) Include log messages in my WordCountExample as follows:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;


public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);

        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.writeAsText("output.txt", FileSystem.WriteMode.OVERWRITE);
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");

            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
                //throw new RuntimeException("LineSplitter class called");
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");

        List<Integer> integerList;

        public TestClass(List<Integer> integerList) {
            this.integerList = integerList;
            loggerTestClass.info("Logger in TestClass");
        }
    }
}

Re: Problem to show logs in task managers

2016-01-11 Thread Ana M. Martinez
Hi Till,

Thanks for your help. I have checked both in Yarn’s web interface and through 
command line and it seems that there are not occupied containers.

Additionally, I have checked the configuration values in the web interface, and
even though I have changed the log-aggregation property in the yarn-site.xml
file to true, it appears as false, with the following source label:

yarn.log-aggregation-enable
false
java.io.BufferedInputStream@3c407114


I am not sure if that is relevant. I had assumed that the "./bin/flink run -m
yarn-cluster" command starts a YARN session and thus reloads the yarn-site file.
Is that right? If I am wrong here, then how can I restart it so that the
modifications in the yarn-site.xml file are picked up? (I have also tried
./bin/yarn-session.sh followed by ./bin/flink run, without success…)

I am not sure if this is related to Flink anymore; should I move my problem to
the YARN community instead?

Thanks,
Ana

On 11 Jan 2016, at 10:37, Till Rohrmann <trohrm...@apache.org> wrote:


Hi Ana,

good to hear that you found the logging statements. You can check in YARN's web
interface whether there are still occupied containers. Alternatively, you can go
to the different machines and run jps, which lists the running Java processes.
If you see an ApplicationMaster or YarnTaskManagerRunner process, then there is
still a container running Flink on that machine. I hope this helps.

Cheers,
Till

​


Re: Problem to show logs in task managers

2016-01-19 Thread Ana M. Martinez
Hi Till,

Sorry for the delay, you were right, I was not restarting the yarn cluster…

Many thanks for your help!
Ana

On 11 Jan 2016, at 14:39, Till Rohrmann <trohrm...@apache.org> wrote:


You have to restart the YARN cluster to let your changes take effect. You can
do that via HADOOP_HOME/sbin/stop-yarn.sh; HADOOP_HOME/sbin/start-yarn.sh.

The commands yarn-session.sh ... and bin/flink run -m yarn-cluster start a new
YARN application within the YARN cluster.

Cheers,
Till

​


Could not upload the jar files to the job manager IOException

2016-01-20 Thread Ana M. Martinez
Hi all,

I am running some experiments with Flink on an Amazon cluster, and every now and
then (it seems to happen at random) I get the following IOException:
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Could not upload the jar files to the job manager.

Sometimes when it fails, I just run it again immediately afterwards and it works
fine. Any idea why that might be happening?

Thanks,
Ana

Re: Could not upload the jar files to the job manager IOException

2016-01-21 Thread Ana M. Martinez
 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: PUT operation failed: Connection reset
at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:465)
at org.apache.flink.runtime.blob.BlobClient.put(BlobClient.java:327)
at org.apache.flink.runtime.jobgraph.JobGraph.uploadRequiredJarFiles(JobGraph.java:525)
at org.apache.flink.runtime.client.JobClient.uploadJarFiles(JobClient.java:292)
at org.apache.flink.runtime.client.JobClientActor$2.call(JobClientActor.java:332)
... 10 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at org.apache.flink.runtime.blob.BlobUtils.writeLength(BlobUtils.java:262)
at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:451)
... 14 more

That is, it handles the jar files fine until almost the end of the execution.
Actually, if I run it again, it may or may not work.

Ana

On 20 Jan 2016, at 15:24, Robert Metzger <rmetz...@apache.org> wrote:

Hi,

can you check the log file of the JobManager you're trying to submit the job to?
Maybe there you can find helpful information why it failed.


Use jvm to run flink on single-node machine with many cores

2016-02-21 Thread Ana M. Martinez
Hi all,

I am trying to run a program using the flink java library with 
ExecutionEnvironment.getExecutionEnvironment() from the command line using java 
-jar.

If I run the code on my machine (with four cores) or on a multi-node cluster
(using YARN), the program runs normally, but if I run it on a single-node machine
with 32 cores using java -jar, I get the following error:

02/21/2016 13:33:09 MapPartition (MapPartition at 
toBatches(ConversionToBatches.java:55))(29/32) switched to FAILED
java.io.IOException: Insufficient number of network buffers: required 1, but 
only 0 available. The total number of network buffers is currently set to 2048. 
You can increase this number by setting the configuration key 
'taskmanager.network.numberOfBuffers'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:325)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:488)
at java.lang.Thread.run(Thread.java:745)

In this case (java -jar), I don’t know if or how I can increase the number of 
network buffers. Is there a way to do it without having to use yarn (as I don’t 
have hadoop installed)?

Thanks,
Ana



Re: Use jvm to run flink on single-node machine with many cores

2016-02-23 Thread Ana M. Martinez
Hi all,

Thank you very much for your help. It worked perfectly like this:


Configuration conf = new Configuration();
conf.setInteger("taskmanager.network.numberOfBuffers", 16000);
conf.setInteger("taskmanager.numberOfTaskSlots", 32);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

env.setParallelism(32);

I believe that setting taskmanager.numberOfTaskSlots is not necessary, but
setParallelism is, as the default parallelism of 1 was used otherwise.

Best regards,
Ana

On 22 Feb 2016, at 10:37, Ufuk Celebi <u...@apache.org> wrote:

Note that the method to call in the example should be
`conf.setInteger` and the second argument not a String but an int.

On Sun, Feb 21, 2016 at 1:41 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:
Dear Ana,

If you are using a single machine with multiple cores, but need convenient
access to the configuration I would personally recommend using the local
cluster option in the flink distribution. [1] If you want to avoid having a
flink distro on the machine, then Robert's solution is the way to go.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/setup/local_setup.html






Create generic DeserializationSchema (Scala)

2023-03-06 Thread Ana Gómez González
Hello!

First time emailing a question to this mailing list, I hope I'm not messing
anything up.
I'm not fully sure whether what I want to do is conceptually correct, so please
let me know.

I want to create a generic class that extends DeserializationSchema. I want an
easy way of creating different deserialization schemas for my RabbitMQ sources,
from JSON to Scala case classes.
My first approach looks like this:

import java.io.IOException

import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}

class GenericJsonSchema[T] extends DeserializationSchema[T] {

  private val typeInformation: TypeInformation[T] =
    TypeInformation.of(new TypeHint[T] {})

  private val objectMapper: JsonMapper = JsonMapper.builder()
    .addModule(DefaultScalaModule)
    .build()

  @throws(classOf[IOException])
  def deserialize(message: Array[Byte]): T =
    objectMapper.readValue(message, typeInformation.getTypeClass)

  def isEndOfStream(nextElement: T): Boolean = false

  def getProducedType: TypeInformation[T] = typeInformation
}


When running I obtain:


*Exception in thread "main" org.apache.flink.util.FlinkRuntimeException:
The TypeHint is using a generic variable.This is not supported, generic
types must be fully specified for the TypeHint.*

I've read about and tried to understand the problems with using generic types
and the TypeInformation class, but I don't see the correct usage, or whether it
can be used for my purpose at all.


Thanks a lot in advance


*Ana Gómez González*
<http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>


Re: Create generic DeserializationSchema (Scala)

2023-03-08 Thread Ana Gómez González
Thank you Alexey!
It worked perfectly. I was missing the correct use of ClassTag.
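
For anyone finding this thread later, here is a self-contained sketch of the ClassTag-based approach, adapted from the example quoted below (MyEvent is just a hypothetical case class, not part of the original code):

import scala.reflect.{ClassTag, classTag}

import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor

class JsonDeserializationSchema[T: ClassTag] extends DeserializationSchema[T] {

  // The concrete class is captured by the ClassTag at the call site,
  // so no generic TypeHint is needed.
  private val clazz: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]

  // Jackson's mapper is not serializable, so rebuild it lazily on each task.
  @transient private lazy val objectMapper: JsonMapper =
    JsonMapper.builder().addModule(DefaultScalaModule).build()

  override def deserialize(message: Array[Byte]): T =
    objectMapper.readValue(message, clazz)

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(clazz)
}

// Hypothetical usage:
// case class MyEvent(id: String, value: Double)
// val schema = new JsonDeserializationSchema[MyEvent]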



*Ana Gómez González*

<http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>


On Mon, 6 Mar 2023 at 23:34, Alexey Novakov ()
wrote:

> Hi Ana,
>
> I think you will need to deal with ClassTag to keep all the code generic.
> I've found an example which should help:
>
> https://github.com/amzn/milan/blob/7dfa29b434ced7eef286ea34c5085c10c1b787b6/milan/milan-compilers/milan-flink-compiler/src/main/scala/com/amazon/milan/compiler/flink/serialization/JsonDeserializationSchema.scala
>
> object JsonDeserializationSchema {
>   private val objectMapper =
>     JsonMapper.builder().addModule(DefaultScalaModule).build()
> }
>
> class JsonDeserializationSchema[T: ClassTag] extends DeserializationSchema[T] {
>
>   override def deserialize(bytes: Array[Byte]): T =
>     JsonDeserializationSchema.objectMapper.readValue[T](
>       bytes, classTag[T].runtimeClass.asInstanceOf[Class[T]])
>
>   override def getProducedType: TypeInformation[T] =
>     TypeExtractor.getForClass(classTag[T].runtimeClass.asInstanceOf[Class[T]])
>
>   ...
>
> }
>
> ---
>
> Alexey
>


Debug CEP Patterns

2023-04-18 Thread Ana Gómez González
Hello!

What's the best way of debugging a CEP pattern/stream?
Basically, I have a Flink (Scala) program that consumes events from
RabbitMQ. The source is working ok because I can see the inputStream
printed. Then, I defined one pattern, created a patternStream, and finally
an output stream with a created PatternProcessFunction.

I want to make sure my pattern is defined correctly, and the same for my
process function, because so far it seems I'm not getting any element in my
output stream even though I'm manually inserting events into the source that
should cause a match. Is there a way I can debug this using IntelliJ (I set some
breakpoints but of course that doesn't work) or with any other method? I also
used some classic System.out.println calls with no success.
Can some tests be written and run to check the expected behavior?

I'm leaving here most of my code in case you detect some structural or
conceptual error (for example, before this version I was not setting the
time characteristics for the stream so it didn't work...)

package org.angoglez

import dto.events.{HourlyReading, OutputEvent}
import dto.schemas.GenericJsonDeserializationSchema
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.scala._
import org.apache.flink.cep.scala.CEP
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
import patterns.WaterPatterns
import patterns.functions.HourlyReadingProcessFunction
import source.RabbitMQSource

import java.time.{Duration, Instant, ZoneId, ZoneOffset}

object MainRunner {
  def main(args: Array[String]): Unit = {

    // ENVIRONMENT, CONFIG, RABBIT SOURCE, CREATION OF DATASTREAM
    val conf = new Configuration()
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.enableCheckpointing(100)

    val connectionConfig: RMQConnectionConfig = new RMQConnectionConfig.Builder()
      .setHost("localhost")
      .setPort(5672)
      .setVirtualHost("")
      .setUserName("")
      .setPassword("")
      .build

    type eventType = HourlyReading
    // Not gonna include the DeserializationSchema here, it works properly 'cause I can see it printed.
    val deserializationSchema = new GenericJsonDeserializationSchema[eventType]

    val rabbitMQSource = new RabbitMQSource[eventType](connectionConfig, "tests-flink", deserializationSchema)

    val serializableTimestampAssigner = new SerializableTimestampAssigner[eventType] {
      def extractTimestamp(element: eventType, recordTimestamp: Long): Long =
        element.obtainTimestamp.toInstant(ZoneId.systemDefault.getRules.getOffset(Instant.now())).toEpochMilli
    }

    val inputStream: DataStream[eventType] = env
      .addSource(rabbitMQSource.obtainRabbitSource) // same as above, the RMQSource works well, not added here.
      .assignTimestampsAndWatermarks {
        WatermarkStrategy
          .forBoundedOutOfOrderness(Duration.ofSeconds(5))
          .withTimestampAssigner(serializableTimestampAssigner)
      }

    inputStream.print() // printed ok

    // PATTERN DEFINITION

    val pattern =
      Pattern
        .begin[HourlyReading]("a1-nighttimeflowfraud")
        .where(event => FunctionUtils.isBetweenHoursRange(event.dateTime, 1, 6))
        .next("a2-nighttimeflowfraud")
        .where(event => FunctionUtils.isBetweenHoursRange(event.dateTime, 1, 6))
        .where { (event: HourlyReading, context: Context[HourlyReading]) =>
          val previousPattern = context.getEventsForPattern("a1-nighttimeflowfraud").toList.head
          (previousPattern.serialNumber == event.serialNumber) &&
            (previousPattern.unit == "M3" && event.unit == "M3") &&
            (event.volume - previousPattern.volume < 2 || event.volume - previousPattern.volume > 3)
        }

    val patternStream = CEP.pattern(inputStream, pattern)

    val outputStream: DataStream[OutputEvent] = patternStream.process(new HourlyReadingProcessFunction) // below

    outputStream.print() // nothing here

    env.execute
  }
}


//// PATTERN PROCESS FUNCTION CLASS ////

class HourlyReadingProcessFunction extends PatternProcessFunction[HourlyReading, OutputEvent] {
  def processMatch(
      patterns: util.Map[String, util.List[HourlyReading]],
      ctx: PatternProcessFunction.Context,
      out: Collector[OutputEvent]
  ): Unit = {

    System.out.println("PROCESSING MATCH")

    val pattern1: HourlyReading = patterns.get("a1-nighttimeflowfraud").get(0)
    val pattern2: HourlyReading = patterns.get("a2-nighttimeflowfraud").get(0)

    System.out.println(pattern1)
    System.out.println(pattern2)
    // case class OutputEvent(str: String)
    out.collect(OutputEvent(s"Event observed: ${pattern1.dateTime.toString} - ${pattern2.dateTime.toString}"))
  }
}





Thanks a lot in advance!!


*Ana Gómez González*

<http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>


Re: Debug CEP Patterns

2023-04-18 Thread Ana Gómez González
Here's a link for the previous code in a gist so you don't struggle with
the format. Sorry.

https://gist.github.com/angoglez/d9eb6e12f259aba306387b5c23488fb4



Re: Debug CEP Patterns

2023-04-20 Thread Ana Gómez González
Thanks Biao, your insights were very helpful!
I could finally debug properly; a couple of things were indeed not correct there.
Also, I think I'm not modeling the behaviour of the pattern properly, so I'm
definitely going to investigate this further.

Regards




*Ana Gómez González*

<http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>


On Wed, 19 Apr 2023 at 8:26, Biao Geng () wrote:

> Hi Ana,
> Thanks for the code. I just want to share my own experience when
> debugging CEP patterns:
> 1. It should work when adding breakpoints in IntelliJ, but you should add
> the breakpoint in the filter function, e.g. the call to isBetweenHoursRange
> or line 65 in your gist. The reason is that the pattern object is built only
> once, before any events have entered, since env.execute has not been called
> yet; but the filter method in the pattern's condition object will be executed
> to detect whether the NFA should advance to the next phase.
> 2. You can add some System.out.println calls in the conditions so you know
> which condition is matched and which is not. This is helpful in most cases.
>
> For test writing, you can refer to the code in org.apache.flink.cep.CEPITCase.
> My final thought is that your lines 63 and 64 use two where clauses, so these
> two conditions will be joined with "and" (i.e. a single event should fulfill
> both conditions). You can check whether that is what you really want.
>
> Best,
> Biao Geng
>


CEP - Interval between patterns

2023-07-09 Thread Ana Gómez González
Hello everyone!

I need some help trying to implement a CEP Pattern.

This pattern detects if there are no messages from a device for two
consecutive days.
For this purpose, it checks that after receiving a measurement from a
device, no further measurement is received from the same device for 2 days.

This is its signature in Esper SQL:

insert into CommunicationError
select a1.serialNumber as serialNumber
from pattern [((every a1 = Measure) -> (timer:interval(2 days) and not a2 = Measure(a2.serialNumber = a1.serialNumber)))]



The desired behavior is: receive a1, then wait for 2 days, and if no a2 has
arrived, the pattern is triggered.
Is there a way to code this using the CEP library? Or at least, if not with
this exact behavior, something similar?

I've been testing with a stream keyed by serialNumber and this pattern, but it
doesn't work as expected:

val getPattern: Pattern[Measure, Measure] =
  Pattern
    .begin[Measure]("a1-communication-error")
    .within(Time.minutes(3)) // this has been reduced for testing purposes
    .notFollowedBy("a2-communication-error")
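
One way this kind of absence check is often expressed in Flink CEP is to model the *presence* of a second reading within the window and treat the timed-out partial match as the alert, using TimedOutPartialMatchHandler. Below is only a rough, untested sketch of that idea; Measure and CommunicationError are assumed stand-in case classes with a serialNumber field, mirroring the Esper statement above:

import java.util.{List => JList, Map => JMap}

import org.apache.flink.cep.functions.{PatternProcessFunction, TimedOutPartialMatchHandler}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object CommunicationErrorCep {
  case class Measure(serialNumber: String, volume: Double)   // stand-in for the real event class
  case class CommunicationError(serialNumber: String)        // mirrors "insert into CommunicationError"

  val timedOutTag = OutputTag[CommunicationError]("communication-error")

  // a1 followed by any further reading within 2 days;
  // with the stream keyed by serialNumber, "same device" is implicit.
  val pattern: Pattern[Measure, Measure] =
    Pattern
      .begin[Measure]("a1-communication-error")
      .followedBy("a2-communication-error")
      .within(Time.days(2))

  class CommunicationErrorDetector
      extends PatternProcessFunction[Measure, String]
      with TimedOutPartialMatchHandler[Measure] {

    // A complete match means the device reported again within 2 days: nothing to emit.
    override def processMatch(matches: JMap[String, JList[Measure]],
                              ctx: PatternProcessFunction.Context,
                              out: Collector[String]): Unit = {}

    // A timed-out partial match means only a1 was seen: emit the alert to a side output.
    override def processTimedOutMatch(matches: JMap[String, JList[Measure]],
                                      ctx: PatternProcessFunction.Context): Unit = {
      val a1 = matches.get("a1-communication-error").get(0)
      ctx.output(timedOutTag, CommunicationError(a1.serialNumber))
    }
  }

  // measures: the event-time stream from the RabbitMQ source.
  def detect(measures: DataStream[Measure]): DataStream[CommunicationError] =
    CEP.pattern(measures.keyBy(_.serialNumber), pattern)
      .process(new CommunicationErrorDetector)
      .getSideOutput(timedOutTag)
}

Note that the timeout fires in event time, so the watermark has to advance past a1's timestamp plus 2 days (driven by later events, possibly from other devices) before the alert is actually emitted.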



Thank you so much always for your time and insights



*Ana Gómez González*

<http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>