Re: Flink/Kafka POC performance issue

2018-04-16 Thread Niclas Hedhman
Have you checked memory usage? It could be as simple as either having a
memory leak, or aggregating more than you think (it is sometimes not obvious
how much is kept around in memory for longer than one first expects). If
possible, connect Flight Recorder or a similar tool and keep an eye on memory.
Additionally, I don't have AWS experience to speak of, but IF AWS swaps RAM
to disk like regular Linux, then that might be triggered if your JVM heap
is bigger than the available RAM can handle.

On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage  wrote:

> I am doing a short Proof of Concept for using Flink and Kafka in our
> product.  On my laptop I can process 10M inputs in about 90 min.  On 2
> different EC2 instances (m4.xlarge and m5.xlarge, both 4 cores, 16GB RAM and
> SSD storage) I see the process hit a wall around 50 min into the test and
> short of 7M events processed.  This is running ZooKeeper, the Kafka broker and
> Flink all on the same server in all cases.  My goal is to measure single
> node vs. multi-node and test horizontal scalability, but I would like to
> figure out why it hits a wall first.  I have the task manager configured
> with 6 slots and the job has a parallelism of 5.  The laptop has 8 threads, and
> the EC2 instances have 4 threads. On smaller data sets and in the beginning
> of each test the EC2 instances outpace the laptop.  I will try again with
> an m5.2xlarge, which has 8 threads and 32GB RAM, to see if that works better
> for this workload.  Any pointers or ways to get metrics that would help
> diagnose this would be appreciated.
>
> Michael
>
>


-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Re: Flink TaskManager and JobManager internals

2018-03-28 Thread Niclas Hedhman
Thanks for trying to help, really appreciate it, and I am sorry that I was
not clear enough.

I am using Apache Polygene, and its application modeling is very nice for
what we do. What Polygene is exactly is not really important, other than that
a lot of app code exists at my end and that Polygene generates classes on the
fly, using custom classloaders. Think AspectJ and similar, with runtime
weaving.

These last few weeks with Flink have been a bit scary, since I think it is
the first time in my 35-year career where I don't understand, can't figure
out and can't find answers to what is actually going on under the hood,
even though I am able to work as a plain user, as prescribed, just fine. I
can guess, but that is going to take longer to work out than getting
pointers to those answers from the horse's mouth.

What I don't fully understand in Flink (Streaming) is:

1. I define a main() method, put everything into a JAR file, and it
"somehow" gets deployed on the nodes in my cluster. Will each node receive
the JAR file and spin up a JVM for that main(), or does Flink keep it
in-JVM with some classloader isolation to protect jobs from each other? The
data Artisans presentation, on slide 17, shows an ambiguous (to me)
layout which could be interpreted as my Flink app (I prefer to call it a
topology) being executed in a separate JVM...

2. But I have also seen that it is possible to "scale out" the processing
within a topology, which would suggest that additional hosts are used. If
so, how does that relate to the above deployment on, say, 3 hosts? Is that
scale-out only within that JVM (in which case I am good and don't need to
worry), or is it somehow offloaded to other servers in the cluster, and
if so how is that deployed?

3. "Debugging Classloading" is IMVHO a little bit "short" on the details,
and a complete overview of what classloaders Flink sets up (if any) and
when/how it does it, is basically what I need to make sure I set all of
that up correctly in my own case.

4. All Functions (et al.) in Flink seem to require "java.io.Serializable",
which to me is a big flag waved screaming "problem for me". Polygene has a
state model that is not compatible with java.io.Serializable, and I have
been looking for explanations of why the Functions need to be serializable, but
since data flow dominates Flink Streaming, there are LOTS of links
talking about data serialization, which is not a problem on my end. (See the
sketch after this list of questions.)

5. YARN/Mesos was only mentioned to point out that complex deployments are
possible, with hooks, so from my PoV the worst-case scenario is to build my own
deployment system that doesn't rely on some of the fundamentals in Flink. I
am not going to deploy on Mesos or YARN.
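
To make point 4 concrete, a minimal sketch of the kind of workaround I have in
mind: keep the non-serializable object in a transient field and create it in
open(), so only the function's (serializable) configuration travels with the
job. The RuntimeModel type below is made up, nothing Polygene-specific:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// The function stays Serializable because the non-serializable runtime
// object is transient and re-created on the TaskManager in open().
public class ModelBackedMapper extends RichMapFunction<String, String>
{
    private final String modelName;          // serializable configuration
    private transient RuntimeModel model;    // NOT shipped with the function

    public ModelBackedMapper( String modelName )
    {
        this.modelName = modelName;
    }

    @Override
    public void open( Configuration parameters )
    {
        // Runs on the TaskManager, after the function has been deserialized.
        model = RuntimeModel.create( modelName );
    }

    @Override
    public String map( String value )
    {
        return model.apply( value );
    }

    // Stand-in for whatever non-serializable machinery one actually has.
    interface RuntimeModel
    {
        String apply( String value );

        static RuntimeModel create( String name )
        {
            return value -> name + ":" + value;
        }
    }
}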


Once again, thanks a lot for any pointers or info that can be given.

Cheers
Niclas


On Wed, Mar 28, 2018 at 8:17 PM, kedar mhaswade 
wrote:

>
>
> On Wed, Mar 28, 2018 at 3:14 AM, Niclas Hedhman  wrote:
>
>> Hi,
>>
>> is there some document (or presentation) that explains the internals of
>> how a Job gets deployed on to the cluster? Communications, Classloading and
>> Serialization (if any) are the key points here I think.
>>
>
> I don't know of any specific presentations, but data artisans provide
> http://training.data-artisans.com/system-overview.html which is pretty
> good.
> The Flink documentation is comprehensive.
> Class-loading: https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_classloading.html
> State serialization: https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/custom_serialization.html
>
>>
>> I suspect that my application modeling framework is incompatible with the
>> standard Flink mechanism, and I would like to learn how much effort it would
>> be to make my own mechanism (assuming that is possible, since Yarn and Mesos
>> are in a similar situation).
>>
>
> Don't know what you mean by an application "modeling" framework, but if you
> mean that you have a Flink app (batch or streaming) that you'd want to
> deploy to YARN (or Mesos, which is similar), then the flow appears to be:
> 1- Create a "Flink Cluster" (also called a YARN session) when a user does
> "bin/yarn-session.sh " and then
> 2- Run the app when the user does "bin/flink run  ".
>
> It's the user's responsibility to shut down the cluster (YARN session) by
> sending a "stop" command to the YARN session created in 1). The code
> appears to be in classes like org.apache.flink.yarn.cli.FlinkYarnSessionCli
> (manages the YARN session) and org.apache.flink.client.CliFrontend (submits
> a Flink app to the YARN session).
>
> Regards,
> Kedar
>
>
>>
>> Thanks in Advance
>> --
>> Niclas Hedhman, Software Developer
>> http://zest.apache.org - New Energy for Java
>>
>
>


-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Flink TaskManager and JobManager internals

2018-03-28 Thread Niclas Hedhman
Hi,

is there some document (or presentation) that explains the internals of how
a Job gets deployed on to the cluster? Communications, Classloading and
Serialization (if any) are the key points here I think.

I suspect that my application modeling framework is incompatible with the
standard Flink mechanism, and I would like to learn how much effort it would
be to make my own mechanism (assuming that is possible, since Yarn and Mesos
are in a similar situation).


Thanks in Advance
-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Re: Is Flink easy to deploy ?

2018-02-23 Thread Niclas Hedhman
I think you are simply missing a bunch of the Flink artifacts.

Flink is broken into dozens of pieces, and you need to select from a large
set of artifacts what you need to depend on.

Typically, there is one Flink artifact per "extension".
I ended up with
   flink-core
   flink-connector-cassandra_2.11
   flink-connector-kafka_2.11
   flink-queryable-state-runtime_2.11
   flink-streaming-java_2.11
   flink-streaming-scala_2.11

With transitive dependencies enabled, meaning whatever Flink depends on is
also included.

Cheers

On Sat, Feb 24, 2018 at 3:22 AM, Esa Heikkinen 
wrote:

>
> Yes, I have looked. For example, if I want to compile and run
> StreamTableExample.scala from:
>
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
>
> I have taken all the examples (and also the latest Flink at the same time) to my
> Linux machine from git.
>
> Which directory should I be in for compiling and running
> StreamTableExample on the command line? flink-examples-table?
>
> What is the command for compiling? mvn clean install -Pbuild-jar?
>
> What is the command for running StreamTableExample? <flink bin>/flink run -c
> org.apache.flink.table.examples.scala.StreamTableExample
> target/flink-examples-table_2.1-1.5-SNAPSHOT.jar ?
>
> This does not work because of an error: java.lang.NoClassDefFoundError:...
>
> BR Esa
>
> Fabian Hueske kirjoitti 23.2.2018 klo 15:07:
>
> Have you had a look at the examples? [1]
> They can be run out of the IDE.
>
> Fabian
>
> [1] https://github.com/apache/flink/tree/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples
>
> 2018-02-23 13:30 GMT+01:00 Esa Heikkinen :
>
>> I have had a lot of difficulties deploying Flink. That is maybe because I am
>> new to Flink and its (Java and Maven) development environment, but I
>> would like to hear the opinions of others. I would like to use Scala.
>>
>>
>>
>> There are many examples, but often the “imports” and the settings in
>> pom.xml are missing. It seems to be a very hard job to find the correct ones.
>> Maybe use of an IDE (IntelliJ IDEA) is almost mandatory; it helps to find
>> the “imports”, but it does not find all of them.
>>
>>
>>
>> Generally, you have to do and study a lot of basic work before you get
>> to the actual thing?
>>
>>
>>
>> If there is a ready example (with source code) that is close enough to
>> what you want, it is much easier to deploy. But if not, it can be a
>> surprisingly difficult and time-consuming task. Because the
>> documentation seems to be partially incomplete, it is often necessary to
>> “google” and query the mailing list.
>>
>>
>>
>> Or have I misunderstood something, or can I just not use Flink correctly (yet)?
>>
>>
>>
>> Features of Flink are so good that I want to learn to use it.
>>
>>
>>
>> Best Regards
>>
>> Esa
>>
>
>
>


-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Re: Discarding bad data in Stream

2018-02-19 Thread Niclas Hedhman
Thanks Fabian,

I have seen Side Outputs and OutputTags but have not fully understood the
mechanics yet. In my case, I don't need to keep bad records... And I think
I will end up with flatMap() after all; it just becomes an internal
documentation issue to provide relevant information...
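
For the archive, the side-output mechanics Fabian refers to look roughly like
this; a minimal sketch with made-up parsing, the linked docs being the
authoritative reference:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch
{
    public static void main( String[] args ) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Anonymous subclass so the OutputTag keeps its type information.
        final OutputTag<String> badRecords = new OutputTag<String>( "bad-records" ) {};

        DataStream<String> raw = env.fromElements( "1", "2", "oops", "4" );

        SingleOutputStreamOperator<Integer> parsed = raw
            .process( new ProcessFunction<String, Integer>()
            {
                @Override
                public void processElement( String value, Context ctx, Collector<Integer> out )
                {
                    try
                    {
                        out.collect( Integer.parseInt( value ) );  // good records go downstream
                    }
                    catch( NumberFormatException e )
                    {
                        ctx.output( badRecords, value );           // bad records go to the side output
                    }
                }
            } );

        parsed.print();                                // main stream
        parsed.getSideOutput( badRecords ).print();    // rejected records

        env.execute( "side-output-sketch" );
    }
}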

Thanks for your response.
Niclas

On Mon, Feb 19, 2018 at 8:46 PM, Fabian Hueske  wrote:

> Hi Niclas,
>
> I'd either add a Filter to directly discard bad records. That should make
> the behavior explicit.
> If you need to do complex transformations that you don't want to do twice,
> the FlatMap approach would be the most efficient.
> If you'd like to keep the bad records, you can implement a ProcessFunction
> and add a side output [1] that collects bad records.
>
> Hope this helps,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/side_output.html
>
> 2018-02-19 10:29 GMT+01:00 Niclas Hedhman :
>
>> Hi again,
>>
>> something that I don't find (easily) in the documentation is what the
>> recommended method is to discard data from the stream.
>>
>> On one hand, I could always use flatMap(), even if it is "per message"
>> since that allows me to return zero or one object.
>>
>> DataStream stream =
>> env.addSource( source )
>>.flatMap( new MyFunction() )
>>
>>
>> But that seems a bit misleading, as the casual observer will get the idea
>> that MyFunction 'branches' out, but it doesn't.
>>
>> The other "obvious" choice is to return null and follow with a filter...
>>
>> DataStream stream =
>> env.addSource( source )
>>.map( new MyFunction() )
>>.filter( Objects::nonNull )
>>
>> BUT, that doesn't work with Java 8 method references like above, so I
>> have to create my own filter to get the type information correct to Flink;
>>
>> DataStream stream =
>> env.addSource( source )
>>.map( new MyFunction() )
>>.filter( new DiscardNullFilter<>() )
>>
>>
>> And in my opinion, that ends up looking ugly, as the streams/pipelines (not
>> used to the terminology yet) quickly have many transformations and branches,
>> and having a null check after each one seems to put the burden of knowledge in
>> the wrong spot ("Can this function return null?").
>>
>> Throwing an exception shuts down the entire stream, which seems
>> overly aggressive for many data-related discards.
>>
>> Any other choices?
>>
>> Cheers
>> --
>> Niclas Hedhman, Software Developer
>> http://zest.apache.org - New Energy for Java
>>
>
>


-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Re: How to find correct "imports"

2018-02-19 Thread Niclas Hedhman
It is called "declared dependencies", and Flink has a huge number of
artifacts, which have also changed names over time. But Maven Central
provides a search facility.

Try http://search.maven.org/#search%7Cga%7C5%7Cg%3Aorg.apache.flink%20AND%20v%3A1.4.0

and it will give you all artifacts from Flink 1.4.0.
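
For the two classes you mention, the Java-side imports look roughly like the
sketch below (Flink 1.4-era packages; the Scala equivalents live under
org.apache.flink.table.api.scala, and, if I remember correctly, the Table API
classes come from the flink-table_2.11 artifact in pom.xml; please verify
against the javadoc of your version):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;

public class TableImportsSketch
{
    public static void main( String[] args ) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment( env );

        // Made-up path and schema, just to show which classes are involved.
        CsvTableSource source = new CsvTableSource(
            "/tmp/input.csv",
            new String[]{ "name", "amount" },
            new TypeInformation<?>[]{ BasicTypeInfo.STRING_TYPE_INFO,
                                      BasicTypeInfo.LONG_TYPE_INFO } );

        tableEnv.registerTableSource( "MyCsv", source );
        // ...continue with tableEnv.scan( "MyCsv" ) from here.
    }
}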

On Mon, Feb 19, 2018 at 4:56 PM, Esa Heikkinen  wrote:

> Hi
>
>
>
> I am quite new to Flink and Scala. I have had a bit of trouble finding
> the correct “imports”.
>
> What would be the best way to find them ?
>
>
>
> For example the imports for StreamTableEnvironment and CsvTableSource.
>
>
>
> And how do I know if I should put something in pom.xml ?
>
>
>
> Esa
>



-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Discarding bad data in Stream

2018-02-19 Thread Niclas Hedhman
Hi again,

something that I don't find (easily) in the documentation is what the
recommended method is to discard data from the stream.

On one hand, I could always use flatMap(), even if it is "per message"
since that allows me to return zero or one object.

DataStream stream =
env.addSource( source )
   .flatMap( new MyFunction() )


But that seems a bit misleading, as the casual observer will get the idea
that MyFunction 'branches' out, but it doesn't.

The other "obvious" choice is to return null and follow with a filter...

DataStream stream =
env.addSource( source )
   .map( new MyFunction() )
   .filter( Objects::nonNull )

BUT, that doesn't work with Java 8 method references like above, so I have
to create my own filter to get the type information correct for Flink:

DataStream stream =
env.addSource( source )
   .map( new MyFunction() )
   .filter( new DiscardNullFilter<>() )
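
(For completeness, DiscardNullFilter is nothing more than a typed null check,
roughly:)

import org.apache.flink.api.common.functions.FilterFunction;

// A trivial, explicitly typed replacement for the Objects::nonNull reference above.
public class DiscardNullFilter<T> implements FilterFunction<T>
{
    @Override
    public boolean filter( T value )
    {
        return value != null;
    }
}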


And in my opinion, that ends up looking ugly, as the streams/pipelines (not
used to the terminology yet) quickly have many transformations and branches,
and having a null check after each one seems to put the burden of knowledge in
the wrong spot ("Can this function return null?").

Throwing an exception shuts down the entire stream, which seems
overly aggressive for many data-related discards.

Any other choices?

Cheers
-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Re: Only a single message processed

2018-02-19 Thread Niclas Hedhman
m )
 .setHost( cli.primaryCassandraHost(),
cli.primaryCassandraPort() )
 .setQuery( String.format( DELETE_SCHEDULE,
cli.cassandraKeyspace ) )
 .build();
}
catch( Throwable e )
{
String message = "Unable to start Scheduling Admin";
LOG.error( message );
throw new RuntimeException( message, e );
}
}


private static class GetPollDeclaration
implements MapFunction<AdminCommand, PollDeclaration>
{
private static final Logger LOG = LoggerFactory.getLogger(
GetPollDeclaration.class );

@Override
public PollDeclaration map( AdminCommand command )
throws Exception
{
try
{
if( command == null )
{
return null;
}
return (PollDeclaration) command.value();
}
catch( Throwable e )
{
LOG.error( "Unable to cast command data to PollDeclaration", e );
return null;
}
}
}


private static class PollDeclarationToTuple3Map
implements MapFunction<PollDeclaration, Tuple3<List<String>, String, String>>
{
@Override
public Tuple3<List<String>, String, String> map( PollDeclaration decl )
throws Exception
{
try
{
if( decl == null )
{
return null;
}
return new Tuple3<>( singletonList(
mapper.writeValueAsString( decl ) ), decl.zoneId + ":" +
decl.schedule, decl.url );
}
catch( Throwable e )
{
LOG.error( "Unable to cast command data to PollDeclaration", e );
return null;
}
}
}

Flink Dependencies;

flink : [
[group: "org.apache.flink", name: "flink-core", version: flinkVersion],
[group: "org.apache.flink", name: "flink-java", version: flinkVersion],
[group: "org.apache.flink", name:
"flink-connector-cassandra_2.11", version: flinkVersion],
[group: "org.apache.flink", name:
"flink-connector-kafka-0.11_2.11", version: flinkVersion],
[group: "org.apache.flink", name:
"flink-queryable-state-runtime_2.11", version: flinkVersion],
[group: "org.apache.flink", name: "flink-streaming-java_2.11",
version: flinkVersion],
[group: "org.apache.flink", name:
"flink-streaming-scala_2.11", version: flinkVersion]
],





On Sun, Feb 18, 2018 at 8:11 PM, Xingcan Cui  wrote:

> Hi Niclas,
>
> About the second point you mentioned, was the processed message a random
> one or a fixed one?
>
> The default startup mode for FlinkKafkaConsumer is
> StartupMode.GROUP_OFFSETS; maybe you could try StartupMode.EARLIEST while
> debugging. Also, before that, you may try fetching the messages with the
> Kafka console consumer tool to see whether they can be consumed completely.
>
> Besides, I wonder if you could provide the code for your Flink pipeline.
> That’ll be helpful.
>
> Best,
> Xingcan
>
>
>
> On 18 Feb 2018, at 7:52 PM, Niclas Hedhman  wrote:
>
>
> So, the producer is run (at the moment) manually (command-line) one
> message at a time.
> Kafka's tooling (different consumer group) shows that a message is added
> each time.
>
> Since my last post, I have also added a UUID as the key, and that didn't
> make a difference, so you are likely correct about de-dup.
>
>
> There is only a single partition on the topic, so it shouldn't be a
> partitioning issue.
>
> I also noticed;
> 1. Sending a message while consumer topology is running, after the first
> message, then that message will be processed after a restart.
>
> 2. Sending many messages, while consumer is running, and then doing many
> restarts will only process a single of those. No idea what happens to the
> others.
>
> I am utterly confused.
>
> And digging in the internals are not for the faint-hearted, but the
> kafka.poll() returns frequently with empty records.
>
> Will continue debugging that tomorrow...
>
>
> Niclas
>
> On Feb 18, 2018 18:50, "Fabian Hueske"  wrote:
>
>> Hi Niclas,
>>
>> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a
>> "feature" is not implemented.
>> Do you produce into the topic that you want to read or is the data in the
>> topic static?
>> If you do not produce in the topic while the consuming application is
>> running, this might be an issue with the start position of the consumer
>> [1].
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>
>> 2018-02-18 8:14

Re: Need to understand the execution model of the Flink

2018-02-18 Thread Niclas Hedhman
Do you really need the large single table created in step 2?

If not, what you typically do is have the Csv source first do the common
transformations. Then, depending on whether the 10 outputs have different
processing paths or not, you either do a split() to do individual
processing depending on some criteria, or you just have the sinks put each
record into separate tables.
You have full control, at each step along the transformation path, over whether
it can be parallelized or not, and if there are no sequential constraints
in your model, you can fill all cores on all hosts quite easily.

Even if you need the step 2 table, I would still just treat that as a
split(): a branch ending in a Sink that does the storage there. No need to
read the records from the file over and over again, nor to store them first in
the step 2 table and read them out again.
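
In DataStream terms (rather than the Table API in your question), the shape I
have in mind is roughly the sketch below; the path, the parsing and the
predicates are all made up:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OneSourceManySinks
{
    public static void main( String[] args ) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 4 );  // whatever the cluster can actually give you

        // The file is read once, and the common parsing happens once.
        DataStream<String[]> records = env
            .readTextFile( "/data/input.csv" )
            .map( new MapFunction<String, String[]>()
            {
                @Override
                public String[] map( String line )
                {
                    return line.split( "," );
                }
            } );

        // Each branch applies its own filter and ends in its own sink.
        records.filter( r -> "A".equals( r[0] ) ).writeAsText( "/data/out-a" );
        records.filter( r -> "B".equals( r[0] ) ).writeAsText( "/data/out-b" );
        // ...one branch per target table; a real job would use a Cassandra/JDBC sink.

        env.execute( "one-source-many-sinks" );
    }
}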

Don't ask *me* about what happens in failure scenarios... I have myself not
figured that out yet.

HTH
Niclas

On Mon, Feb 19, 2018 at 3:11 AM, Darshan Singh 
wrote:

> Hi, I would like to understand the execution model.
>
> 1. I have a csv file which is, say, 10 GB.
> 2. I created a table from this file.
>
> 3. Now I have created filtered tables on this, say 10 of them.
> 4. Now I created a writeToSink for all these 10 filtered tables.
>
> Now my question is: will these 10 filtered tables be written in
> parallel (suppose I have 40 cores and set the parallelism to, say, 40 as well)?
>
> The next question I have is that the table which I created from the csv file,
> which is common, won't be persisted by Flink internally; rather, for all 10
> filtered tables it will read the csv file and then apply the filter and write
> to the sink.
>
> I think that for all 10 filtered tables it will read the csv again and again;
> in this case it will be read 10 times.  Is my understanding correct, or am I
> missing something?
>
> What if in step 2 I change the table to a dataset and back?
>
> Thanks
>



-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java


Re: Only a single message processed

2018-02-18 Thread Niclas Hedhman
So, the producer is run (at the moment) manually (command-line) one message
at a time.
Kafka's tooling (different consumer group) shows that a message is added
each time.

Since my last post, I have also added a UUID as the key, and that didn't
make a difference, so you are likely correct about de-dup.


There is only a single partition on the topic, so it shouldn't be a
partitioning issue.

I also noticed:
1. If I send a message while the consumer topology is running (after the first
message), then that message will be processed after a restart.

2. If I send many messages while the consumer is running, then doing many
restarts will only process a single one of those. No idea what happens to the
others.

I am utterly confused.

And digging into the internals is not for the faint-hearted, but
kafka.poll() frequently returns with empty records.

Will continue debugging that tomorrow...
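
For reference, the start-position configuration Fabian links to below is set
directly on the consumer itself; a minimal sketch (topic name and properties
are made up), in case it helps someone searching the archives:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class EarliestOffsetSketch
{
    public static void main( String[] args ) throws Exception
    {
        Properties props = new Properties();
        props.setProperty( "bootstrap.servers", "localhost:9092" );
        props.setProperty( "group.id", "poc-debug" );

        FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>( "admin-commands", new SimpleStringSchema(), props );

        // Ignore committed group offsets while debugging; read the topic from the beginning.
        consumer.setStartFromEarliest();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource( consumer ).print();
        env.execute( "earliest-offset-sketch" );
    }
}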


Niclas

On Feb 18, 2018 18:50, "Fabian Hueske"  wrote:

> Hi Niclas,
>
> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a
> "feature" is not implemented.
> Do you produce into the topic that you want to read or is the data in the
> topic static?
> If you do not produce in the topic while the consuming application is
> running, this might be an issue with the start position of the consumer
> [1].
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> 2018-02-18 8:14 GMT+01:00 Niclas Hedhman :
>
>> Hi,
>> I am pretty new to Flink, and I like what I see and have started to build
>> my first application using it.
>> I must be missing something very fundamental. I have a
>> FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
>> functions and terminated with the standard CassandraSink. I have try..catch
>> on all my own maps/filters and the first message in the queue is processed
>> after start-up, but any additional messages are ignore, i.e. not reaching
>> the first map(). Any additional messages are swallowed (i.e. consumed but
>> not forwarded).
>>
>> I suspect that this is some type of de-duplication going on, since the
>> (test) producer of these messages. The producer provide different values on
>> each, but there is no "key" being passed to the KafkaProducer.
>>
>> Is that required? And if so, why? Can I tell Flink or Flink's
>> KafkaConsumer to ingest all messages, and not try to de-duplicate them?
>>
>> Thanks
>>
>> --
>> Niclas Hedhman, Software Developer
>> http://zest.apache.org - New Energy for Java
>>
>
>


Only a single message processed

2018-02-17 Thread Niclas Hedhman
Hi,
I am pretty new to Flink, and I like what I see and have started to build
my first application using it.
I must be missing something very fundamental. I have a
FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
functions, terminated with the standard CassandraSink. I have try..catch
in all my own maps/filters, and the first message in the queue is processed
after start-up, but any additional messages are ignored, i.e. they never reach
the first map(). Any additional messages are swallowed (i.e. consumed but
not forwarded).

I suspect that this is some type of de-duplication going on, related to how the
(test) producer of these messages works. The producer provides different values
in each message, but there is no "key" being passed to the KafkaProducer.

Is that required? And if so, why? Can I tell Flink or Flink's KafkaConsumer
to ingest all messages, and not try to de-duplicate them?

Thanks

--
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java