Re: Referencing Global Window across flink jobs

2017-07-09 Thread Konstantin Knauf
Hi Vijay,

thanks for sharing the code. To my knowledge, the only way to access the
state of one job from another job right now is Queryable State, which
seems impractical in this case. Why do you want to perform the apply
functions in separate Flink jobs?

In the same job I would just perform all aggregations within one
WindowFunction, emitting a Tuple/POJO that carries all the aggregations.
You can then use a map to project the stream of all aggregations to its
individual dimensions. This way you only keep the window state once, as
opposed to calling WindowedStream::apply multiple times on the same
windowed stream. In case you want to decouple the downstream operations
on the different aggregations from each other, you can still write the
different dimensions of the WindowFunction's output to different Kafka
topics and have separate jobs from there on.
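
To make this concrete, here is a minimal sketch of the idea in Java. The
element type Tuple2<String, ObjectNode>, the "value" field and the
Aggregates POJO are assumptions for illustration only; pojo and MyTrigger
are taken from your snippet:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Illustrative POJO carrying every aggregation computed in the window.
public static class Aggregates {
    public long count;
    public long sum;
}

// One WindowFunction computes all aggregations and emits them together.
DataStream<Aggregates> aggregates = pojo
    .window(GlobalWindows.create())
    .trigger(MyTrigger.of(10, 4000))
    .apply(new WindowFunction<Tuple2<String, ObjectNode>, Aggregates, Tuple, GlobalWindow>() {
        @Override
        public void apply(Tuple key, GlobalWindow window,
                          Iterable<Tuple2<String, ObjectNode>> input,
                          Collector<Aggregates> out) {
            Aggregates agg = new Aggregates();
            for (Tuple2<String, ObjectNode> record : input) {
                agg.count += 1;                              // first aggregation
                agg.sum += record.f1.get("value").asLong();  // second aggregation, etc.
            }
            out.collect(agg);                                // one POJO with all aggregations
        }
    });

// Project the combined output to its individual dimensions downstream
// (or write each dimension to its own Kafka topic instead).
DataStream<Long> counts = aggregates.map(a -> a.count).returns(Long.class);
DataStream<Long> sums   = aggregates.map(a -> a.sum).returns(Long.class);

The window contents are kept in state only once; each downstream branch
(or Kafka topic) just projects the already computed result.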

Cheers,

Konstantin

On 07.07.2017 12:06, G.S.Vijay Raajaa wrote:
> HI Konstantin,
> 
> Please find a snippet of my code:
> 
>   DataStream<String> stream = env
>       .addSource(new FlinkKafkaConsumer08<>("data", new SimpleStringSchema(), properties));
> 
>   // Create a keyed stream from the kafka data stream
>   KeyedStream, Tuple> pojo = stream
>       .map(new JsonDeserializer())
>       .keyBy(0);
> 
>   // Create a global window to extend the window throughout the day
>   pojo.window(GlobalWindows.create())
>       .trigger(MyTrigger.of(10, 4000))
>       .apply(new JsonMerger());
> 
> In the above snippet the global window keeps on growing, and the trigger
> fires the apply function for every record added to the window. The final
> purge happens when the max count is met. Now I am exploring whether I
> could reference the state and trigger of the global window across Flink
> jobs and perform the apply functions in parallel. The source for all the
> Flink jobs is the same window of data. The idea is that the parallel
> Flink jobs won't hook up to the stream source but get triggered based on
> the global window state and trigger event. Hope that explains the
> scenario. Please excuse me if I am not able to detail the nitty-gritty
> down to the most granular unit possible.
> 
> Regards,
> 
> Vijay Raajaa GS 
> 
> 
> On Fri, Jul 7, 2017 at 3:17 PM, Konstantin Knauf wrote:
> 
> Hi Vijay,
> 
> can you elaborate a little bit on what you would like to achieve?
> Right now, I am not sure what aspect of the window you want to
> reference (WindowState,Timers, State in the Windowfunction,...).
> 
> Cheers,
> 
> Konstantin
> 
> Sent from my phone. Please excuse brevity and typos.
> ---
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> 
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> 
> G.S.Vijay Raajaa wrote:
> 
> 
> HI,
> 
> I have a use case where I need to build a global window with a custom
> trigger. I would like to reference this window across my Flink jobs.
> Is there a possibility that the global window can be referenced?
> 
> Regards,
> Vijay Raajaa GS 
> 
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Best practices for assigning operator UIDs

2017-07-09 Thread Gyula Fóra
Hi Jared,

The only thing that matters is that UIDs are unique within one JobGraph.
It's completely fine to use the same UIDs in two separate jobs.

Beyond this, I would go with simple UIDs that don't contain parts of the
logic, because you may want to change the logic (the expression, or add
new topics) and still keep the state.
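
For illustration, a minimal sketch of what that looks like (operator names,
JsonReshapeFunction and the expression variable are made up, mirroring the
examples from your mail):

DataStream<String> events = env
    .addSource(new FlinkKafkaConsumer08<>(topics, new SimpleStringSchema(), properties))
    .uid("kafka-source");      // stays valid even if the topic list changes later

DataStream<String> reshaped = events
    .map(new JsonReshapeFunction(expression))
    .uid("json-reshape");      // stays valid even if the expression changes later

Since uniqueness only matters within one JobGraph, a second job built from
the same code can use exactly the same UIDs.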

Hope this helps :)
Gyula

On Sun, Jul 9, 2017, 18:36 Jared Stehler <
jared.steh...@intellifylearning.com> wrote:

> I have some confusion around how best to assign UIDs to operators. The
> documentation states simply that they are important, but stops short of
> recommending what, if any, stateful information should go into the name. For
> example, if the same code is used to create two separate job graphs, should
> the operator UIDs contain information specific to each job instance?
>
> Example: for a kafka source, should the UID contain the subscribed topic
> name / pattern?
>
> Example: if I have a custom mapping function which reshapes a JSON field
> based on an expression, should that expression (or a hash thereof) be
> contained in that operator’s UID?
>
> Basically, is the UID similar to overriding hashCode and equals for POJOs,
> or is it more like declaring serialVersionUID?
>
>
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
>
>
>
>


Re: External checkpoints not getting cleaned up/discarded - potentially causing high load

2017-07-09 Thread Jared Stehler
We are using the RocksDB state backend. We had not activated incremental 
checkpointing, but in the course of debugging this, we ended up doing so, and 
also moving back to S3 from EFS as it appeared that EFS was introducing large 
latencies. I will attempt to provide some profiler data as we are able to 
analyze further.
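
For reference, the change amounted to roughly the following (the S3 path is a
placeholder; the boolean flag is the Flink 1.3 switch for incremental RocksDB
checkpoints, and the constructor declares IOException):

// org.apache.flink.contrib.streaming.state.RocksDBStateBackend
// (from the flink-statebackend-rocksdb dependency)
RocksDBStateBackend backend =
    new RocksDBStateBackend("s3://our-bucket/flink/ext-checkpoints", true);
executionEnvironment.setStateBackend(backend);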

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703



> On Jul 3, 2017, at 6:02 AM, Stefan Richter wrote:
> 
> Hi,
> 
> I have two quick questions about this problem report:
> 
> 1) Which state backend are you using?
> 2) In case you are using RocksDB, did you also activate incremental
> checkpointing when moving to Flink 1.3?
> 
> Another thing that could be really helpful: if possible, can you attach a
> profiler/sampler to your job manager and figure out the hotspot methods
> where most of the time is spent? That would be a very helpful starting
> point for locating where the problem is caused.
> 
> Best,
> Stefan
> 
>> On 29.06.2017 at 18:02, Jared Stehler wrote:
>> 
>> We’re seeing our external checkpoints directory grow in an unbounded
>> fashion after upgrading to Flink 1.3.  We are using Flink-Mesos.
>> 
>> In 1.2 (HA standalone mode), we saw (correctly) that only the latest
>> external checkpoint was being retained (i.e., respecting the
>> state.checkpoints.num-retained default of 1).
>> 
>> The Mesos agent running the JobManager ends up with a really high load and
>> ends up unresponsive. Interestingly enough, there is not much CPU or
>> memory pressure, so that suggests to us that it's IO or network bound.
>> But nothing jumps out at us (using iostat/netstat). The biggest difference
>> seems to be external checkpoints not getting cleaned up/discarded. What
>> might cause that?
>> 
>> ubuntu@ip-10-80-52-176:/mnt/shared/flink/ext-checkpoints$ top
>> top - 13:47:41 up 16:31,  1 user,  load average: 25.85, 25.62, 25.43
>> Tasks: 297 total,   1 running, 296 sleeping,   0 stopped,   0 zombie
>> %Cpu(s):  0.3 us,  0.0 sy,  0.0 ni, 98.8 id,  0.7 wa,  0.0 hi,  0.0 si,  0.0 st
>> KiB Mem:  32948204 total, 23974844 used,  8973360 free,   144572 buffers
>> KiB Swap:        0 total,        0 used,        0 free.  7752480 cached Mem
>> 
>> We specify Mesos agent attributes to ensure that our Flink containers are 
>> allocated to only a subset of the Mesos slaves…   However, we do end up with 
>> the Flink JobManager container running on the same physical instance as 
>> multiple task manager containers. We are running 65 task managers with 2 
>> slots each, and ~70 jobs currently on the cluster.
>> 
>> We use AWS EFS (https://aws.amazon.com/efs/) mounted on all Mesos boxes to
>> store the recovery, checkpoint, externalized checkpoint, and savepoint
>> directories.
>> 
>> 
>> 
>> executionEnvironment.enableCheckpointing(TimeUnit.SECONDS.toMillis(30));
>> 
>> CheckpointConfig config = executionEnvironment.getCheckpointConfig();
>> 
>> config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> config.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(5));
>> 
>> executionEnvironment.getConfig().setGlobalJobParameters(params);
>> 
>> executionEnvironment.getConfig().setAutoWatermarkInterval(watermarkInterval.getValue());
>> 
>> executionEnvironment.getConfig().setCodeAnalysisMode(CodeAnalysisMode.HINT);
>> 
>> 
>> executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> 
>> // fail the job if it restarts more than 3 times in 1 minute, with 10 second delay
>> 
>> executionEnvironment.setRestartStrategy(
>>     RestartStrategies.failureRateRestart(3, Time.minutes(2), Time.seconds(1)));
>> 
>> executionEnvironment.getConfig().setLatencyTrackingInterval(3);
>> 
>> 
>> Would appreciate any insights you might have on this.
>> 
>> Thanks
>> 
>> --
>> Jared Stehler
>> Chief Architect - Intellify Learning
>> o: 617.701.6330 x703
>> 
>> 
>> 
> 





Best practices for assigning operator UIDs

2017-07-09 Thread Jared Stehler
I have some confusion around how best to assign UIDs to operators. The 
documentation states simply that they are important, but stops short of 
recommending what, if any, stateful information should go into the name. For 
example, if the same code is used to create two separate job graphs, should the 
operator UIDs contain information specific to each job instance?

Example: for a kafka source, should the UID contain the subscribed topic name / 
pattern?

Example: if I have a custom mapping function which reshapes a JSON field based 
on an expression, should that expression (or a hash thereof) be contained in 
that operator’s UID?

Basically, is the UID similar to overriding hashCode and equals for POJOs, or 
is it more like declaring serialVersionUID?


--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703







Re: problems starting the training exercise TaxiRideCleansing on local cluster

2017-07-09 Thread Günter Hipler

Thanks for the response.

My classpath contains a joda-time version:

 mvn dependency:build-classpath
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Flink Quickstart Job 0.1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:build-classpath (default-cli) @ 
flink-java-project ---

[INFO] Dependencies classpath:


togram/2.1.6/HdrHistogram-2.1.6.jar:/home/swissbib/.m2/repository/com/twitter/jsr166e/1.1.0/jsr166e-1.1.0.jar:/home/swissbib/.m2/repository/joda-time/joda-time/2.9.9/joda-time-2.9.9.jar:



which definitely contains the required method
(http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormatter.html#withZoneUTC--).


Something else is going wrong. I guess it's the way I started (or
configured) the local cluster, but that was done as described in the
training setup (http://training.data-artisans.com/devEnvSetup.html),
which is very straightforward.
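
One way to narrow this down further might be to print, from inside the running
job, which jar the class is actually resolved from; a quick throwaway check:

// e.g. placed in a RichFunction's open() so it runs on the task manager where
// the NoSuchMethodError occurs (the output ends up in the taskmanager log)
System.out.println(
    org.joda.time.format.DateTimeFormatter.class
        .getProtectionDomain()
        .getCodeSource()
        .getLocation());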


Günter


On 09.07.2017 16:17, Ted Yu wrote:
Since the exception was about a missing method (withZoneUTC) instead
of a class not found, it was likely due to a conflicting joda-time jar
being on the classpath.


Cheers

On Sun, Jul 9, 2017 at 1:22 AM, Günter Hipler wrote:


Hi,

sorry for this newbie question...

I'm following the data artisans exercises and wanted to run the
TaxiRide Cleansing job on my local cluster (version 1.3.1)
(http://training.data-artisans.com/exercises/rideCleansing.html).

While this is possible within my IDE, the cluster throws an
exception because of a missing type, although the missing type is
part of the application jar the cluster is provided with.


swissbib@ub-sbhp02:~/environment/code/flink_einarbeitung/training/flink-java-project/target$
jar tf flink-java-project-0.1.jar | grep DateTimeFormatter
org/elasticsearch/common/joda/FormatDateTimeFormatter.class
org/joda/time/format/DateTimeFormatter.class
org/joda/time/format/DateTimeFormatterBuilder$CharacterLiteral.class
org/joda/time/format/DateTimeFormatterBuilder$Composite.class
org/joda/time/format/DateTimeFormatterBuilder$FixedNumber.class
org/joda/time/format/DateTimeFormatterBuilder$Fraction.class
org/joda/time/format/DateTimeFormatterBuilder$MatchingParser.class
org/joda/time/format/DateTimeFormatterBuilder$NumberFormatter.class
org/joda/time/format/DateTimeFormatterBuilder$PaddedNumber.class
org/joda/time/format/DateTimeFormatterBuilder$StringLiteral.class
org/joda/time/format/DateTimeFormatterBuilder$TextField.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneId.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneName.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneOffset.class
org/joda/time/format/DateTimeFormatterBuilder$TwoDigitYear.class
org/joda/time/format/DateTimeFormatterBuilder$UnpaddedNumber.class
org/joda/time/format/DateTimeFormatterBuilder.class

Any advice? Thanks!

Günter


swissbib@ub-sbhp02:/usr/local/swissbib/flink$ bin/flink run -c
org.apache.flink.quickstart.StreamingJob
/home/swissbib/environment/code/flink_einarbeitung/training/flink-java-project/target/flink-java-project-0.1.jar
--input
/home/swissbib/environment/code/flink_einarbeitung/training/flink-java-project/data/nycTaxiRides.gz

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in

[jar:file:/usr/local/swissbib/flink-1.3.1/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in

[jar:file:/home/swissbib/environment/tools/hbase-1.2.1/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in

[jar:file:/home/swissbib/environment/tools/hadoop-2.5.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Cluster configuration: Standalone cluster with JobManager at
localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: 32c7f2d0bbcac4d8c0367639ea928014.
Waiting for job completion.
Connected to JobManager at
Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1464375722]
with leader session id ----.
07/09/2017 09:31:51Job execution switched to status RUNNING.
07/09/2017 

Re: flink kafka consumer lag

2017-07-09 Thread Kien Truong

Hi,

You should set up a metric reporter to collect Flink's metrics.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html

There's a lot of useful information in the metrics, including the
consumer lag.

I'm using the Graphite reporter with InfluxDB for storage plus Grafana to
display the metrics; it's been working great so far.
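
For reference, the reporter is configured in flink-conf.yaml, roughly like this
(host and port are placeholders; check the metrics documentation linked above
for the exact keys in your Flink version):

# the flink-metrics-graphite jar has to be available to the cluster (lib/ directory)
metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: graphite-or-influxdb-relay-host
metrics.reporter.graphite.port: 2003
metrics.reporter.graphite.protocol: TCP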


Regards,

Kien


On 7/9/17 1:16 PM, Karthik Deivasigamani wrote:

Hi,
I'm using Flink 1.2.1 and FlinkKafkaConsumer09 to read data from
Kafka Server (0.10.1.0). My consumer is able to read and everything
works fine.
But when I use the kafka-consumer-groups.sh command to find the lag
metrics, it does not return the lag. Upon reading online, I found that
the Flink Kafka consumer uses the low-level API and manages the partition
assignment itself.
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html

What is the best way to monitor my flink kafka consumer's lag? Does
Flink expose the lag metrics per partition somewhere?

~
Karthik




problems starting the training exercise TaxiRideCleansing on local cluster

2017-07-09 Thread Günter Hipler

Hi,

sorry for this newbie question...

I'm following the data artisans exercises and wanted to run the TaxiRide 
Cleansing job on my local cluster (version 1.3.1)

(http://training.data-artisans.com/exercises/rideCleansing.html)

While this is possible within my IDE, the cluster throws an exception 
because of a missing type, although the missing type is part of the 
application jar the cluster is provided with.


swissbib@ub-sbhp02:~/environment/code/flink_einarbeitung/training/flink-java-project/target$ 
jar tf flink-java-project-0.1.jar | grep DateTimeFormatter

org/elasticsearch/common/joda/FormatDateTimeFormatter.class
org/joda/time/format/DateTimeFormatter.class
org/joda/time/format/DateTimeFormatterBuilder$CharacterLiteral.class
org/joda/time/format/DateTimeFormatterBuilder$Composite.class
org/joda/time/format/DateTimeFormatterBuilder$FixedNumber.class
org/joda/time/format/DateTimeFormatterBuilder$Fraction.class
org/joda/time/format/DateTimeFormatterBuilder$MatchingParser.class
org/joda/time/format/DateTimeFormatterBuilder$NumberFormatter.class
org/joda/time/format/DateTimeFormatterBuilder$PaddedNumber.class
org/joda/time/format/DateTimeFormatterBuilder$StringLiteral.class
org/joda/time/format/DateTimeFormatterBuilder$TextField.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneId.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneName.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneOffset.class
org/joda/time/format/DateTimeFormatterBuilder$TwoDigitYear.class
org/joda/time/format/DateTimeFormatterBuilder$UnpaddedNumber.class
org/joda/time/format/DateTimeFormatterBuilder.class

Any advice? Thanks!

Günter


swissbib@ub-sbhp02:/usr/local/swissbib/flink$ bin/flink run -c 
org.apache.flink.quickstart.StreamingJob 
/home/swissbib/environment/code/flink_einarbeitung/training/flink-java-project/target/flink-java-project-0.1.jar 
--input 
/home/swissbib/environment/code/flink_einarbeitung/training/flink-java-project/data/nycTaxiRides.gz 


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/usr/local/swissbib/flink-1.3.1/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/home/swissbib/environment/tools/hbase-1.2.1/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/home/swissbib/environment/tools/hadoop-2.5.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Cluster configuration: Standalone cluster with JobManager at 
localhost/127.0.0.1:6123

Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: 32c7f2d0bbcac4d8c0367639ea928014. Waiting for 
job completion.
Connected to JobManager at 
Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1464375722] with 
leader session id ----.

07/09/2017 09:31:51Job execution switched to status RUNNING.
07/09/2017 09:31:51Source: Custom Source -> Filter -> Sink: 
Unnamed(1/1) switched to SCHEDULED
07/09/2017 09:31:51Source: Custom Source -> Filter -> Sink: 
Unnamed(1/1) switched to DEPLOYING
07/09/2017 09:31:51Source: Custom Source -> Filter -> Sink: 
Unnamed(1/1) switched to RUNNING
07/09/2017 09:31:51Source: Custom Source -> Filter -> Sink: 
Unnamed(1/1) switched to FAILED
java.lang.NoSuchMethodError: 
org.joda.time.format.DateTimeFormatter.withZoneUTC()Lorg/joda/time/format/DateTimeFormatter;
at 
com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide.(TaxiRide.java:43)
at 
com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource.generateUnorderedStream(TaxiRideSource.java:142)
at 
com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource.run(TaxiRideSource.java:113)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)

07/09/2017 09:31:51Job execution switched to status FAILING.
java.lang.NoSuchMethodError: 
org.joda.time.format.DateTimeFormatter.withZoneUTC()Lorg/joda/time/format/DateTimeFormatter;
at 
com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide.(TaxiRide.java:43)
at 
com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource.generateUnorderedStream(TaxiRideSource.java:142)
at 

flink kafka consumer lag

2017-07-09 Thread Karthik Deivasigamani
Hi,
   I'm using Flink 1.2.1 and FlinkKafkaConsumer09 to read data from
Kafka Server (0.10.1.0). My consumer is able to read and everything
works fine.
But when I use the kafka-consumer-groups.sh command to find the lag
metrics, it does not return the lag. Upon reading online, I found that
the Flink Kafka consumer uses the low-level API and manages the partition
assignment itself.
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html

What is the best way to monitor my flink kafka consumer's lag? Does
Flink expose the lag metrics per partition somewhere?

~
Karthik