Re: add FLINK_LIB_DIR to classpath on yarn -> add properties file to class path on yarn

2016-10-24 Thread vinay patil
Hi Max,

As discussed here, I have put my yaml file in the Flink lib directory, but
I am still not able to load this file from the classpath.

I am using Flink 1.1.1 and cfg4j to load the file from classpath.

I am running the job on YARN (on EMR) using the command below:

./bin/flink run 

Can you please let me know what I am doing wrong here?

Regards,
Vinay Patil



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/add-FLINK-LIB-DIR-to-classpath-on-yarn-tp8097p9702.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: About stateful transformations

2016-10-24 Thread Juan Rodríguez Hortalá
Hi Gyula,

Thanks a lot for your response, it was very clear. I understand that there
is no small-files problem, since checkpointing is not incremental. I also
understand that each worker will interpret a file:// URL as local to its
own file system, which works fine if all workers have a remote file system
mounted at the same local path.

Now I have to check whether Flink provides some expiration mechanism for old
checkpoints. For S3 that is already available, and for HDFS I guess a script
that periodically deletes older files with hdfs dfs -rmr should be easy
enough. Is there any documentation on a naming convention for checkpoint
files that I could rely on to delete old checkpoints? E.g. a naming scheme
that uses dates; it would be nicer if it were documented, because then it
would be more stable.

Thanks again for your help.

Greetings,

Juan


On Mon, Oct 24, 2016 at 12:29 AM, Gyula Fóra  wrote:

> Hi Juan,
>
> Let me try to answer some of your questions :)
>
> We have been running Flink Streaming at King for quite some time now with
> multiple jobs having several hundred gigabytes of KV state stored in
> RocksDB. I would say RocksDB state backend is definitely the best choice at
> the moment for large deployments as you can also keep the heap relatively
> small to save some time on GC. But you have to play around with the rocks
> configuration to get the most out of it depending on your hardware.
>
> I am not aware of any caching/TTL functionality exposed in the Flink APIs
> currently. But if you are willing to dig a little deeper you could
> implement a lower-level operator that uses timers like the windowing
> mechanisms to clear state after some time.
>
> When you are selecting a checkpoint directory (URL) you need to make sure
> that it is accessible from all the task managers. HDFS is convenient but is
> not strictly necessary. We for instance use CEPH that is mounted as a
> regular disk from the OS's perspective so we can use file:// and still save
> to the distributed storage. As far as I know, using YARN doesn't give much
> benefit, I am not sure if Flink exploits any data locality at this moment.
>
> When you are running rocks db state backend there are two concepts you
> need to think about for checkpointing. Your local rocks db directory, and
> the checkpoint directory. The local directory is where the rocks instances
> are created and they live on the taskmanagers local disk/memory. When Flink
> takes a checkpoint a backup of all K-V pairs is copied as one blob to HDFS
> or to the selected checkpoint directory. This means there is no data
> fragmentation in the checkpoints. Similar applies to the FsStateBackend but
> that keeps the local state strictly in memory.
>
> I think you should definitely give RocksDB + HDFS a try. It works
> extremely well for very large state sizes given some tuning, but should
> also perform out-of-the-box :)
>
> Cheers,
> Gyula
>
> Juan Rodríguez Hortalá  ezt írta
> (időpont: 2016. okt. 23., V, 22:29):
>
>> Hi all,
>>
>> I don't have much experience with Flink, so please forgive me if I ask
>> some obvious questions. I was taking a look at the documentation
>> on stateful transformations in Flink at https://ci.apache.org/
>> projects/flink/flink-docs-release-1.2/dev/state.html. I'm mostly
>> interested in Flink for stream processing, and I would like to know:
>>
>> - What is the biggest state that has been used in production deployments?
>> I'm interested in how many GB of state, among all key-value pairs, have
>> been used before in long running streaming jobs deployed in production.
>> Maybe some company has shared this in some conference or blog post. I guess
>> for that RocksDB backend is the best option for big states, to avoid being
>> limited by the available memory.
>>
>> - Is there any pre built functionality for state eviction? I'm thinking
>> of LRU cache-like behavior, with eviction based on time or size, similar to
>> Guava cache (https://github.com/google/guava/wiki/CachesExplained). This
>> is probably easy to implement anyway, by using the clear() primitive, but I
>> wouldn't like to reinvent the wheel if this is already implemented
>> somewhere.
>>
>> - When using file:// for the checkpointing URL, is the data replicated in
>> the workers, or a failure in a worker leads to losing the state stored in
>> that worker? I guess with hdfs:// we get the replication of HDFS, and we
>> don't have that problem. I also guess S3 can be used for checkpointing
>> state too, is there any remarkable performance impact of using s3 instead
>> of HDFS for checkpointing? I guess some performance is lost compared to a
>> setup running in YARN with collocated DataNodes and NodeManagers, but I
>> wanted to know if the impact is negligible, as checkpointing is performed at a
>> relatively slow frequency. Also I'm interested on Flink running on EMR,
>> where the impact of this should be even smaller because the access to S3 is
>> faster from EMR than from an 

Elasticsearch Http Connector

2016-10-24 Thread Madhukar Thota
Friends

Any one using new Elasticsearch RestClient(
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/index.html)
with Flink?


Re: Trigger evaluate

2016-10-24 Thread Fabian Hueske
No, this is correct. It describes how a Trigger is called, i.e., Flink
calls the Trigger.onElement() method for each element that is inserted
into a window.
The default trigger of a TimeWindow does not fire on new elements. However,
a custom trigger might fire when onElement() is called.
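For example, a custom trigger along these lines would evaluate the window on
every incoming element (untested sketch; the exact set of methods to override
differs slightly between Flink versions):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Sketch: emit the window contents every time a new element arrives,
// instead of waiting for the watermark to pass the end of the window.
public class FireOnEveryElementTrigger<T> extends Trigger<T, TimeWindow> {

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE; // evaluate the window for each incoming element
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        // nothing to clean up: this trigger keeps no state
    }
}

It would be attached via .trigger(...) on the windowed stream; note that it
replaces the default EventTimeTrigger.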

2016-10-24 21:37 GMT+02:00 Alberto Ramón :

>
>
> 
> this is a mistake ?
> 
>
>
>
> *"The trigger is called for each element that is inserted into the window
> and when a previously registered timer times out"*
> Thanks !!
>
> 2016-10-24 20:45 GMT+02:00 Fabian Hueske :
>
>> The window is evaluated when a watermark arrives that is behind the
>> window's end time.
>>
>> For instance, give the window in your example there are windows that end
>> at 1:00:00, 1:00:30, 1:01:00, 1:01:30, ... (every 30 seconds).
>> given the windows above, the window from 00:59:00 to 1:00:00 will be
>> evaluated, when a watermark of 1:00:00 or later is received. It might also
>> happen that multiple windows are evaluated if watermarks are more than 30
>> seconds apart.
>>
>> Best, Fabian
>>
>> 2016-10-24 20:38 GMT+02:00 Alberto Ramón :
>>
>>> I mean about *default Trigge*r, when you only put this:
>>>
>>>  .timeWindow(Time.minutes(1), Time.seconds(30))
>>>   .sum(1)
>>>
>>> When data window is evaluated ?
>>>
>>> this 
>>> 
>>>  is related?
>>>
>>>
>>>
>>> 2016-10-24 19:39 GMT+02:00 Aljoscha Krettek :
>>>
 Hi,
 this depends on the Trigger you're using. For example, EventTimeTrigger
 will trigger when the watermark passes the end of a window.

 Cheers,
 Aljoscha

 On Mon, 24 Oct 2016 at 17:10 Alberto Ramón 
 wrote:

> Hello, 1 doubt:
>
> By default, when Trigger is launch to evaluate data of window ?
>  - New element in window?
>  - When a watermark arrive?
>  - When the window is moved?
>
> Thanks , Alb
>

>>>
>>
>


Re: Trigger evaluate

2016-10-24 Thread Alberto Ramón

Is this a mistake?

*"The trigger is called for each element that is inserted into the window
and when a previously registered timer times out"*

Thanks !!

2016-10-24 20:45 GMT+02:00 Fabian Hueske :

> The window is evaluated when a watermark arrives that is behind the
> window's end time.
>
> For instance, give the window in your example there are windows that end
> at 1:00:00, 1:00:30, 1:01:00, 1:01:30, ... (every 30 seconds).
> given the windows above, the window from 00:59:00 to 1:00:00 will be
> evaluated, when a watermark of 1:00:00 or later is received. It might also
> happen that multiple windows are evaluated if watermarks are more than 30
> seconds apart.
>
> Best, Fabian
>
> 2016-10-24 20:38 GMT+02:00 Alberto Ramón :
>
>> I mean about *default Trigge*r, when you only put this:
>>
>>  .timeWindow(Time.minutes(1), Time.seconds(30))
>>   .sum(1)
>>
>> When data window is evaluated ?
>>
>> this 
>> 
>>  is related?
>>
>>
>>
>> 2016-10-24 19:39 GMT+02:00 Aljoscha Krettek :
>>
>>> Hi,
>>> this depends on the Trigger you're using. For example, EventTimeTrigger
>>> will trigger when the watermark passes the end of a window.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Mon, 24 Oct 2016 at 17:10 Alberto Ramón 
>>> wrote:
>>>
 Hello, 1 doubt:

 By default, when Trigger is launch to evaluate data of window ?
  - New element in window?
  - When a watermark arrive?
  - When the window is moved?

 Thanks , Alb

>>>
>>
>


Re: Trigger evaluate

2016-10-24 Thread Fabian Hueske
The window is evaluated when a watermark arrives that is past the
window's end time.

For instance, given the window definition in your example, there are windows
that end at 1:00:00, 1:00:30, 1:01:00, 1:01:30, ... (every 30 seconds).
Given the windows above, the window from 00:59:00 to 1:00:00 will be
evaluated when a watermark of 1:00:00 or later is received. It might also
happen that multiple windows are evaluated if watermarks are more than 30
seconds apart.

Best, Fabian

2016-10-24 20:38 GMT+02:00 Alberto Ramón :

> I mean about *default Trigge*r, when you only put this:
>
>  .timeWindow(Time.minutes(1), Time.seconds(30))
>   .sum(1)
>
> When data window is evaluated ?
>
> this 
> 
>  is related?
>
>
>
> 2016-10-24 19:39 GMT+02:00 Aljoscha Krettek :
>
>> Hi,
>> this depends on the Trigger you're using. For example, EventTimeTrigger
>> will trigger when the watermark passes the end of a window.
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 24 Oct 2016 at 17:10 Alberto Ramón 
>> wrote:
>>
>>> Hello, 1 doubt:
>>>
>>> By default, when Trigger is launch to evaluate data of window ?
>>>  - New element in window?
>>>  - When a watermark arrive?
>>>  - When the window is moved?
>>>
>>> Thanks , Alb
>>>
>>
>


Re: Trigger evaluate

2016-10-24 Thread Alberto Ramón
I mean the *default Trigger*, when you only have this:

 .timeWindow(Time.minutes(1), Time.seconds(30))
  .sum(1)

When is the window's data evaluated?

Is this related?



2016-10-24 19:39 GMT+02:00 Aljoscha Krettek :

> Hi,
> this depends on the Trigger you're using. For example, EventTimeTrigger
> will trigger when the watermark passes the end of a window.
>
> Cheers,
> Aljoscha
>
> On Mon, 24 Oct 2016 at 17:10 Alberto Ramón 
> wrote:
>
>> Hello, 1 doubt:
>>
>> By default, when Trigger is launch to evaluate data of window ?
>>  - New element in window?
>>  - When a watermark arrive?
>>  - When the window is moved?
>>
>> Thanks , Alb
>>
>


Re: Event time, watermarks and windows

2016-10-24 Thread Aljoscha Krettek
Hi,
the call to setAutoWatermarkInterval() is still necessary to activate the
watermark-generation mechanism. However, calling
setStreamTimeCharacteristic(EventTime) will also set a good default value
for the auto watermark interval.
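
For example, the two settings typically go together like this (rough sketch;
MyEvent, its timestamp accessor and the concrete values are just placeholders):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// enables event time and a sensible default auto-watermark interval
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// optionally override the watermark-generation interval (milliseconds)
env.getConfig().setAutoWatermarkInterval(200);

// periodic assigner: getCurrentWatermark() is then called every 200 ms
DataStream<MyEvent> withTimestamps = events.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent element) {
                return element.getEventTime(); // placeholder accessor
            }
        });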

Cheers,
Aljoscha

On Mon, 24 Oct 2016 at 17:02 Paul Joireman  wrote:

> Hi all,
>
> The  event timestamp and watermarks documentation (v. 1.1)
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html
>  states
> that
>
>  The AssignerWithPeriodicWatermarks assigns timestamps and generates
> watermarks periodically (possibly depending on the stream elements, or
> purely
>   based on processing time).
>
>  The interval (every n milliseconds) in which the watermark will be
> generated is defined via ExecutionConfig.setAutoWatermarkInterval(...).
> Each time, the
>  assigner’s getCurrentWatermark() method will be called, and a new
> Watermark will be emitted, if the returned Watermark is non-null and larger
> than the
>   previous Watermark.
>
> It goes on to give an example of the
> BoundedOutOfOrdernessTimestampExtractor.   My question is that since the 
> BoundedOutOfOrdernessTimestampExtractor
> is a sub-class of the AssignerWithPeriodicWatermarks is it necessary to
> call ExecutionConfig.setAutoWatermarkInterval(...) on the environment in
> order to actually generate watermarks or should they be generated
> "automatically" by the subsequent operators.
>
> In other words, will an event time window still fire if this call
> (setAutoWatermarkInterval) is not present?
>
> Regards,
> Paul W Joireman
>


Re: Trigger evaluate

2016-10-24 Thread Aljoscha Krettek
Hi,
this depends on the Trigger you're using. For example, EventTimeTrigger
will trigger when the watermark passes the end of a window.

Cheers,
Aljoscha

On Mon, 24 Oct 2016 at 17:10 Alberto Ramón 
wrote:

> Hello, 1 doubt:
>
> By default, when Trigger is launch to evaluate data of window ?
>  - New element in window?
>  - When a watermark arrive?
>  - When the window is moved?
>
> Thanks , Alb
>


Re: watermark trigger doesn't check whether element's timestamp is passed

2016-10-24 Thread Aljoscha Krettek
Hi,
with some additional information we might be able to figure this out
together. What specific combination of WindowAssigner/Trigger are you using
for your example and what is the input stream (including watermarks)?

Cheers,
Aljoscha

On Mon, 24 Oct 2016 at 06:30 Manu Zhang  wrote:

> Hi,
>
> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
> which is triggered to emit when watermark passes the timestamp of an
> element. For example,
>
> on watermark(1:01), List(("a", 1:00)) is emitted
> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>
> It seems that if *("c", 1:06) is processed before watermark(1:04)*
> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
> watermark(1:04). This is incorrect since there could be elements with
> timestamp between 1:04 and 1:06 that have not arrived yet.
>
> I guess this is because watermark trigger doesn't check whether element's
> timestamp has been passed.
>
> Please correct me if any of the above is not right.
>
> Thanks,
> Manu Zhang
>
>
>
>


Re: FlinkKafkaConsumerBase - Received confirmation for unknown checkpoint

2016-10-24 Thread Aljoscha Krettek
@Robert, do you have any idea what might be going on here?

On Fri, 21 Oct 2016 at 16:50 PedroMrChaves 
wrote:

> Hello,
>
> Am getting the following warning upon executing a checkpoint
>
> 2016-10-21 16:31:54,229 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 5 @ 1477063914229
> 2016-10-21 16:31:54,233 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 5 (in 3 ms)
> 2016-10-21 16:31:54,234 WARN
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> Received confirmation for unknown checkpoint id 5
>
> This is the code I have to setup the environment and the kafka consumer:
>
> /**
>  * Flink execution environment configuration
>  */
> private void setupEnvironment() {
>     environment = StreamExecutionEnvironment.getExecutionEnvironment();
>     environment.enableCheckpointing(CHECKPOINTING_INTERVAL);
>     tableEnvironment = TableEnvironment.getTableEnvironment(environment);
> }
>
> /**
>  * Kafka Consumer configuration
>  */
> private void kafkaConsumer(String server, String topic) {
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", server);
>     properties.setProperty("group.id", "Demo");
>     stream = environment.addSource(new FlinkKafkaConsumer09<>(topic,
>             new SimpleStringSchema(), properties))
>             .map(new Parser());
> }
>
>
> Any idea what the problem might be?
>
> Thank you and regards,
> Pedro Chaves
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaConsumerBase-Received-confirmation-for-unknown-checkpoint-tp9674.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: question for generic streams

2016-10-24 Thread Aljoscha Krettek
Hi,
I think the basic problem here is that the generic types cannot really be
changed when executing, i.e. when you read the types of the stream from a
file.

One thing I could suggest is to use Strings for everything and inside those
Strings use JSON or something similar to encode different types. This might
not be a very well-performing solution, but it is very general.
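
Roughly something like this (untested sketch, assuming a JSON library such as
Jackson is on the classpath; the class name is made up):

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.functions.MapFunction;

// Every record travels as a JSON String and is only interpreted
// (here as a generic Map) at the point where it is needed.
public class JsonToMap implements MapFunction<String, Map<String, Object>> {

    private transient ObjectMapper mapper;

    @Override
    public Map<String, Object> map(String json) throws Exception {
        if (mapper == null) {
            mapper = new ObjectMapper(); // created lazily, ObjectMapper is not serializable
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> fields = mapper.readValue(json, Map.class);
        return fields;
    }
}

The stream then stays a DataStream<String> (or DataStream<Map<String, Object>>
after the map), independent of what the file says the concrete types are.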

Cheers,
Aljoscha

On Thu, 20 Oct 2016 at 18:15 Radu Tudoran  wrote:

> Hi,
>
>
>
> I am trying to read the types of an input stream from a file and then
> generate the datastream resulting from parsing a source accordingly (e.g.
> DataStream>). Finally this will be registered as a
> table.
>
> What would be the best way to do this?
>
>
>
> I am currently trying to use the generic Tuple type for the datastream,
> which will then be instantiated based on the arity of the types.
> However, this does not seem to be a good approach (and I did not really
> figure out how to actually implement it).
>
>
>
> Any suggestions are welcome
>


Checkpointing large RocksDB state to S3 - tips?

2016-10-24 Thread Josh
Hi all,

I'm running Flink on EMR/YARN with 2x m3.xlarge instances and am
checkpointing a fairly large RocksDB state to S3.

I've found that when the state size hits 10GB, the checkpoint takes around
6 minutes, according to the Flink dashboard. Originally my checkpoint
interval was 5 minutes for the job, but I've found that the YARN container
crashes (I guess because the checkpoint time is greater than the checkpoint
interval), so I have now decreased the checkpoint frequency to every 10
minutes.
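
For reference, the checkpoint settings I am referring to look roughly like
this (the values are only illustrative, not what I actually run):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// take a checkpoint every 10 minutes
env.enableCheckpointing(10 * 60 * 1000);

// fail checkpoints that take longer than 15 minutes instead of letting them pile up
env.getCheckpointConfig().setCheckpointTimeout(15 * 60 * 1000);

// leave some breathing room between the end of one checkpoint and the start of the next
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);

// never run two checkpoints at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);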

I was just wondering if anyone has any tips about how to reduce the
checkpoint time. Taking 6 minutes to checkpoint ~10GB state means it's
uploading at ~30MB/sec. I believe the m3.xlarge instances should have
around 125MB/sec network bandwidth each, so I think the bottleneck is S3.
Since there are 2 instances, I'm not sure if that means each instance is
uploading at 15MB/sec - do the state uploads get shared equally among the
instances, assuming the state is split equally between the task managers?

If the state upload is split between the instances, perhaps the only way to
speed up the checkpoints is to add more instances and task managers, and
split the state equally among the task managers?

Also just wondering - is there any chance the incremental checkpoints work
will be complete any time soon?

Thanks,
Josh


missing data in window.reduce() while apply() is ok

2016-10-24 Thread Sendoh
Hi Flink users,

I'm seeing strange behavior where data is missing with reduce() but not with
apply(). We don't see this behavior with 1.0.3, but we do see it with 1.1.3.
By "losing data" I mean we don't see any events at all for the assigned keys,
not just a lower count of events.

The code is as follows.

DataStream> streams = env
        .addSource(new FlinkKafkaConsumer09<>(topicList, new SimpleStringSchema(), properties))
        .name("kafka_topics")
        .rebalance()
        .flatMap(new Eventsmap(events))
        .assignTimestampsAndWatermarks(new EventWatermark());

DataStream> count = streams
        .keyBy(new CompoundJsonKeySelector())
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.minutes(3))
        // apply() is ok
        //.apply(new WindowFunction, Map, String, TimeWindow>() {
        //    @Override
        //    public void apply(String s, TimeWindow timeWindow, Iterable> iterable,
        //                      Collector> collector) throws Exception {
        //        Iterator> it = iterable.iterator();
        //        collector.collect(it.next());
        //    }
        //});
        // reduce() loses data
        .reduce(new ReduceFunction>() {
            @Override
            public Map reduce(Map v1, Map v2) throws Exception {
                int newCount = Integer.parseInt(v1.get("count").toString())
                        + Integer.parseInt(v2.get("count").toString());
                v2.put("count", newCount);
                return v2;
            }
        });

Is there any suggestion on what we can try to figure out the root cause?

Best,



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/missing-data-in-window-reduce-while-apply-is-ok-tp9689.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Trigger evaluate

2016-10-24 Thread Alberto Ramón
Hello, one question:

By default, when is the Trigger launched to evaluate the data of a window?
 - On a new element in the window?
 - When a watermark arrives?
 - When the window is moved?

Thanks , Alb


Event time, watermarks and windows

2016-10-24 Thread Paul Joireman
Hi all,

The  event timestamp and watermarks documentation (v. 1.1)  
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html
 states that

 The AssignerWithPeriodicWatermarks assigns timestamps and generates 
watermarks periodically (possibly depending on the stream elements, or purely
  based on processing time).

 The interval (every n milliseconds) in which the watermark will be 
generated is defined via ExecutionConfig.setAutoWatermarkInterval(...). Each 
time, the
 assigner's getCurrentWatermark() method will be called, and a new 
Watermark will be emitted, if the returned Watermark is non-null and larger 
than the
  previous Watermark.

It goes on to give an example of the BoundedOutOfOrdernessTimestampExtractor.
My question: since the BoundedOutOfOrdernessTimestampExtractor is a
subclass of AssignerWithPeriodicWatermarks, is it necessary to call
ExecutionConfig.setAutoWatermarkInterval(...) on the environment in order to
actually generate watermarks, or are they generated "automatically" by the
subsequent operators?

In other words, will an event time window still fire if this call 
(setAutoWatermarkInterval) is not present?

Regards,
Paul W Joireman


Watermarks and window firing

2016-10-24 Thread Paul Joireman
Hi all,

The documentation for event timestamps and watermarks 
(https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html)
 states that the

The AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks 
periodically (possibly depending on the stream elements, or purely based on 
processing time).

The interval (every n milliseconds) in which the watermark will be generated is 
defined via ExecutionConfig.setAutoWatermarkInterval(...). Each time, the
assigner's getCurrentWatermark() method will be called, and a new Watermark 
will be emitted, if the returned Watermark is non-null and larger than the 
previous Watermark.




Re: multiple processing of streams

2016-10-24 Thread Fabian Hueske
Hi Robert,

Unfortunately, FoldFunctions can only be used for eager aggregation in
Tumbling and SlidingWindows.
For SessionWindows, only ReduceFunction can be used. The problem is that
two SessionWindows might be combined in case a late event is received that
"connects" them. In that case, the window function would need to combine
two intermediate results of the same type. This is not possible with
FoldFunction where input type and result type may be different.
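
For a session window that could look roughly like this (untested sketch; the
gap, the Tuple2<String, Long> type and the counting logic are just placeholders):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Incremental aggregation on a session window: the ReduceFunction keeps a single
// running (key, count) pair per window, the WindowFunction emits it when the window fires.
public static DataStream<Tuple2<String, Long>> sessionCounts(DataStream<Tuple2<String, Long>> events) {
    return events
            .keyBy(0)
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
            .apply(
                new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                        return new Tuple2<>(a.f0, a.f1 + b.f1);
                    }
                },
                new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple key, TimeWindow window,
                                      Iterable<Tuple2<String, Long>> preAggregated,
                                      Collector<Tuple2<String, Long>> out) {
                        out.collect(preAggregated.iterator().next()); // exactly one pre-aggregated element
                    }
                });
}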

Best, Fabian

2016-10-24 15:54 GMT+02:00 :

> Hi Fabian,
>
>
>
> Ah, that is good stuff, thanks.  I’ll have to evaluate, but I believe that
> everything I’m calculating can be done this way, though it looks like
> FoldFunction + WindowFunction is better suited for my use case.
>
>
>
> I may still need a custom trigger as well, though.  Some sessions may
> never end, or may be too long lasting for the information to be timely by
> the time the window closes.  So I might still need to prematurely end those
> sessions in order to get data about them (or rely on information gathered
> from my tumbling windows).  But, I’ll start with Fold + Window first, which
> should get rid of my heap issues.
>
>
>
> Thanks!
>
>
>
> *From: *Fabian Hueske 
> *Reply-To: *"user@flink.apache.org" 
> *Date: *Monday, October 24, 2016 at 2:27 PM
>
> *To: *"user@flink.apache.org" 
> *Subject: *Re: multiple processing of streams
>
>
>
> Hi Robert,
>
> thanks for the update.
>
> Regarding the SessionWindow. If you can implement your window logic as
> ReduceFunction + WindowFunction (see incremental window aggregation [1]),
> your window state will be independent of the number of elements in the
> window. If that is not possible, you might have to go with the custom
> trigger approach you described.
>
> Best, Fabian
>
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.1/apis/streaming/windows.html#windowfunction-
> with-incremental-aggregation
>
>
>
> 2016-10-24 13:59 GMT+02:00 :
>
> Hi Fabian,
>
>
>
> Thanks for the response.  It turns out this was a red herring.  I knew how
> many events I was sending through the process, and the output of each type
> of window aggregate was coming out to be about half of what I expected.  It
> turns out, however, that I hadn’t realized that the job was failing prior
> to completing processing (out of heap), so not all records were processed.
> I believe my out of heap issue to be caused by sessions with a very large
> number of records per key (and/or with no period of inactivity to trigger
> the end of the session), so I’m looking at a way to customize the
> EventTimeSessionWindow, or perhaps create a custom EventTrigger, to force a
> session to close after either X seconds of inactivity or Y seconds of
> duration (or perhaps after Z events).
>
>
>
>
>
>
>
> *From: *Fabian Hueske 
> *Reply-To: *"user@flink.apache.org" 
> *Date: *Friday, October 21, 2016 at 5:17 PM
> *To: *"user@flink.apache.org" 
> *Subject: *Re: multiple processing of streams
>
>
>
> Hi Robert,
>
> it is certainly possible to feed the same DataStream into two (or more)
> operators.
>
> Both operators should then process the complete input stream.
>
> What you describe is an unintended behavior.
> Can you explain how you figure out that both window operators only receive
> half of the events?
>
> Thanks,
>
> Fabian
>
>
>
>
>
> 2016-10-19 18:28 GMT+02:00 :
>
> Is it possible to process the same stream in two different ways?  I can’t
> find anything in the documentation definitively stating this is possible,
> but nor do I find anything stating it isn’t.  My attempt had some
> unexpected results, which I’ll explain below:
>
>
>
> Essentially, I have a stream of data I’m pulling from Kafka.  I want to
> build aggregate metrics on this data set using both tumbling windows as
> well as session windows.  So, I do something like the following:
>
>
>
> DataStream baseStream =
>
> env.addSource(….);// pulling data from kafka
>
>.map(…) // parse the raw input
>
>   .assignTimestampsAndWatermarks(…);
>
>
>
> DataStream > timeWindowedStream =
>
> baseStream.keyBy(…)
>
>   .timeWindow(…)   //
> tumbling window
>
>   .apply(…);   //
> aggregation over tumbling window
>
>
>
> DataStream > sessionWindowedStream =
>
> baseStream.keyBy(…)
>
>   
> .window(EventTimeSessionWindows.withGap(…))
> // session window
>
>   .apply(…);
>//
> aggregation over session window
>
>
>
> The issue is that when I view my job in the Flink dashboard, it indicates
> that each type of windowing is only receiving half of the records.  Is what
> I’m trying simply unsupported or is there something I’m missing?
>
>
>
> Thanks!
>
>
>
>
>
>
>

Re: multiple processing of streams

2016-10-24 Thread robert.lancaster
Hi Fabian,

Ah, that is good stuff, thanks.  I’ll have to evaluate, but I believe that 
everything I’m calculating can be done this way, though it looks like 
FoldFunction + WindowFunction is better suited for my use case.

I may still need a custom trigger as well, though.  Some sessions may never 
end, or may be too long lasting for the information to be timely by the time 
the window closes.  So I might still need to prematurely end those sessions in 
order to get data about them (or rely on information gathered from my tumbling 
windows).  But, I’ll start with Fold + Window first, which should get rid of my 
heap issues.
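
For example, to cap a session at a fixed number of events I could probably do
something like this (untested; note it replaces the session window's default
event-time trigger, so combining it with an inactivity or duration limit would
still need a custom trigger):

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

// fire and purge a session as soon as it holds 10,000 elements (value is made up)
baseStream.keyBy(…)
          .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
          .trigger(PurgingTrigger.of(CountTrigger.of(10000)))
          .apply(…);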

Thanks!

From: Fabian Hueske 
Reply-To: "user@flink.apache.org" 
Date: Monday, October 24, 2016 at 2:27 PM
To: "user@flink.apache.org" 
Subject: Re: multiple processing of streams

Hi Robert,
thanks for the update.
Regarding the SessionWindow. If you can implement your window logic as 
ReduceFunction + WindowFunction (see incremental window aggregation [1]), your 
window state will be independent of the number of elements in the window. If 
that is not possible, you might have to go with the custom trigger approach you 
described.
Best, Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#windowfunction-with-incremental-aggregation

2016-10-24 13:59 GMT+02:00 
mailto:robert.lancas...@hyatt.com>>:
Hi Fabian,

Thanks for the response.  It turns out this was a red herring.  I knew how many 
events I was sending through the process, and the output of each type of window 
aggregate was coming out to be about half of what I expected.  It turns out, 
however, that I hadn’t realized that the job was failing prior to completing 
processing (out of heap), so not all records were processed.  I believe my out 
of heap issue to be caused by sessions with a very large number of records per 
key (and/or with no period of inactivity to trigger the end of the session), so 
I’m looking at a way to customize the EventTimeSessionWindow, or perhaps create 
a custom EventTrigger, to force a session to close after either X seconds of 
inactivity or Y seconds of duration (or perhaps after Z events).



From: Fabian Hueske mailto:fhue...@gmail.com>>
Reply-To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Date: Friday, October 21, 2016 at 5:17 PM
To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Re: multiple processing of streams

Hi Robert,
it is certainly possible to feed the same DataStream into two (or more) 
operators.
Both operators should then process the complete input stream.

What you describe is an unintended behavior.
Can you explain how you figure out that both window operators only receive half 
of the events?
Thanks,
Fabian


2016-10-19 18:28 GMT+02:00 
mailto:robert.lancas...@hyatt.com>>:
Is it possible to process the same stream in two different ways?  I can’t find 
anything in the documentation definitively stating this is possible, but nor do 
I find anything stating it isn’t.  My attempt had some unexpected results, 
which I’ll explain below:

Essentially, I have a stream of data I’m pulling from Kafka.  I want to build 
aggregate metrics on this data set using both tumbling windows as well as 
session windows.  So, I do something like the following:

DataStream baseStream =
        env.addSource(…)                                      // pulling data from kafka
           .map(…)                                            // parse the raw input
           .assignTimestampsAndWatermarks(…);

DataStream> timeWindowedStream =
        baseStream.keyBy(…)
                  .timeWindow(…)                              // tumbling window
                  .apply(…);                                  // aggregation over tumbling window

DataStream> sessionWindowedStream =
        baseStream.keyBy(…)
                  .window(EventTimeSessionWindows.withGap(…)) // session window
                  .apply(…);                                  // aggregation over session window

The issue is that when I view my job in the Flink dashboard, it indicates that 
each type of windowing is only receiving half of the records.  Is what I’m 
trying simply unsupported or is there something I’m missing?

Thanks!










Re: multiple processing of streams

2016-10-24 Thread Fabian Hueske
Hi Robert,

thanks for the update.
Regarding the SessionWindow. If you can implement your window logic as
ReduceFunction + WindowFunction (see incremental window aggregation [1]),
your window state will be independent of the number of elements in the
window. If that is not possible, you might have to go with the custom
trigger approach you described.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#windowfunction-with-incremental-aggregation

2016-10-24 13:59 GMT+02:00 :

> Hi Fabian,
>
>
>
> Thanks for the response.  It turns out this was a red herring.  I knew how
> many events I was sending through the process, and the output of each type
> of window aggregate was coming out to be about half of what I expected.  It
> turns out, however, that I hadn’t realized that the job was failing prior
> to completing processing (out of heap), so not all records were processed.
> I believe my out of heap issue to be caused by sessions with a very large
> number of records per key (and/or with no period of inactivity to trigger
> the end of the session), so I’m looking at a way to customize the
> EventTimeSessionWindow, or perhaps create a custom EventTrigger, to force a
> session to close after either X seconds of inactivity or Y seconds of
> duration (or perhaps after Z events).
>
>
>
>
>
>
>
> *From: *Fabian Hueske 
> *Reply-To: *"user@flink.apache.org" 
> *Date: *Friday, October 21, 2016 at 5:17 PM
> *To: *"user@flink.apache.org" 
> *Subject: *Re: multiple processing of streams
>
>
>
> Hi Robert,
>
> it is certainly possible to feed the same DataStream into two (or more)
> operators.
>
> Both operators should then process the complete input stream.
>
> What you describe is an unintended behavior.
> Can you explain how you figure out that both window operators only receive
> half of the events?
>
> Thanks,
>
> Fabian
>
>
>
>
>
> 2016-10-19 18:28 GMT+02:00 :
>
> Is it possible to process the same stream in two different ways?  I can’t
> find anything in the documentation definitively stating this is possible,
> but nor do I find anything stating it isn’t.  My attempt had some
> unexpected results, which I’ll explain below:
>
>
>
> Essentially, I have a stream of data I’m pulling from Kafka.  I want to
> build aggregate metrics on this data set using both tumbling windows as
> well as session windows.  So, I do something like the following:
>
>
>
> DataStream baseStream =
>
> env.addSource(….);// pulling data from kafka
>
>.map(…) // parse the raw input
>
>   .assignTimestampsAndWatermarks(…);
>
>
>
> DataStream > timeWindowedStream =
>
> baseStream.keyBy(…)
>
>   .timeWindow(…)   //
> tumbling window
>
>   .apply(…);   //
> aggregation over tumbling window
>
>
>
> DataStream > sessionWindowedStream =
>
> baseStream.keyBy(…)
>
>   
> .window(EventTimeSessionWindows.withGap(…))
> // session window
>
>   .apply(…);
>//
> aggregation over session window
>
>
>
> The issue is that when I view my job in the Flink dashboard, it indicates
> that each type of windowing is only receiving half of the records.  Is what
> I’m trying simply unsupported or is there something I’m missing?
>
>
>
> Thanks!
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

Re: multiple processing of streams

2016-10-24 Thread robert.lancaster
Hi Fabian,

Thanks for the response.  It turns out this was a red herring.  I knew how many 
events I was sending through the process, and the output of each type of window 
aggregate was coming out to be about half of what I expected.  It turns out, 
however, that I hadn’t realized that the job was failing prior to completing 
processing (out of heap), so not all records were processed.  I believe my out 
of heap issue to be caused by sessions with a very large number of records per 
key (and/or with no period of inactivity to trigger the end of the session), so 
I’m looking at a way to customize the EventTimeSessionWindow, or perhaps create 
a custom EventTrigger, to force a session to close after either X seconds of 
inactivity or Y seconds of duration (or perhaps after Z events).



From: Fabian Hueske 
Reply-To: "user@flink.apache.org" 
Date: Friday, October 21, 2016 at 5:17 PM
To: "user@flink.apache.org" 
Subject: Re: multiple processing of streams

Hi Robert,
it is certainly possible to feed the same DataStream into two (or more) 
operators.
Both operators should then process the complete input stream.

What you describe is an unintended behavior.
Can you explain how you figure out that both window operators only receive half 
of the events?
Thanks,
Fabian



2016-10-19 18:28 GMT+02:00 
mailto:robert.lancas...@hyatt.com>>:
Is it possible to process the same stream in two different ways?  I can’t find 
anything in the documentation definitively stating this is possible, but nor do 
I find anything stating it isn’t.  My attempt had some unexpected results, 
which I’ll explain below:

Essentially, I have a stream of data I’m pulling from Kafka.  I want to build 
aggregate metrics on this data set using both tumbling windows as well as 
session windows.  So, I do something like the following:

DataStream baseStream =
        env.addSource(…)                                      // pulling data from kafka
           .map(…)                                            // parse the raw input
           .assignTimestampsAndWatermarks(…);

DataStream> timeWindowedStream =
        baseStream.keyBy(…)
                  .timeWindow(…)                              // tumbling window
                  .apply(…);                                  // aggregation over tumbling window

DataStream> sessionWindowedStream =
        baseStream.keyBy(…)
                  .window(EventTimeSessionWindows.withGap(…)) // session window
                  .apply(…);                                  // aggregation over session window

The issue is that when I view my job in the Flink dashboard, it indicates that 
each type of windowing is only receiving half of the records.  Is what I’m 
trying simply unsupported or is there something I’m missing?

Thanks!











Re: NoClassDefFoundError on cluster with httpclient 4.5.2

2016-10-24 Thread Till Rohrmann
Great to hear that you solved your problem :-)

Cheers,
Till

On Fri, Oct 21, 2016 at 12:34 PM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

> Hi Till,
>
> The httpclient jar is included in the job jar. Looking at a similar issue
> FLINK-4587 , It turns
> out the problem is with maven shade plugin, since I'm building Flink from
> sources with maven 3.3.x.
> I was able to solve the problem by rebuilding "flink-dist" as suggested by
> Stephan in the comments.
>
> Best,
> Yassine
>
> 2016-10-20 11:05 GMT+02:00 Till Rohrmann :
>
>> Hi Yassine,
>>
>> can you check whether the httpclient jar is contained in your job jar
>> which you submit to the cluster?
>>
>> Cheers,
>> Till
>>
>> On Wed, Oct 19, 2016 at 6:41 PM, Yassine MARZOUGUI <
>> y.marzou...@mindlytix.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm using httpclient with the following dependency:
>>>
>>> <dependency>
>>>     <groupId>org.apache.httpcomponents</groupId>
>>>     <artifactId>httpclient</artifactId>
>>>     <version>4.5.2</version>
>>> </dependency>
>>>
>>> On local mode, the program works correctly, but when executed on the
>>> cluster, I get the following exception:
>>>
>>> java.lang.Exception: The user defined 'open(Configuration)' method in
>>> class org.myorg.quickstart.Frequencies$2 caused an exception: Could not
>>> initialize class org.apache.http.conn.ssl.SSLConnectionSocketFactory
>>> at org.apache.flink.runtime.operators.BatchTask.openUserCode(Ba
>>> tchTask.java:1337)
>>> at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDr
>>> iver.openTask(ChainedFlatMapDriver.java:47)
>>> at org.apache.flink.runtime.operators.BatchTask.openChainedTask
>>> s(BatchTask.java:1377)
>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(Dat
>>> aSourceTask.java:124)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>> at java.lang.Thread.run(Unknown Source)
>>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
>>> org.apache.http.conn.ssl.SSLConnectionSocketFactory
>>> at org.apache.http.impl.client.HttpClientBuilder.build(HttpClie
>>> ntBuilder.java:966)
>>> at org.myorg.quickstart.Frequencies$2.open(Frequencies.java:82)
>>> at org.apache.flink.api.common.functions.util.FunctionUtils.ope
>>> nFunction(FunctionUtils.java:38)
>>> at org.apache.flink.runtime.operators.BatchTask.openUserCode(Ba
>>> tchTask.java:1335)
>>> ... 5 more
>>>
>>> I'm using Flink 1.1.3. Any idea how to solve the problem? Thank you.
>>>
>>> Best,
>>> Yassine
>>>
>>
>>
>


Re: About stateful transformations

2016-10-24 Thread Gyula Fóra
Hi Juan,

Let me try to answer some of your questions :)

We have been running Flink Streaming at King for quite some time now with
multiple jobs having several hundred gigabytes of KV state stored in
RocksDB. I would say RocksDB state backend is definitely the best choice at
the moment for large deployments as you can also keep the heap relatively
small to save some time on GC. But you have to play around with the rocks
configuration to get the most out of it depending on your hardware.

I am not aware of any caching/TTL functionality exposed in the Flink APIs
currently. But if you are willing to dig a little deeper you could
implement a lower-level operator that uses timers like the windowing
mechanisms to clear state after some time.
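
For illustration, here is a sketch written against a ProcessFunction-style API
that exposes keyed state and timers directly (rather than the raw operator
interfaces); the class name and the TTL handling are made up and untested:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Keeps the latest value per key and clears it once no update arrived for ttlMillis.
public class LastValueWithTtl extends ProcessFunction<String, String> {

    private final long ttlMillis;
    private transient ValueState<String> lastValue;
    private transient ValueState<Long> lastUpdate;

    public LastValueWithTtl(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    @Override
    public void open(Configuration parameters) {
        lastValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-value", String.class));
        lastUpdate = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-update", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        lastValue.update(value);
        lastUpdate.update(ctx.timestamp());
        // schedule a cleanup timer; timers are scoped to the current key
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + ttlMillis);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long last = lastUpdate.value();
        // only clear if no newer element refreshed the state in the meantime
        if (last != null && timestamp >= last + ttlMillis) {
            lastValue.clear();
            lastUpdate.clear();
        }
    }
}

It would be applied on a keyed stream, e.g. stream.keyBy(...).process(new LastValueWithTtl(3600 * 1000)).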

When you are selecting a checkpoint directory (URL) you need to make sure
that it is accessible from all the task managers. HDFS is convenient but is
not strictly necessary. We for instance use CEPH that is mounted as a
regular disk from the OS's perspective so we can use file:// and still save
to the distributed storage. As far as I know, using YARN doesn't give much
benefit, I am not sure if Flink exploits any data locality at this moment.

When you are running rocks db state backend there are two concepts you need
to think about for checkpointing. Your local rocks db directory, and the
checkpoint directory. The local directory is where the rocks instances are
created and they live on the taskmanagers local disk/memory. When Flink
takes a checkpoint a backup of all K-V pairs is copied as one blob to HDFS
or to the selected checkpoint directory. This means there is no data
fragmentation in the checkpoints. Similar applies to the FsStateBackend but
that keeps the local state strictly in memory.
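
In code the basic setup looks roughly like this (sketch only; the checkpoint
path and interval are placeholders, and the RocksDB backend needs the
flink-statebackend-rocksdb dependency):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// the checkpoint URI must be reachable from every TaskManager
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

// e.g. one checkpoint per minute
env.enableCheckpointing(60 * 1000);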

I think you should definitely give RocksDB + HDFS a try. It works extremely
well for very large state sizes given some tuning, but should also perform
out-of-the-box :)

Cheers,
Gyula

Juan Rodríguez Hortalá  ezt írta
(időpont: 2016. okt. 23., V, 22:29):

> Hi all,
>
> I don't have much experience with Flink, so please forgive me if I ask some
> obvious questions. I was taking a look at the documentation on stateful
> transformations in Flink at
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html.
> I'm mostly interested in Flink for stream processing, and I would like to
> know:
>
> - What is the biggest state that has been used in production deployments?
> I'm interested in how many GB of state, among all key-value pairs, have
> been used before in long running streaming jobs deployed in production.
> Maybe some company has shared this in some conference or blog post. I guess
> for that RocksDB backend is the best option for big states, to avoid being
> limited by the available memory.
>
> - Is there any pre built functionality for state eviction? I'm thinking of
> LRU cache-like behavior, with eviction based on time or size, similar to
> Guava cache (https://github.com/google/guava/wiki/CachesExplained). This
> is probably easy to implement anyway, by using the clear() primitive, but I
> wouldn't like to reinvent the wheel if this is already implemented
> somewhere.
>
> - When using file:// for the checkpointing URL, is the data replicated in
> the workers, or a failure in a worker leads to losing the state stored in
> that worker? I guess with hdfs:// we get the replication of HDFS, and we
> don't have that problem. I also guess S3 can be used for checkpointing
> state too, is there any remarkable performance impact of using s3 instead
> of HDFS for checkpointing? I guess some performance is lost compared to a
> setup running in YARN with collocated DataNodes and NodeManagers, but I
> wanted to know if the impact is negligible, as checkpointing is performed at a
> relatively slow frequency. Also I'm interested on Flink running on EMR,
> where the impact of this should be even smaller because the access to S3 is
> faster from EMR than from an in-house YARN cluster outside the AWS cloud.
>
> - Is there any problem with the RocksDB backend on top of HDFS related to
> defragmentation? How is clear handled for long running jobs? I'm thinking
> on a streaming job that has a state with a size of several hundred GBs,
> where each key-pair is stored for a week and then deleted. How does clear()
> work, and how do you deal with the "small files problem" of HDFS (
> http://inquidia.com/news-and-info/working-small-files-hadoop-part-1) for
> the FsState and RocksDB backend on top of HDFS? I guess this wouldn' t be a
> problem for S3, as it is an object store that has no problem with small
> files.
>
> Thanks a lot for your help!
>
> Greetings,
>
> Juan Rodriguez Hortala
>