Return of Flink shading problems in 1.2.0

2017-03-16 Thread Foster, Craig
Hi:
A few months ago, I was building Flink and ran into shading issues for 
flink-dist as described in your docs. We resolved this in BigTop by adding the 
correct way to build flink-dist in the do-component-build script and everything 
was fine after that.

Now I’m running into issues doing the same in Flink 1.2.0, and I’m trying 
to figure out what’s changed and how to fix it. Here’s how the flink-dist jar 
looks with proper shading:

jar -tvf /usr/lib/flink/lib/flink-dist_2.10-1.1.4.jar | grep HttpConnectionParams
  2485 Tue Jan 01 00:00:00 UTC 1980 org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
  3479 Tue Jan 01 00:00:00 UTC 1980 org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class

When I build Flink 1.2.0 in BigTop, here’s shading for the jar found in the RPM:

jar -tvf flink-dist_2.10-1.2.0.jar | grep HttpConnectionParams
  2392 Tue Jan 01 00:00:00 GMT 1980 org/apache/commons/httpclient/params/HttpConnectionParams.class
  2485 Tue Jan 01 00:00:00 GMT 1980 org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
  3479 Tue Jan 01 00:00:00 GMT 1980 org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class
  2868 Tue Jan 01 00:00:00 GMT 1980 org/apache/http/params/HttpConnectionParams.class

I thought maybe it was some strange thing going on with BigTop, so I then tried 
building Flink 1.2.0 straight (outside BigTop) and got the same shading:

jar -tvf flink-dist_2.10-1.2.0.jar | grep HttpConnectionParams
  2485 Fri Mar 17 05:41:16 GMT 2017 org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
  3479 Fri Mar 17 05:41:16 GMT 2017 org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class
  2392 Fri Mar 17 05:41:24 GMT 2017 org/apache/commons/httpclient/params/HttpConnectionParams.class
  2868 Fri Mar 17 05:41:24 GMT 2017 org/apache/http/params/HttpConnectionParams.class

And, yes, this is after going into flink-dist and running mvn clean install 
again since I am using Maven 3.3.x.

Here’s a snippet from my Maven version:
mvn -version
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 
2015-11-10T16:41:47+00:00)
Maven home: /usr/local/apache-maven
Java version: 1.8.0_121, vendor: Oracle Corporation

Any ideas on what my problem might be here?

Thanks,
Craig



Re: Data+control stream from kafka + window function - not working

2017-03-16 Thread Tzu-Li (Gordon) Tai
Hi Tarandeep,

Thanks for clarifying.

For the next step, I would recommend taking a look at 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_event_time.html
and trying to find out what exactly is wrong with the watermark progression. 
Flink 1.2 exposes watermarks as a metric, and that should help in figuring out 
why the windows aren’t firing.

Also, I see you have added a “WatermarkDebugger” in your job. Have you checked 
whether or not the watermarks printed there are identical (using getInput vs. 
getKafkaInput)?
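For reference, a watermark-printing pass-through operator is typically written like the sketch below (a hypothetical reconstruction; the actual WatermarkDebugger is not shown in the truncated code, and the usage line is an assumption). It relies only on the classes already imported in the listing below.

// Hypothetical sketch of a watermark-debugging pass-through operator; not the poster's code.
public static class WatermarkDebugger<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        System.out.println("ELEMENT: " + element);
        output.collect(element);                 // forward the record unchanged
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);            // keep normal watermark forwarding
        System.out.println("WM: " + mark);
    }
}

// usage (types are assumptions):
// stream.transform("WatermarkDebugger", stream.getType(), new WatermarkDebugger<String>());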

Cheers,
Gordon

On March 17, 2017 at 12:32:51 PM, Tarandeep Singh (tarand...@gmail.com) wrote:

Anyone?
Any suggestions what could be going wrong or what I am doing wrong?

Thanks,
Tarandeep


On Thu, Mar 16, 2017 at 7:34 AM, Tarandeep Singh  wrote:
Data is read from Kafka and yes I use different group id every time I run the 
code. I have put break points and print statements to verify that.

Also, if I don't connect with control stream the window function works. 

- Tarandeep

On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai  wrote:

Hi Tarandeep,

I haven’t looked at the rest of the code yet, but my first guess is that you 
might not be reading any data from Kafka at all:

private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");
    properties.setProperty("group.id", "group-0009");
    properties.setProperty("auto.offset.reset", "smallest");
    return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
}

Have you tried using a different “group.id” every time you re-run the job?
Note that the “auto.offset.reset” value is only respected when there aren’t any 
offsets committed for the group in Kafka.
So you might not actually be reading the complete “small_input.csv” dataset 
unless you use a different group.id every time.
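A minimal sketch of that suggestion (the UUID-based group id and the topic name are illustrative, not from the thread; imports match the listing below):

// Pick a fresh consumer group on every run so "auto.offset.reset=smallest" is honored
// and the whole topic is re-read from the beginning.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "debug-group-" + java.util.UUID.randomUUID());
properties.setProperty("auto.offset.reset", "smallest");
DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer08<>("test", new SimpleStringSchema(), properties));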

Cheers,
Gordon

On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com) wrote:

Hi,

I am using Flink 1.2 and reading a data stream from Kafka (using 
FlinkKafkaConsumer08). I want to connect this data stream with another stream 
(a control stream) so as to do some filtering on the fly. After filtering, I 
am applying a window function (tumbling/sliding event window) along with a fold 
function. However, the window function does not get called.
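For context, the connect-plus-CoFlatMap filtering pattern described here might look roughly like the hedged sketch below. The control-stream contents, the key layout, and the plain HashSet are illustrative assumptions (real code would keep the allowed keys in Flink state); the java.util imports are implied.

// Hedged sketch of "data stream filtered by a control stream"; names are illustrative.
DataStream<String> data = readKafkaStream("test", env);
DataStream<String> control = env.fromElements("p1");   // stand-in control stream of allowed keys

DataStream<String> filtered = data
        .connect(control)
        .flatMap(new RichCoFlatMapFunction<String, String, String>() {
            private final Set<String> allowed = new HashSet<>();

            @Override
            public void flatMap1(String record, Collector<String> out) {
                // data element: emit it only if its key was allowed by the control stream
                if (allowed.contains(record.split(",")[0])) {
                    out.collect(record);
                }
            }

            @Override
            public void flatMap2(String controlMessage, Collector<String> out) {
                // control element: update the set of allowed keys
                allowed.add(controlMessage);
            }
        });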

Any help to debug/fix this is greatly appreciated!

Below is reproducible code that one can run in an IDE like IntelliJ or on a Flink 
cluster. You will need a running Kafka cluster (local or otherwise).
Create a topic and add test data points-

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --replication-factor 1 --partitions 1
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < small_input.csv

where small_input.csv contains the following lines-

p1,10.0f,2017-03-14 16:01:01
p1,10.0f,2017-03-14 16:01:02
p1,10.0f,2017-03-14 16:01:03
p1,10.0f,2017-03-14 16:01:04
p1,10.0f,2017-03-14 16:01:05
p1,10.0f,2017-03-14 16:01:10
p1,10.0f,2017-03-14 16:01:11
p1,10.0f,2017-03-14 16:01:12
p1,10.0f,2017-03-14 16:01:40
p1,10.0f,2017-03-14 16:01:50

Now you can run the code given below. Note:

1) In this example, I am not reading the control stream from Kafka (but the issue 
can be reproduced with that code as well).
2) If, instead of reading the data stream from Kafka, I create the stream from 
elements (i.e. use the getInput function instead of the getKafkaInput function), 
the code works and the window function is fired.
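For comparison, a "getInput"-style helper presumably looks something like this hedged sketch (the actual helper is not included in the truncated listing; the element values are taken from small_input.csv above):

// Hypothetical stand-in for getInput(): the same CSV records served from a collection
// instead of Kafka.
private static DataStream<String> getInput(StreamExecutionEnvironment env) {
    return env.fromElements(
            "p1,10.0f,2017-03-14 16:01:01",
            "p1,10.0f,2017-03-14 16:01:02",
            "p1,10.0f,2017-03-14 16:01:40",
            "p1,10.0f,2017-03-14 16:01:50");
}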

Thanks,
Tarandeep



import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

Re: Data+control stream from kafka + window function - not working

2017-03-16 Thread Tarandeep Singh
Anyone?
Any suggestions what could be going wrong or what I am doing wrong?

Thanks,
Tarandeep


On Thu, Mar 16, 2017 at 7:34 AM, Tarandeep Singh 
wrote:

> Data is read from Kafka and yes I use different group id every time I run
> the code. I have put break points and print statements to verify that.
>
> Also, if I don't connect with control stream the window function works.
>
> - Tarandeep
>
> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
> Hi Tarandeep,
>
> I haven’t looked at the rest of the code yet, but my first guess is that
> you might not be reading any data from Kafka at all:
>
> private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "localhost:9092");
>     properties.setProperty("zookeeper.connect", "localhost:2181");
>     properties.setProperty("group.id", "group-0009");
>     properties.setProperty("auto.offset.reset", "smallest");
>     return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
> }
>
>
> Have you tried using a different “group.id” every time you re-run
> the job?
> Note that the “auto.offset.reset” value is only respected when there
> aren’t any offsets committed for the group in Kafka.
> So you might not actually be reading the complete “small_input.csv”
> dataset unless you use a different group.id every time.
>
> Cheers,
> Gordon
>
> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com)
> wrote:
>
> Hi,
>
> I am using flink-1.2 and reading data stream from Kafka (using
> FlinkKafkaConsumer08). I want to connect this data stream with another
> stream (read control stream) so as to do some filtering on the fly. After
> filtering, I am applying window function (tumbling/sliding event window)
> along with fold function. However, the window function does not get called.
>
> Any help to debug/fix this is greatly appreciated!
>
> Below is a reproducible code that one can run in IDE like IntelliJ or on
> flink cluster. You will need to have a running Kafka cluster (local or
> otherwise).
> Create a topic and add test data points-
>
> $KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper
> localhost:2181 --replication-factor 1 --partitions 1
> $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092
> --topic test < small_input.csv
>
> where small_input.csv contains the following lines-
>
> p1,10.0f,2017-03-14 16:01:01
> p1,10.0f,2017-03-14 16:01:02
> p1,10.0f,2017-03-14 16:01:03
> p1,10.0f,2017-03-14 16:01:04
> p1,10.0f,2017-03-14 16:01:05
> p1,10.0f,2017-03-14 16:01:10
> p1,10.0f,2017-03-14 16:01:11
> p1,10.0f,2017-03-14 16:01:12
> p1,10.0f,2017-03-14 16:01:40
> p1,10.0f,2017-03-14 16:01:50
>
> Now you can run the code given below. Note:
>
> 1) In this example, I am not reading control stream from Kafka (but issue
> can be reproduced with this code as well)
> 2) If instead of reading data stream from kafka, I create stream from
> elements (i.e. use getInput function instead of getKafkaInput function),
> the code works and window function is fired.
>
> Thanks,
> Tarandeep
>
>
>
> import org.apache.flink.api.common.functions.FoldFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
> import 
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import 
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
>
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.*;
>
> public class Test3 {
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnviron

Re: Appropriate State to use to buffer events in ProcessFunction

2017-03-16 Thread Yassine MARZOUGUI
Hi Xiaogang,

Indeed, the MapState is what I was looking for in order to have efficient
sorted state, as it would facilitate many use cases like this one, or
joining streams, etc. I searched a bit and found your contribution of
MapState for the upcoming 1.3 release; I'll see how it works for me.
Thank you for pointing this out, very helpful!

Best,
Yassine

2017-03-16 18:50 GMT+01:00 SHI Xiaogang :

> Hi Yassine,
>
> If I understand correctly, you need sorted state, which unfortunately is
> not supported in Flink now.
> We have some ideas to provide such sorted state to facilitate the
> development of user applications, but it is still under discussion due to
> concerns about backward compatibility.
>
> Currently, I think we can work around the problem with MapStates in
> RocksDB statebackends.
> In RocksDB statebackends, each entry in MapState corresponds to an entry
> in RocksDB. The key of a RocksDB entry is formatted as "
> keyGroup#key#keyLen#namespace#namespaceLen#mapKey"
>
> The entries in RocksDB are sorted in lexicographical order. In
> cases where the map keys are of type Timestamp/Long, the entries in the
> MapState will be iterated in the same order as a sorted map. Thus, you can
> find all the events whose timestamps are smaller than the given one.
>
> The solution is quite tricky because it does not work when Heap
> statebackends are used. But given that the state may grow up to ~100GB,
> RocksDB statebackends are strongly recommended.
>
> I hope this information helps you.
>
> Regards,
> Xiaogang
>
> 2017-03-09 23:19 GMT+08:00 Yassine MARZOUGUI :
>
>> Hi Timo,
>>
>> I thought about the ListState but quickly discarded it as it keeps the
>> insertion order and not events' order. After a second thought I think I will
>> reconsider it, since my events are occasionally out of order. I didn't know
>> that Flink CEP operators 'next' and 'within' can handle event time, so I
>> think I will give it a try! Thank you!
>>
>> Best,
>> Yassine
>>
>> 2017-03-08 9:55 GMT+01:00 Timo Walther :
>>
>>> Hi Yassine,
>>>
>>> have you thought about using a ListState? As far as I know, it keeps at
>>> least the insertion order. You could sort it once your trigger event has
>>> arrived.
>>> If you use a RocksDB as state backend, 100+ GB of state should not be a
>>> problem. Have you thought about using Flink's CEP library? It might fit to
>>> your needs without implementing a custom process function.
>>>
>>> I hope that helps.
>>>
>>> Timo
>>>
>>>
>>> Am 07/03/17 um 19:23 schrieb Yassine MARZOUGUI:
>>>
>>> Hi all,

 I want to label events in a stream based on a condition on some future
 events.
 For example my stream contains events of type A and B, and I would
 like to assign a label 1 to an event E of type A if an event of type B
 happens within a duration x of E. I am using event time and my events can
 be out of order.
 For this I'm using ProcessFunction which looks suitable for my use
 case. In order to handle out of order events, I'm keeping events of type A
 in a state and once an event of type B is received, I fire an event time
 timer in which I loop through events of type A in the state having a
 timestamps < timer.timestamp, label them and remove them from the state.
 Currently the state is simply a value state containing a
 TreeMap. I'm keeping events sorted in order to
 effectively get events older than the timer timestamp.
 I wonder If that's the appropriate data structure to use in the value
 state to buffer events and be able to handle out of orderness, or if there
 is a more effective implementation, especially that the state may grow to
 reach ~100 GB sometimes?

 Any insight is appreciated.

 Thanks,
 Yassine




>>>
>>
>


Re: Batch stream Sink delay ?

2017-03-16 Thread Paul Smith
Due to the slightly out-of-sequence log timestamps, I tried switching to a 
“BoundedOutOfOrdernessTimestampExtractor” and used a minute as the threshold, 
but I still couldn’t get the watermarks to fire. Setting break points and 
trying to follow the code, I can’t see where getCurrentWatermark() is being 
called..?

Is that done via a periodic timer? Is the autoWatermarkInterval (which is using 
the default) responsible for periodically “polling” the watermarks and 
triggering things?
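For what it's worth, the interval at which a periodic assigner's getCurrentWatermark() is polled is set on the ExecutionConfig; a hedged sketch (the 200 ms value is illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(200L);   // poll getCurrentWatermark() every 200 ms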


Paul




From: Fabian Hueske 
Reply-To: "user@flink.apache.org" 
Date: Friday, 17 March 2017 at 9:11 am
To: "user@flink.apache.org" 
Subject: Re: Batch stream Sink delay ?

Actually, I think the program you shared in the first mail of this thread looks 
fine for the purpose you describe.
Timestamp and watermark assignment works as follows:
- For each record, a long timestamp is extracted (UNIX/epoch timestamp).
- A watermark is a timestamp which says that no more records with a timestamp 
lower than the watermark will be processed. Records which violate that 
condition are by default dropped.
- So the watermark should follow the max emitted record timestamp minus a 
safety margin.
- If you use a periodic watermark assigner, the watermark function is 
periodically called.
Best, Fabian


2017-03-16 22:54 GMT+01:00 Paul Smith <psm...@aconex.com>:
I have managed to discover that my presumption that a log4j log file is in 
_guaranteed_ sequential time order is incorrect (race conditions). So some 
logs are out of sequence, and I was getting _some_ Monotonic Timestamp 
violations. I did not discover this at first because somehow my local Flink was not 
outputting logs properly (a restart of the service seemed to fix that..).

*HOWEVER*.. Once I switched to an “allow lateness” style timestamp assigner, I 
still get the same problem.

Fundamentally I think I’ve come to the realisation that my concept of Event 
Time is not the same as Flink’s. So let me start again and explain what I’d like 
to do.

I’d like to group-by log records and sum up the metric values according to 
their timestamp – effectively a SQL group-by where the timestamp of the event 
is aggregated into 15-minute chunks. I’d like to be able (eventually) to 
run this same job for batch & live streaming.

I had thought that by assigning the log row’s timestamp to BE the Event Time, 
the Time Window aggregation would work. But now I wonder if what I should be 
doing is assigning a new field, called ‘time bracket’ perhaps, and adding that as 
one of the keys for the stream (I was keying by “identifierType:identifier”, so 
change that to “identifierType:identifier:timeBracket”). This way the Fold 
function can still sum up the metrics by this groupBy key, and then rely on 
some periodic _actual time_ watermarking (ingestion or Processing).

I think I should disconnect the Flink time system from this aggregation level. 
I had thought that because I wanted to be able to apply this job to both Live 
Streaming and Batch processing (historical logs), I needed to use the 
Event Time model..

That could be my fatal flaw.  Does that make sense?

Paul

From: Fabian Hueske <fhue...@gmail.com>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Thursday, 16 March 2017 at 10:22 pm
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Batch stream Sink delay ?

What kind of timestamp and watermark extractor are you using?
Can you share your implementation?
You can have a look at the example programs (for example [1]). These can be 
started and debugged inside an IDE by executing the main method.
If you run it in an external process, you should be able to connect to the 
process with the standard options.
Best, Fabian

[1] 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java

2017-03-16 12:10 GMT+01:00 Paul Smith <psm...@aconex.com>:
Thanks again for your reply.

I've tried with both Parallel=1 through to 3. Same behavior.

The log file has monotonically increasing timestamps, generated by an 
application using log4j. Each log line has a distinctly incrementing timestamp. 
It is an 8GB file I'm using as a test case, with about 8 million rows.

Whether parallelism is 1 or 3, the same output is shown: the data gets to the sink 
at the end and all looks correct - I set the folded result's record timestamp 
to the end of each window and I see nice chunks of 15-minute blocks in the 
result.

I'm not sure why the watermarks are not being sent as the data progresses.

I might try pushing the data (same data) through Kafka and see if I get a 
different result. I might also take a sample from the file (rather than the 
whole file) to see if I get differing results.

Is there a wiki page anywhere that shows how to debug a job through a

Re: Batch stream Sink delay ?

2017-03-16 Thread Fabian Hueske
Actually, I think the program you shared in the first mail of this thread
looks fine for the purpose you describe.

Timestamp and watermark assignment works as follows:
- For each record, a long timestamp is extracted (UNIX/epoch timestamp).
- A watermark is a timestamp which says that no more records with a
timestamp lower than the watermark will be processed. Records which violate
that condition are by default dropped.
- So the watermark should follow the max emitted record timestamp minus a
safety margin.
- If you use a periodic watermark assigner, the watermark function is
periodically called.
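A hedged sketch of such a periodic assigner, matching the description above (the LogRecord type, its field names, and the one-minute safety margin are assumptions):

public class MaxMinusMarginAssigner implements AssignerWithPeriodicWatermarks<LogRecord> {

    private static final long SAFETY_MARGIN_MS = 60_000;   // allow one minute of lateness
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(LogRecord record, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, record.timestampMillis);
        return record.timestampMillis;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // called periodically (see ExecutionConfig#setAutoWatermarkInterval); the watermark
        // follows the max emitted timestamp minus a safety margin
        return new Watermark(maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - SAFETY_MARGIN_MS);
    }
}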

Best, Fabian


2017-03-16 22:54 GMT+01:00 Paul Smith :

> I have managed to discover that my presumption of log4j log file being a _
> *guaranteed*_ sequential order of time is incorrect (race conditions).
> So some logs are out of sequence, and I was getting _*some*_ Monotonic
> Timestamp violations.  I did not discover this because somehow my local
> flink was not outputting logs properly. (a restart of the service seemed to
> fix that..)
>
>
>
> **HOWEVER**.. Once I switched to an “allow lateness” style timestamp
> assigner, I still get the same problem.
>
>
> Fundamentally I think I’ve come to the realisation that my concept of
> Event Time is not the same as Flink.  So let me start again to explain what
> I’d like to do.
>
>
> I’d like to group-by log records and sum up the metric values according to
> their timestamp – effectively a SQL group by where the timestamp of the
> event is aggegrated into 15 minute chunks.  I’d like to be able to do
> (eventually) run this same job for Batch & Live streaming.
>
>
>
> I had thought be assigning the Log rows timestamp to BE the Event Time,
> that the Time Window aggregation would work. But now I wonder if what I
> should be doing is assigning a new field, called ‘time bracket’ perhaps,
> and add that as one of the Keys for the stream (I was keying by
> “identifierType:identifier”, so change that to 
> “identifierType:identifier:timeBracket”.
> This way the Fold function can still sum up the metrics by this groupBy
> key, and then rely on some periodic _*actual time*_ watermarking
> (ingestion or Processing).
>
>
>
> I think I should disconnect the Flink time system from this aggregation
> level. I had thought that because I wanted to be able to apply this job to
> both a Live Streaming and a Batch processing (historical logs) that I
> needed to use the Event Time model..
>
>
>
> That could be my fatal flaw.  Does that make sense?
>
>
>
> Paul
>
>
>
> *From: *Fabian Hueske 
> *Reply-To: *"user@flink.apache.org" 
> *Date: *Thursday, 16 March 2017 at 10:22 pm
>
> *To: *"user@flink.apache.org" 
> *Subject: *Re: Batch stream Sink delay ?
>
>
>
> What kind of timestamp and watermark extractor are you using?
>
> Can you share your implementation?
>
> You can have a look at the example programs (for example [1]). These can
> be started and debugged inside an IDE by executing the main method.
> If you run it in an external process, you should be able to connect to the
> process with the standard options.
>
> Best, Fabian
>
>
> [1] https://github.com/apache/flink/blob/master/flink-
> examples/flink-examples-streaming/src/main/java/org/
> apache/flink/streaming/examples/wordcount/WordCount.java
>
>
>
> 2017-03-16 12:10 GMT+01:00 Paul Smith :
>
> Thanks again for your reply.
>
>
>
> I've tried with both Parallel=1 through to 3. Same behavior.
>
>
>
> The log file is monotonically increasing time stamps generated through an
> application using log4j. Each log line is distinctly incrementing time
> stamps it is an 8GB file I'm using as a test case and has about 8 million
> rows.
>
>
>
> Whether parallel of 1 or 3 the same output is shown, the data gets to the
> sink at the end and all looks correct - I set the folded results record
> time stamp to the end of each window and I see nice chunks of 15 minute
> blocks in the result.
>
>
>
> I'm not sure why the watermarks are not being sent as the data progresses.
>
>
>
> I might try pushing the data (same data) though Kafka and see if I get a
> different result. I might also take a sample through the file (rather than
> the whole file to see if I get differing results)
>
>
>
> Is there a wiki page anywhere that shows how to debug a job thorough an
> IDE?  Can I easily remote attach to a running process via standard java
> options?
>
>
>
> Regards
>
>
>
> Paul
>
>
> On 16 Mar 2017, at 21:15, Fabian Hueske  wrote:
>
> Hi Paul,
>
> since each operator uses the minimum watermark of all its inputs, you must
> ensure that each parallel task is producing data.
>
> If a source does not produce data, it will not increase the timestamps of
> its watermarks.
>
> Another challenge, that you might run into is that you need to make sure
> that the file (or file chunks if it is split for parallel reading) is read
> in the increasing timestamp order.
>
> Otherwise, watermarks will be emitted too early.
>
> If I understood your use case corre

Re: Batch stream Sink delay ?

2017-03-16 Thread Paul Smith
I have managed to discover that my presumption that a log4j log file is in 
_guaranteed_ sequential time order is incorrect (race conditions). So some 
logs are out of sequence, and I was getting _some_ Monotonic Timestamp 
violations. I did not discover this at first because somehow my local Flink was not 
outputting logs properly (a restart of the service seemed to fix that..).

*HOWEVER*.. Once I switched to an “allow lateness” style timestamp assigner, I 
still get the same problem.

Fundamentally I think I’ve come to the realisation that my concept of Event 
Time is not the same as Flink’s. So let me start again and explain what I’d like 
to do.

I’d like to group-by log records and sum up the metric values according to 
their timestamp – effectively a SQL group-by where the timestamp of the event 
is aggregated into 15-minute chunks. I’d like to be able (eventually) to 
run this same job for batch & live streaming.

I had thought that by assigning the log row’s timestamp to BE the Event Time, 
the Time Window aggregation would work. But now I wonder if what I should be 
doing is assigning a new field, called ‘time bracket’ perhaps, and adding that as 
one of the keys for the stream (I was keying by “identifierType:identifier”, so 
change that to “identifierType:identifier:timeBracket”). This way the Fold 
function can still sum up the metrics by this groupBy key, and then rely on 
some periodic _actual time_ watermarking (ingestion or Processing).
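A hedged sketch of that "time bracket" key (LogRecord and its fields are placeholders, not the actual classes in the job):

// Bake the 15-minute bucket index into the group-by key itself.
KeySelector<LogRecord, String> bracketKey = new KeySelector<LogRecord, String>() {
    @Override
    public String getKey(LogRecord r) {
        long bracket = r.timestampMillis / (15 * 60 * 1000L);   // 15-minute bucket index
        return r.identifierType + ":" + r.identifier + ":" + bracket;
    }
};
// e.g. records.keyBy(bracketKey).window(...).fold(...)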

I think I should disconnect the Flink time system from this aggregation level. 
I had thought that because I wanted to be able to apply this job to both Live 
Streaming and Batch processing (historical logs), I needed to use the 
Event Time model..

That could be my fatal flaw.  Does that make sense?

Paul

From: Fabian Hueske 
Reply-To: "user@flink.apache.org" 
Date: Thursday, 16 March 2017 at 10:22 pm
To: "user@flink.apache.org" 
Subject: Re: Batch stream Sink delay ?

What kind of timestamp and watermark extractor are you using?
Can you share your implementation?
You can have a look at the example programs (for example [1]). These can be 
started and debugged inside an IDE by executing the main method.
If you run it in an external process, you should be able to connect to the 
process with the standard options.
Best, Fabian

[1] 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java

2017-03-16 12:10 GMT+01:00 Paul Smith <psm...@aconex.com>:
Thanks again for your reply.

I've tried with both Parallel=1 through to 3. Same behavior.

The log file has monotonically increasing timestamps, generated by an 
application using log4j. Each log line has a distinctly incrementing timestamp. 
It is an 8GB file I'm using as a test case, with about 8 million rows.

Whether parallelism is 1 or 3, the same output is shown: the data gets to the sink 
at the end and all looks correct - I set the folded result's record timestamp 
to the end of each window and I see nice chunks of 15-minute blocks in the 
result.

I'm not sure why the watermarks are not being sent as the data progresses.

I might try pushing the data (same data) through Kafka and see if I get a 
different result. I might also take a sample from the file (rather than the 
whole file) to see if I get differing results.

Is there a wiki page anywhere that shows how to debug a job through an IDE?  
Can I easily remote-attach to a running process via standard Java options?

Regards

Paul

On 16 Mar 2017, at 21:15, Fabian Hueske <fhue...@gmail.com> wrote:
Hi Paul,
since each operator uses the minimum watermark of all its inputs, you must 
ensure that each parallel task is producing data.
If a source does not produce data, it will not increase the timestamps of its 
watermarks.
Another challenge that you might run into is that you need to make sure that 
the file (or file chunks, if it is split for parallel reading) is read in 
increasing timestamp order.
Otherwise, watermarks will be emitted too early.
If I understood your use case correctly, you are just experimenting with 
file input to get a feeling for the API.
I would try to set the parallelism of the file source to 1 to ensure that the 
data is read in the same order and that all tasks are producing data.
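A minimal sketch of that suggestion (the input path is illustrative):

// Read the file with a single source task so records arrive in timestamp order and the
// one source task keeps producing data (and watermarks).
DataStream<String> lines = env
        .readTextFile("/path/to/application.log")   // hypothetical path
        .setParallelism(1);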
Hope this helps,
Fabian

2017-03-15 23:54 GMT+01:00 Paul Smith <psm...@aconex.com>:
Thanks Fabian, I’m pretty sure you are correct here.  I can see in the Metric 
view that the currentLowWaterMark is set to MIN_VALUE by the looks of it, so 
Watermarks are not being emitted at all until the end.  This stays all the way 
through the job.

I’m not sure why this is the case.  I’ve verified that my TimestampExtractor 
class is being called, and returning the value I’d expect (The timestamp from 
the log line), and looks legitimate.  My WindowFunction which is doing the 
aggregation is not being called until right at the e

Re: Appropriate State to use to buffer events in ProcessFunction

2017-03-16 Thread SHI Xiaogang
Hi Yassine,

If I understand correctly, you need sorted state, which unfortunately is
not supported in Flink now.
We have some ideas to provide such sorted state to facilitate the
development of user applications, but it is still under discussion due to
concerns about backward compatibility.

Currently, I think we can work around the problem with MapStates in RocksDB
statebackends.
In RocksDB statebackends, each entry in MapState corresponds to an entry in
RocksDB. The key of a RocksDB entry is formatted as "
keyGroup#key#keyLen#namespace#namespaceLen#mapKey"

The entries in RocksDB are sorted in lexicographical order. In
cases where the map keys are of type Timestamp/Long, the entries in the
MapState will be iterated in the same order as a sorted map. Thus, you can
find all the events whose timestamps are smaller than the given one.

The solution is quite tricky because it does not work when Heap
statebackends are used. But given that the state may grow up to ~100GB,
RocksDB statebackends are strongly recommended.
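A hedged sketch of that workaround, written against the MapState API targeted for 1.3; the Event type (with a long timestamp field) and every name below are illustrative assumptions, not code from this thread.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Placeholder event type, purely for the sketch. */
class Event {
    public long timestamp;
}

public class BufferingFunction extends ProcessFunction<Event, Event> {

    private transient MapState<Long, List<Event>> buffer;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<Long, List<Event>> desc = new MapStateDescriptor<>(
                "buffered-events",
                TypeInformation.of(new TypeHint<Long>() {}),
                TypeInformation.of(new TypeHint<List<Event>>() {}));
        buffer = getRuntimeContext().getMapState(desc);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        List<Event> bucket = buffer.get(event.timestamp);
        if (bucket == null) {
            bucket = new ArrayList<>();
        }
        bucket.add(event);
        buffer.put(event.timestamp, bucket);
        // when the triggering event arrives, register an event-time timer via
        // ctx.timerService().registerEventTimeTimer(...)
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        // With the RocksDB backend the entries are iterated in ascending key order
        // (as described above), so we can stop once we reach the timer timestamp.
        List<Long> done = new ArrayList<>();
        for (Map.Entry<Long, List<Event>> entry : buffer.entries()) {
            if (entry.getKey() >= timestamp) {
                break;
            }
            for (Event e : entry.getValue()) {
                out.collect(e);              // label/emit the buffered event
            }
            done.add(entry.getKey());
        }
        for (Long key : done) {
            buffer.remove(key);
        }
    }
}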

I hope this information helps you.

Regards,
Xiaogang

2017-03-09 23:19 GMT+08:00 Yassine MARZOUGUI :

> Hi Timo,
>
> I thought about the ListState but quickly discarded it as it keeps the
> insertion order and not events' order. After a second thought I think I will
> reconsider it, since my events are occasionally out of order. I didn't know
> that Flink CEP operators 'next' and 'within' can handle event time, so I
> think I will give it a try! Thank you!
>
> Best,
> Yassine
>
> 2017-03-08 9:55 GMT+01:00 Timo Walther :
>
>> Hi Yassine,
>>
>> have you thought about using a ListState? As far as I know, it keeps at
>> least the insertion order. You could sort it once your trigger event has
>> arrived.
>> If you use a RocksDB as state backend, 100+ GB of state should not be a
>> problem. Have you thought about using Flink's CEP library? It might fit to
>> your needs without implementing a custom process function.
>>
>> I hope that helps.
>>
>> Timo
>>
>>
>> Am 07/03/17 um 19:23 schrieb Yassine MARZOUGUI:
>>
>> Hi all,
>>>
>>> I want to label events in a stream based on a condition on some future
>>> events.
>>> For example my stream contains events of type A and B, and I would
>>> like to assign a label 1 to an event E of type A if an event of type B
>>> happens within a duration x of E. I am using event time and my events can
>>> be out of order.
>>> For this I'm using ProcessFunction which looks suitable for my use case.
>>> In order to handle out of order events, I'm keeping events of type A in a
>>> state and once an event of type B is received, I fire an event time timer
>>> in which I loop through events of type A in the state having a timestamps <
>>> timer.timestamp, label them and remove them from the state.
>>> Currently the state is simply a value state containing a
>>> TreeMap. I'm keeping events sorted in order to
>>> effectively get events older than the timer timestamp.
>>> I wonder If that's the appropriate data structure to use in the value
>>> state to buffer events and be able to handle out of orderness, or if there
>>> is a more effective implementation, especially that the state may grow to
>>> reach ~100 GB sometimes?
>>>
>>> Any insight is appreciated.
>>>
>>> Thanks,
>>> Yassine
>>>
>>>
>>>
>>>
>>
>


Re: Flink 1.2 and Cassandra Connector

2017-03-16 Thread Robert Metzger
I've created a pull request for the fix:
https://github.com/apache/flink/pull/3556

It would be nice if one of the issue reporters could validate that the
cassandra connector works after the fix.
If it is a valid fix, I would like to include it into the upcoming 1.2.1
release.

On Thu, Mar 16, 2017 at 6:08 PM, Robert Metzger  wrote:

> Yep, this is definitely a bug / misconfiguration in the build system.
>
> The cassandra client defines metrics-core as a dependency, but the shading
> is dropping the dependency when building the dependency-reduced POM.
> To resolve the issue, we need to add the following line into the shading
> config of the cassandra module:
>
> <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
>
> This makes the metrics dependency appear again in the dependency-reduced POM.
>
>
> I've filed a JIRA: https://issues.apache.org/jira/browse/FLINK-6084 and
> will open a PR.
>
> On Thu, Mar 16, 2017 at 1:08 PM, Stephan Ewen  wrote:
>
>> Can we improve the Flink experience here by adding this dependency
>> directly to the cassandra connector pom.xml (so that user jars always pull
>> it in via transitivity)?
>>
>> On Wed, Mar 15, 2017 at 4:09 PM, Nico  wrote:
>>
>>> Hi @all,
>>>
>>> I came back to this issue today...
>>>
>>> @Robert:
>>> "com/codahale/metrics/Metric" class was not available in the user code
>>> jar
>>>
>>> Even after adding the metric class into the build-jar profile of the pom
>>> file, more "class not found" errors occur. So the only solution was to add
>>> the whole dependency:
>>>
>>> <dependency>
>>>    <groupId>com.codahale.metrics</groupId>
>>>    <artifactId>metrics-core</artifactId>
>>>    <version>3.0.2</version>
>>> </dependency>
>>>
>>>
>>> This worked for me.
>>>
>>> Best,
>>> Nico
>>>
>>>
>>>
>>> 2017-03-06 11:46 GMT+01:00 Chesnay Schepler :
>>>
 Hello,

 I believe the cassandra connector is not shading its dependencies
 properly. This didn't cause issues in the
 past since Flink used to have a dependency on codahale metrics as well.

 Please open a JIRA for this issue.

 Regards,
 Chesnay


 On 06.03.2017 11:32, Tarandeep Singh wrote:

 Hi Robert & Nico,

 I am facing the same problem (java.lang.NoClassDefFoundError:
 com/codahale/metrics/Metric)
 Can you help me identify shading issue in pom.xml file.

 My pom.xml content-
 -

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>rfk-dataplatform</groupId>
   <artifactId>stream-processing</artifactId>
   <version>0.1.0</version>
   <packaging>jar</packaging>
   <name>Stream processing</name>

   <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <flink.version>1.2.0</flink.version>
      <slf4j.version>1.7.7</slf4j.version>
      <log4j.version>1.2.17</log4j.version>
   </properties>

   <dependencies>
      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-java_2.10</artifactId>
         <version>${flink.version}</version>
      </dependency>
      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-clients_2.10</artifactId>
         <version>${flink.version}</version>
      </dependency>
      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-cassandra_2.10</artifactId>
         <version>1.2.0</version>
      </dependency>
      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
         <version>1.2.0</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
         <version>${slf4j.version}</version>
      </dependency>
      <dependency>
         <groupId>log4j</groupId>
         <artifactId>log4j</artifactId>
         <version>${log4j.version}</version>
      </dependency>
      <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro</artifactId>
         <version>1.8.1</version>
      </dependency>
      <dependency>
         <groupId>org.testng</groupId>
         <artifactId>testng</artifactId>
         <version>6.8</version>
         <scope>test</scope>
      </dependency>
      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
         <version>1.2.0</version>
      </dependency>
      <dependency>
         <groupId>org.influxdb</groupId>
         <artifactId>influxdb-java</artifactId>
         <version>2.5</version>
      </dependency>
   </dependencies>

   <profiles>
      <profile>
         <id>build-jar</id>
         <activation>
            <activeByDefault>false</activeByDefault>
         </activation>
         <dependencies>
            <!-- flink-java, flink-streaming-java_2.10, flink-clients_2.10, slf4j-log4j12
                 and log4j, each with <scope>provided</scope> -->
         </dependencies>
         <build>
            <plugins>
               <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-shade-plugin</artifactId>
                  <version>2.4.1</version>
                  <executions>
                     <execution>
                        <phase>package</phase>
                        <goals>
                           <goal>shade</goal>
 

Re: Flink 1.2 and Cassandra Connector

2017-03-16 Thread Robert Metzger
Yep, this is definitely a bug / misconfiguration in the build system.

The cassandra client defines metrics-core as a dependency, but the shading
is dropping the dependency when building the dependency-reduced POM.
To resolve the issue, we need to add the following line into the shading
config of the cassandra module:

<promoteTransitiveDependencies>true</promoteTransitiveDependencies>

This makes the metrics dependency appear again in the dependency-reduced POM.


I've filed a JIRA: https://issues.apache.org/jira/browse/FLINK-6084 and
will open a PR.

On Thu, Mar 16, 2017 at 1:08 PM, Stephan Ewen  wrote:

> Can we improve the Flink experience here by adding this dependency
> directly to the cassandra connector pom.xml (so that user jars always pull
> it in via transitivity)?
>
> On Wed, Mar 15, 2017 at 4:09 PM, Nico  wrote:
>
>> Hi @all,
>>
>> I came back to this issue today...
>>
>> @Robert:
>> "com/codahale/metrics/Metric" class was not available in the user code
>> jar
>>
>> Even after adding the metric class into the build-jar profile of the pom
>> file, more "class not found" errors occur. So the only solution was to add
>> the whole dependency:
>>
>> <dependency>
>>    <groupId>com.codahale.metrics</groupId>
>>    <artifactId>metrics-core</artifactId>
>>    <version>3.0.2</version>
>> </dependency>
>>
>>
>> This worked for me.
>>
>> Best,
>> Nico
>>
>>
>>
>> 2017-03-06 11:46 GMT+01:00 Chesnay Schepler :
>>
>>> Hello,
>>>
>>> I believe the cassandra connector is not shading its dependencies
>>> properly. This didn't cause issues in the
>>> past since Flink used to have a dependency on codahale metrics as well.
>>>
>>> Please open a JIRA for this issue.
>>>
>>> Regards,
>>> Chesnay
>>>
>>>
>>> On 06.03.2017 11:32, Tarandeep Singh wrote:
>>>
>>> Hi Robert & Nico,
>>>
>>> I am facing the same problem (java.lang.NoClassDefFoundError:
>>> com/codahale/metrics/Metric)
>>> Can you help me identify shading issue in pom.xml file.
>>>
>>> My pom.xml content-
>>> -
>>>
>>> [pom.xml content, identical to the listing quoted earlier in this digest:
>>> rfk-dataplatform:stream-processing 0.1.0 with Flink 1.2.0 dependencies
>>> (flink-streaming-java_2.10, flink-clients_2.10, flink-connector-cassandra_2.10,
>>> flink-statebackend-rocksdb_2.10, flink-connector-kafka-0.8_2.10, slf4j-log4j12,
>>> log4j, avro 1.8.1, testng 6.8, influxdb-java 2.5), a build-jar profile marking the
>>> Flink, slf4j and log4j dependencies as provided, and maven-shade-plugin 2.4.1
>>> bound to the package phase]

Re: Data+control stream from kafka + window function - not working

2017-03-16 Thread Tarandeep Singh
Data is read from Kafka and yes I use different group id every time I run the 
code. I have put break points and print statements to verify that.

Also, if I don't connect with control stream the window function works. 

- Tarandeep

> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Tarandeep,
> 
> I haven’t looked at the rest of the code yet, but my first guess is that you 
> might not be reading any data from Kafka at all:
> 
>> private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>>     Properties properties = new Properties();
>>     properties.setProperty("bootstrap.servers", "localhost:9092");
>>     properties.setProperty("zookeeper.connect", "localhost:2181");
>>     properties.setProperty("group.id", "group-0009");
>>     properties.setProperty("auto.offset.reset", "smallest");
>>     return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>> }
> 
> 
> Have you tried using a different “group.id” every time you re-run the
> job?
> Note that the “auto.offset.reset” value is only respected when there aren’t
> any offsets committed for the group in Kafka.
> So you might not actually be reading the complete “small_input.csv” dataset
> unless you use a different group.id every time.
> 
> Cheers,
> Gordon
> 
>> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com) wrote:
>> 
>> Hi,
>> 
>> I am using flink-1.2 and reading data stream from Kafka (using 
>> FlinkKafkaConsumer08). I want to connect this data stream with another 
>> stream (read control stream) so as to do some filtering on the fly. After 
>> filtering, I am applying window function (tumbling/sliding event window) 
>> along with fold function. However, the window function does not get called.
>> 
>> Any help to debug/fix this is greatly appreciated!
>> 
>> Below is a reproducible code that one can run in IDE like IntelliJ or on 
>> flink cluster. You will need to have a running Kafka cluster (local or 
>> otherwise).
>> Create a topic and add test data points-
>> 
>> $KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper 
>> localhost:2181 --replication-factor 1 --partitions 1
>> $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 
>> --topic test < small_input.csv
>> 
>> where small_input.csv contains the following lines-
>> 
>> p1,10.0f,2017-03-14 16:01:01
>> p1,10.0f,2017-03-14 16:01:02
>> p1,10.0f,2017-03-14 16:01:03
>> p1,10.0f,2017-03-14 16:01:04
>> p1,10.0f,2017-03-14 16:01:05
>> p1,10.0f,2017-03-14 16:01:10
>> p1,10.0f,2017-03-14 16:01:11
>> p1,10.0f,2017-03-14 16:01:12
>> p1,10.0f,2017-03-14 16:01:40
>> p1,10.0f,2017-03-14 16:01:50
>> 
>> Now you can run the code given below. Note:
>> 
>> 1) In this example, I am not reading control stream from Kafka (but issue 
>> can be reproduced with this code as well)
>> 2) If instead of reading data stream from kafka, I create stream from 
>> elements (i.e. use getInput function instead of getKafkaInput function), the 
>> code works and window function is fired.
>> 
>> Thanks,
>> Tarandeep
>> 
>> 
>> 
>> import org.apache.flink.api.common.functions.FoldFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple1;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
>> import 
>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>> import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
>> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
>> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import 
>> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> import org.apache.flink.util.Collector;
>> 
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.*;
>> 
>> public class Test3 {
>> 
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 

Re: Checkpointing with RocksDB as statebackend

2017-03-16 Thread vinay patil
@ Stephan,

I am not using an explicit Evictor in my code. I will try using the Fold
function if it does not break my existing functionality :)

@Robert: Thank you for your answer. Yes, I have already tried setting G1GC
this morning using env.java.opts, and it works.
Which GC is recommended for a streaming application (running on YARN -
EMR)?

Regards,
Vinay Patil

On Thu, Mar 16, 2017 at 6:36 PM, rmetzger0 [via Apache Flink User Mailing
List archive.]  wrote:

> Yes, you can change the GC using the env.java.opts parameter.
> We are not setting any GC on YARN.
>
> On Thu, Mar 16, 2017 at 1:50 PM, Stephan Ewen <[hidden email]
> > wrote:
>
>> The only immediate workaround is to use windows with "reduce" or "fold"
>> or "aggregate" and not "apply". And to not use an evictor.
>>
>> The good news is that I think we have a good way of fixing this soon,
>> making an adjustment in RocksDB.
>>
>> For the Yarn / g1gc question: Not 100% sure about that - you can check if
>> it used g1gc. If not, you may be able to pass this through the
>> "env.java.opts" parameter. (cc robert for confirmation)
>>
>> Stephan
>>
>>
>>
>> On Thu, Mar 16, 2017 at 8:31 AM, vinay patil <[hidden email]
>> > wrote:
>>
>>> Hi Stephan,
>>>
>>> What can be the workaround for this ?
>>>
> Also, I need one confirmation: is G1 GC used by default when running the
> pipeline on YARN? (I see a thread from 2015 where G1 is used by default for
> Java 8.)
>>>
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Wed, Mar 15, 2017 at 10:32 PM, Stephan Ewen [via Apache Flink User
>>> Mailing List archive.] <[hidden email]
>>> > wrote:
>>>
 Hi Vinay!

 Savepoints also call the same problematic RocksDB function,
 unfortunately.

 We will have a fix next month. We either (1) get a patched RocksDB
 version or we (2) implement a different pattern for ListState in Flink.

 (1) would be the better solution, so we are waiting for a response from
 the RocksDB folks. (2) is always possible if we cannot get a fix from
 RocksDB.

 Stephan


 On Wed, Mar 15, 2017 at 5:53 PM, vinay patil <[hidden email]
 > wrote:

> Hi Stephan,
>
> Thank you for making me aware of this.
>
> Yes I am using a window without reduce function (Apply function). The
> discussion happening on JIRA is exactly what I am observing, consistent
> failure of checkpoints after some time and the stream halts.
>
> We want to go live next month; I'm not sure how this will affect
> production, as we are going to get above 200 million records.
>
> As a workaround, can I take a savepoint while the pipeline is running?
> Let's say I take a savepoint every 30 minutes - will that work?
>
>
>
> Regards,
> Vinay Patil
>
> On Tue, Mar 14, 2017 at 10:02 PM, Stephan Ewen [via Apache Flink User
> Mailing List archive.] <[hidden email]
> > wrote:
>
>> The issue in Flink is https://issues.apache.org/j
>> ira/browse/FLINK-5756
>>
>> On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <[hidden email]
>> > wrote:
>>
>>> Hi Vinay,
>>>
>>> I think the issue is tracked here: https://github.com/faceb
>>> ook/rocksdb/issues/1988.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 14.03.2017 um 15:31 schrieb Vishnu Viswanath <[hidden email]
>>> >:
>>>
>>> Hi Stephan,
>>>
>>> Is there a ticket number/link to track this, My job has all the
>>> conditions you mentioned.
>>>
>>> Thanks,
>>> Vishnu
>>>
>>> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <[hidden email]
>>> > wrote:
>>>
 Hi Vinay!

 We just discovered a bug in RocksDB. The bug affects windows
 without reduce() or fold(), windows with evictors, and ListState.

 A certain access pattern in RocksDB starts being so slow after a
 certain size-per-key that it basically brings down the streaming 
 program
 and the snapshots.

 We are reaching out to the RocksDB folks and looking for
 workarounds in Flink.

 Greetings,
 Stephan


 On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <[hidden email]
 > wrote:

> @vinay  Can you try to not set the buffer timeout at all? I am
> actually not sure what would be the effect of setting it to a negative
> value, that can be a cause of problems..

Re: Checkpointing with RocksDB as statebackend

2017-03-16 Thread Robert Metzger
Yes, you can change the GC using the env.java.opts parameter.
We are not setting any GC on YARN.

On Thu, Mar 16, 2017 at 1:50 PM, Stephan Ewen  wrote:

> The only immediate workaround is to use windows with "reduce" or "fold" or
> "aggregate" and not "apply". And to not use an evictor.
>
> The good news is that I think we have a good way of fixing this soon,
> making an adjustment in RocksDB.
>
> For the Yarn / g1gc question: Not 100% sure about that - you can check if
> it used g1gc. If not, you may be able to pass this through the
> "env.java.opts" parameter. (cc robert for confirmation)
>
> Stephan
>
>
>
> On Thu, Mar 16, 2017 at 8:31 AM, vinay patil 
> wrote:
>
>> Hi Stephan,
>>
>> What can be the workaround for this ?
>>
>> Also need one confirmation : Is G1 GC used by default when running the
>> pipeline on YARN. (I see a thread of 2015 where G1 is used by default for
>> JAVA8)
>>
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Mar 15, 2017 at 10:32 PM, Stephan Ewen [via Apache Flink User
>> Mailing List archive.] <[hidden email]
>> > wrote:
>>
>>> Hi Vinay!
>>>
>>> Savepoints also call the same problematic RocksDB function,
>>> unfortunately.
>>>
>>> We will have a fix next month. We either (1) get a patched RocksDB
>>> version or we (2) implement a different pattern for ListState in Flink.
>>>
>>> (1) would be the better solution, so we are waiting for a response from
>>> the RocksDB folks. (2) is always possible if we cannot get a fix from
>>> RocksDB.
>>>
>>> Stephan
>>>
>>>
>>> On Wed, Mar 15, 2017 at 5:53 PM, vinay patil <[hidden email]
>>> > wrote:
>>>
 Hi Stephan,

 Thank you for making me aware of this.

 Yes I am using a window without reduce function (Apply function). The
 discussion happening on JIRA is exactly what I am observing, consistent
 failure of checkpoints after some time and the stream halts.

 We want to go live in next month, not sure how this will affect in
 production as we are going to get above 200 million data.

 As a workaround can I take the savepoint while the pipeline is running
 ? Let's say if I take savepoint after every 30minutes, will it work ?



 Regards,
 Vinay Patil

 On Tue, Mar 14, 2017 at 10:02 PM, Stephan Ewen [via Apache Flink User
 Mailing List archive.] <[hidden email]
 > wrote:

> The issue in Flink is https://issues.apache.org/jira/browse/FLINK-5756
>
> On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <[hidden email]
> > wrote:
>
>> Hi Vinay,
>>
>> I think the issue is tracked here: https://github.com/faceb
>> ook/rocksdb/issues/1988.
>>
>> Best,
>> Stefan
>>
>> Am 14.03.2017 um 15:31 schrieb Vishnu Viswanath <[hidden email]
>> >:
>>
>> Hi Stephan,
>>
>> Is there a ticket number/link to track this, My job has all the
>> conditions you mentioned.
>>
>> Thanks,
>> Vishnu
>>
>> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <[hidden email]
>> > wrote:
>>
>>> Hi Vinay!
>>>
>>> We just discovered a bug in RocksDB. The bug affects windows without
>>> reduce() or fold(), windows with evictors, and ListState.
>>>
>>> A certain access pattern in RocksDB starts being so slow after a
>>> certain size-per-key that it basically brings down the streaming program
>>> and the snapshots.
>>>
>>> We are reaching out to the RocksDB folks and looking for workarounds
>>> in Flink.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <[hidden email]
>>> > wrote:
>>>
 @vinay  Can you try to not set the buffer timeout at all? I am
 actually not sure what would be the effect of setting it to a negative
 value, that can be a cause of problems...


 On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <[hidden email]
 > wrote:

> Vinay,
>
>
>
> The bucketing sink performs rename operations during the
> checkpoint and if it tries to rename a file that is not yet 
> consistent that
> would cause a FileNotFound exception which would fail the checkpoint.
>
>
>
> Stephan,
>
>
>
> Currently my aws fork contains some very specific assumptions
> about the pipeline that will in general only hold for my pipeline. 
> This is
> because there were still some open q

Re: Checkpointing with RocksDB as statebackend

2017-03-16 Thread Stephan Ewen
The only immediate workaround is to use windows with "reduce" or "fold" or
"aggregate" and not "apply". And to not use an evictor.

The good news is that I think we have a good way of fixing this soon,
making an adjustment in RocksDB.

For the Yarn / g1gc question: Not 100% sure about that - you can check if
it used g1gc. If not, you may be able to pass this through the
"env.java.opts" parameter. (cc robert for confirmation)

Stephan



On Thu, Mar 16, 2017 at 8:31 AM, vinay patil 
wrote:

> Hi Stephan,
>
> What can be the workaround for this ?
>
> Also need one confirmation : Is G1 GC used by default when running the
> pipeline on YARN. (I see a thread of 2015 where G1 is used by default for
> JAVA8)
>
>
>
> Regards,
> Vinay Patil
>
> On Wed, Mar 15, 2017 at 10:32 PM, Stephan Ewen [via Apache Flink User
> Mailing List archive.] <[hidden email]
> > wrote:
>
>> Hi Vinay!
>>
>> Savepoints also call the same problematic RocksDB function, unfortunately.
>>
>> We will have a fix next month. We either (1) get a patched RocksDB
>> version or we (2) implement a different pattern for ListState in Flink.
>>
>> (1) would be the better solution, so we are waiting for a response from
>> the RocksDB folks. (2) is always possible if we cannot get a fix from
>> RocksDB.
>>
>> Stephan
>>
>>
>> On Wed, Mar 15, 2017 at 5:53 PM, vinay patil <[hidden email]
>> > wrote:
>>
>>> Hi Stephan,
>>>
>>> Thank you for making me aware of this.
>>>
>>> Yes I am using a window without reduce function (Apply function). The
>>> discussion happening on JIRA is exactly what I am observing, consistent
>>> failure of checkpoints after some time and the stream halts.
>>>
>>> We want to go live in next month, not sure how this will affect in
>>> production as we are going to get above 200 million data.
>>>
>>> As a workaround can I take the savepoint while the pipeline is running ?
>>> Let's say if I take savepoint after every 30minutes, will it work ?
>>>
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Tue, Mar 14, 2017 at 10:02 PM, Stephan Ewen [via Apache Flink User
>>> Mailing List archive.] <[hidden email]
>>> > wrote:
>>>
 The issue in Flink is https://issues.apache.org/jira/browse/FLINK-5756

 On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <[hidden email]
 > wrote:

> Hi Vinay,
>
> I think the issue is tracked here: https://github.com/faceb
> ook/rocksdb/issues/1988.
>
> Best,
> Stefan
>
> Am 14.03.2017 um 15:31 schrieb Vishnu Viswanath <[hidden email]
> >:
>
> Hi Stephan,
>
> Is there a ticket number/link to track this, My job has all the
> conditions you mentioned.
>
> Thanks,
> Vishnu
>
> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <[hidden email]
> > wrote:
>
>> Hi Vinay!
>>
>> We just discovered a bug in RocksDB. The bug affects windows without
>> reduce() or fold(), windows with evictors, and ListState.
>>
>> A certain access pattern in RocksDB starts being so slow after a
>> certain size-per-key that it basically brings down the streaming program
>> and the snapshots.
>>
>> We are reaching out to the RocksDB folks and looking for workarounds
>> in Flink.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <[hidden email]
>> > wrote:
>>
>>> @vinay  Can you try to not set the buffer timeout at all? I am
>>> actually not sure what would be the effect of setting it to a negative
>>> value, that can be a cause of problems...
>>>
>>>
>>> On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <[hidden email]
>>> > wrote:
>>>
 Vinay,



 The bucketing sink performs rename operations during the checkpoint
 and if it tries to rename a file that is not yet consistent that would
 cause a FileNotFound exception which would fail the checkpoint.



 Stephan,



 Currently my aws fork contains some very specific assumptions about
 the pipeline that will in general only hold for my pipeline. This is
 because there were still some open questions that  I had about how to 
 solve
 consistency issues in the general case. I will comment on the Jira 
 issue
 with more specifics.



 Seth Wiesman



 *From: *vinay patil <[hidden email]
 

Re: Flink 1.2 and Cassandra Connector

2017-03-16 Thread Stephan Ewen
Can we improve the Flink experience here by adding this dependency directly
to the cassandra connector pom.xml (so that user jars always pull it in via
transitivity)?
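Just to make the proposal concrete, a sketch of the declaration that would go into the connector's own pom.xml (same coordinates that Nico used below as a workaround in his job pom):

    <dependency>
        <groupId>com.codahale.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>3.0.2</version>
    </dependency>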

On Wed, Mar 15, 2017 at 4:09 PM, Nico  wrote:

> Hi @all,
>
> I came back to this issue today...
>
> @Robert:
> "com/codahale/metrics/Metric" class was not available in the user code
> jar
>
> Even after adding the metric class into the build-jar profile of the pom
> file, more "class not found" errors occur. So the only solution was to add
> the whole dependency:
>
> <dependency>
>    <groupId>com.codahale.metrics</groupId>
>    <artifactId>metrics-core</artifactId>
>    <version>3.0.2</version>
> </dependency>
>
>
> This worked for me.
>
> Best,
> Nico
>
>
>
> 2017-03-06 11:46 GMT+01:00 Chesnay Schepler :
>
>> Hello,
>>
>> I believe the Cassandra connector is not shading its dependencies
>> properly. This didn't cause issues in the
>> past since Flink used to have a dependency on Codahale metrics as well.
>>
>> Please open a JIRA for this issue.
>>
>> Regards,
>> Chesnay
>>
>>
>> On 06.03.2017 11:32, Tarandeep Singh wrote:
>>
>> Hi Robert & Nico,
>>
>> I am facing the same problem (java.lang.NoClassDefFoundError:
>> com/codahale/metrics/Metric).
>> Can you help me identify the shading issue in my pom.xml file?
>>
>> My pom.xml content-
>> -
>>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>   <modelVersion>4.0.0</modelVersion>
>>   <groupId>rfk-dataplatform</groupId>
>>   <artifactId>stream-processing</artifactId>
>>   <version>0.1.0</version>
>>   <packaging>jar</packaging>
>>   <name>Stream processing</name>
>>   <properties>
>>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>     <flink.version>1.2.0</flink.version>
>>     <slf4j.version>1.7.7</slf4j.version>
>>     <log4j.version>1.2.17</log4j.version>
>>   </properties>
>>   <dependencies>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-streaming-java_2.10</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-clients_2.10</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-connector-cassandra_2.10</artifactId>
>>       <version>1.2.0</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
>>       <version>1.2.0</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.slf4j</groupId>
>>       <artifactId>slf4j-log4j12</artifactId>
>>       <version>${slf4j.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>log4j</groupId>
>>       <artifactId>log4j</artifactId>
>>       <version>${log4j.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.avro</groupId>
>>       <artifactId>avro</artifactId>
>>       <version>1.8.1</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.testng</groupId>
>>       <artifactId>testng</artifactId>
>>       <version>6.8</version>
>>       <scope>test</scope>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
>>       <version>1.2.0</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.influxdb</groupId>
>>       <artifactId>influxdb-java</artifactId>
>>       <version>2.5</version>
>>     </dependency>
>>   </dependencies>
>>   <profiles>
>>     <profile>
>>       <id>build-jar</id>
>>       <activation>
>>         <activeByDefault>false</activeByDefault>
>>       </activation>
>>       <dependencies>
>>         <dependency>
>>           <groupId>org.apache.flink</groupId>
>>           <artifactId>flink-java</artifactId>
>>           <version>${flink.version}</version>
>>           <scope>provided</scope>
>>         </dependency>
>>         <dependency>
>>           <groupId>org.apache.flink</groupId>
>>           <artifactId>flink-streaming-java_2.10</artifactId>
>>           <version>${flink.version}</version>
>>           <scope>provided</scope>
>>         </dependency>
>>         <dependency>
>>           <groupId>org.apache.flink</groupId>
>>           <artifactId>flink-clients_2.10</artifactId>
>>           <version>${flink.version}</version>
>>           <scope>provided</scope>
>>         </dependency>
>>         <dependency>
>>           <groupId>org.slf4j</groupId>
>>           <artifactId>slf4j-log4j12</artifactId>
>>           <version>${slf4j.version}</version>
>>           <scope>provided</scope>
>>         </dependency>
>>         <dependency>
>>           <groupId>log4j</groupId>
>>           <artifactId>log4j</artifactId>
>>           <version>${log4j.version}</version>
>>           <scope>provided</scope>
>>         </dependency>
>>       </dependencies>
>>       <build>
>>         <plugins>
>>           <plugin>
>>             <groupId>org.apache.maven.plugins</groupId>
>>             <artifactId>maven-shade-plugin</artifactId>
>>             <version>2.4.1</version>
>>             <executions>
>>               <execution>
>>                 <phase>package</phase>
>>                 <goals><goal>shade</goal></goals>
>>                 <configuration>
>>                   <artifactSet>
>>                     <excludes combine.self="override"/>
>>                   </artifactSet>
>>                 </configuration>
>>               </execution>
>>             </executions>
>>           </plugin>
>>         </plugins>
>>       </build>
>>     </profile>
>>   </profiles>
>>   <build>
>>     <plugins>
>>       <plugin>
>>         <groupId>org.apache.maven.plugins</groupId>
>>         <artifactId>maven-shade-plugin</artifactId>
>>         <version>2.4.1</version>
>>         <executions>
>>           <execution>
>>             <phase>package</phase>
>>             <goals><goal>shade</goal></goals>
>>             <configuration>
>>               <artifactSet>
>>                 <excludes>
>>                   <exclude>org.apache.flink:flink-annotations</exclude>
>>                   <exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
>>                   <exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
>>                   <exclude>org.apache.flink:flink-core</exclude>
>>                   <exclude>org.apache.flink:flink-java</exclude>
>>                   <exclude>org.apache.flink:flink-scala_2.10</exclude>
>>                   <exclude>org.apache.flink:flink-runtime_2.10</exclude>
>>                   <exclude>org.apache.flink:flink-optimizer_2.10</exclude>
>>                   <exclude>or

Re: Batch stream Sink delay ?

2017-03-16 Thread Fabian Hueske
What kind of timestamp and watermark extractor are you using?
Can you share your implementation?

You can have a look at the example programs (for example [1]). These can be
started and debugged inside an IDE by executing the main method.
If you run it in an external process, you should be able to connect to the
process with the standard options.

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
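For a file like yours, where timestamps are strictly increasing, one option (just a sketch - the path, field layout and timestamp format are assumptions) is an AscendingTimestampExtractor on a single-parallelism source, so that watermarks can advance while the file is being read:

    DataStream<String> lines = env
        .readTextFile("file:///path/to/requests.log")
        .setParallelism(1);   // a single reader keeps the increasing timestamp order intact

    DataStream<String> withTimestamps = lines
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
            @Override
            public long extractAscendingTimestamp(String line) {
                // assumed here: the first comma-separated field is an epoch-millis timestamp
                return Long.parseLong(line.split(",")[0]);
            }
        });

For attaching a debugger to an external process, the standard JDWP agent should do, e.g. passing
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 through "env.java.opts" (assuming you control the cluster configuration).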


2017-03-16 12:10 GMT+01:00 Paul Smith :

> Thanks again for your reply.
>
> I've tried with both Parallel=1 through to 3. Same behavior.
>
> The log file has monotonically increasing timestamps generated by an
> application using log4j. Each log line has a distinct, incrementing
> timestamp. It is an 8GB file I'm using as a test case and it has about 8
> million rows.
>
> Whether parallelism is 1 or 3, the same output is shown: the data gets to
> the sink at the end and all looks correct - I set the folded result
> record's timestamp to the end of each window and I see nice chunks of 15
> minute blocks in the result.
>
> I'm not sure why the watermarks are not being sent as the data progresses.
>
> I might try pushing the data (same data) through Kafka and see if I get a
> different result. I might also take a sample from the file (rather than
> the whole file) to see if I get differing results.
>
> Is there a wiki page anywhere that shows how to debug a job through an
> IDE?  Can I easily remote attach to a running process via standard java
> options?
>
> Regards
>
> Paul
>
> On 16 Mar 2017, at 21:15, Fabian Hueske  wrote:
>
> Hi Paul,
>
> since each operator uses the minimum watermark of all its inputs, you must
> ensure that each parallel task is producing data.
> If a source does not produce data, it will not increase the timestamps of
> its watermarks.
>
> Another challenge, that you might run into is that you need to make sure
> that the file (or file chunks if it is split for parallel reading) is read
> in the increasing timestamp order.
> Otherwise, watermarks will be emitted too early.
>
> If I understood your use case correctly, you are just experimenting
> with file input to get a feeling for the API.
> I would try to set the parallelism of the file source to 1 to ensure that
> the data is read in the same order and that all tasks are producing data.
>
> Hope this helps,
> Fabian
>
> 2017-03-15 23:54 GMT+01:00 Paul Smith :
>
>> Thanks Fabian, I’m pretty sure you are correct here.  I can see in the
>> Metric view that the currentLowWaterMark is set to MIN_VALUE by the looks
>> of it, so Watermarks are not being emitted at all until the end.  This
>> stays all the way through the job.
>>
>>
>>
>> I’m not sure why this is the case.  I’ve verified that my
>> TimestampExtractor class is being called, and returning the value I’d
>> expect (The timestamp from the log line), and looks legitimate.  My
>> WindowFunction which is doing the aggregation is not being called until
>> right at the end of the job, but yet the windows comes out ok in the
>> destination.
>>
>>
>>
>> I am not sure how to debug this one.  Do you or anyone have any other
>> suggestions on how to debug why the Windowing is not being triggered?  I
>> can’t glean any useful further ideas from the metrics I can see..
>>
>>
>>
>> Appreciate the help
>>
>>
>>
>> Cheers,
>>
>>
>> Paul
>>
>>
>>
>> *From: *Fabian Hueske 
>> *Reply-To: *"user@flink.apache.org" 
>> *Date: *Tuesday, 14 March 2017 at 7:59 pm
>> *To: *"user@flink.apache.org" 
>> *Subject: *Re: Batch stream Sink delay ?
>>
>>
>>
>> Hi Paul,
>>
>> This might be an issue with the watermarks. A window operation can only
>> compute and emit its results when the watermark time is later than the
>> end time of the window.
>>
>> Each operator keeps track of the maximum timestamp of all its input tasks
>> and computes its own time as the minimum of all those maximum timestamps.
>>
>> If one (or all) watermarks received by the window operator is not later
>> than the end time of the window, the window will not be computed.
>>
>> When a file input is completely processed, Flink sends a Long.MAX_VALUE
>> timestamp which might trigger the execution at the end of the job.
>>
>> I would try to debug the watermarks of your job. The web dashboard
>> provides a few metrics for that.
>>
>> Best, Fabian
>>
>>
>>
>> 2017-03-14 2:47 GMT+01:00 Paul Smith :
>>
>> Using Flink 1.2.
>>
>>
>>
>> Hi all, I have a question about Batch processing and Sinks.  I have a
>> Flink job that parses a User Request log file that contains performance
>> data per request. It accumulates metric data values into 15 minute time
>> windows.  Each log line is mapped to multiple records, so that each of the
>> metric value can be 'billed' against a few different categories (User,
>> Organisation, Project, Action).  The goal of the Flink job is to distil the
>> volume of request data into more managea

Re: Batch stream Sink delay ?

2017-03-16 Thread Paul Smith
Thanks again for your reply.

I've tried with both Parallel=1 through to 3. Same behavior.

The log file has monotonically increasing timestamps generated by an 
application using log4j. Each log line has a distinct, incrementing timestamp. 
It is an 8GB file I'm using as a test case and it has about 8 million rows.

Whether parallelism is 1 or 3, the same output is shown: the data gets to the 
sink at the end and all looks correct - I set the folded result record's 
timestamp to the end of each window and I see nice chunks of 15 minute blocks 
in the result.

I'm not sure why the watermarks are not being sent as the data progresses.

I might try pushing the data (same data) through Kafka and see if I get a 
different result. I might also take a sample from the file (rather than the 
whole file) to see if I get differing results.

Is there a wiki page anywhere that shows how to debug a job through an IDE?  
Can I easily remote attach to a running process via standard java options?

Regards

Paul

On 16 Mar 2017, at 21:15, Fabian Hueske 
mailto:fhue...@gmail.com>> wrote:

Hi Paul,

since each operator uses the minimum watermark of all its inputs, you must 
ensure that each parallel task is producing data.
If a source does not produce data, it will not increase the timestamps of its 
watermarks.

Another challenge, that you might run into is that you need to make sure that 
the file (or file chunks if it is split for parallel reading) is read in the 
increasing timestamp order.
Otherwise, watermarks will be emitted too early.

If I understood your use case correctly, you are just experimenting with 
file input to get a feeling for the API.
I would try to set the parallelism of the file source to 1 to ensure that the 
data is read in the same order and that all tasks are producing data.

Hope this helps,
Fabian

2017-03-15 23:54 GMT+01:00 Paul Smith 
mailto:psm...@aconex.com>>:
Thanks Fabian, I’m pretty sure you are correct here.  I can see in the Metric 
view that the currentLowWaterMark is set to MIN_VALUE by the looks of it, so 
Watermarks are not being emitted at all until the end.  This stays all the way 
through the job.

I’m not sure why this is the case.  I’ve verified that my TimestampExtractor 
class is being called, and returning the value I’d expect (The timestamp from 
the log line), and looks legitimate.  My WindowFunction which is doing the 
aggregation is not being called until right at the end of the job, but yet the 
windows comes out ok in the destination.

I am not sure how to debug this one.  Do you or anyone have any other 
suggestions on how to debug why the Windowing is not being triggered?  I can’t 
glean any useful further ideas from the metrics I can see..

Appreciate the help

Cheers,

Paul

From: Fabian Hueske mailto:fhue...@gmail.com>>
Reply-To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Date: Tuesday, 14 March 2017 at 7:59 pm
To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Re: Batch stream Sink delay ?

Hi Paul,
This might be an issue with the watermarks. A window operation can only 
compute and emit its results when the watermark time is later than the end time 
of the window.
Each operator keeps track of the maximum timestamp of all its input tasks and 
computes its own time as the minimum of all those maximum timestamps.
If one (or all) watermarks received by the window operator is not later than 
the end time of the window, the window will not be computed.
When a file input is completely processed, Flink sends a Long.MAX_VALUE 
timestamp which might trigger the execution at the end of the job.
I would try to debug the watermarks of your job. The web dashboard provides a 
few metrics for that.
Best, Fabian

2017-03-14 2:47 GMT+01:00 Paul Smith 
mailto:psm...@aconex.com>>:
Using Flink 1.2.

Hi all, I have a question about Batch processing and Sinks.  I have a Flink job 
that parses a User Request log file that contains performance data per request. 
It accumulates metric data values into 15 minute time windows.  Each log line 
is mapped to multiple records, so that each of the metric value can be 'billed' 
against a few different categories (User, Organisation, Project, Action).  The 
goal of the Flink job is to distil the volume of request data into more 
manageable summaries.  We can then see breakdowns of, say, User CPU utilised by 
distinct categories (e.g. Users) and allow us to look for trends/outliers.

It is working very well as a batch, but with one unexpected behaviour.

What I can't understand is that the Sink does not appear to get _any_ records 
until the rest of the chain has completed over the entire file.  I've played 
around with parallelism (1->3), and also verified with logging that the Sink 
isn't seeing any data until the entire previous chain is complete.  Is this 
expected behaviour? I was thinking that as each Time Window passed a 'block' of 
the resul

Re: SQL + flatten (or .*) quality docs location?

2017-03-16 Thread Fabian Hueske
Hi Stu,

there is only one page of documentation for the Table API and SQL [1].
I agree the structure could be improved and split into multiple pages.

Regarding the flattening of a Pojo, have a look at the "Built-In Functions"
section [2].
If you select "SQL" and head to the "Value access functions", you'll find

> tableName.compositeType.* : Converts a Flink composite type (such as
Tuple, POJO, etc.) and all of its direct subtypes into a flat
representation where every subtype is a separate field.


The following program returns the correct result:

// POJO definition
class MyPojo(var x: Int, var y: Int) {
  def this() = this(0, 0)
}

// SQL query
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val ds = env.fromElements((0, new MyPojo(1, 2)), (1, new MyPojo(2, 3)), (2,
new MyPojo(3, 4)) )
tEnv.registerDataSet("Pojos", ds, 'id, 'pojo)

val result = tEnv.sql("SELECT id, Pojos.pojo.* FROM Pojos") // you need to
include the table name to flatten a Pojo

val results = result.toDataSet[Row].collect()
println(results.mkString("\n"))

// Result
0,1,2
1,2,3
2,3,4

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html#built-in-functions

2017-03-15 21:31 GMT+01:00 Stu Smith :

> The documentation seems to indicate that there is a flatten method
> available in the sql language interface (in the table of available
> methods), or, alternatively using the '*' character somehow (in the text
> above the table).
>
> Yet I cannot flatten a POJO type, nor can I find any sufficient
> documentation in the official docs, searching the mailing list via
> markmail, looking through the examples in the source, or looking for
> through the SQL tests in the source.
>
> Can someone point me to the correct location for some solid flink SQL
> examples and docs?
>
> Take care,
>   -stu
>


Re: Questions regarding queryable state

2017-03-16 Thread Ufuk Celebi
On Thu, Mar 16, 2017 at 10:00 AM, Kathleen Sharp
 wrote:
> Hi,
>
> I have some questions regarding the Queryable State feature:
>
> Is it possible to use the QueryClient to get a list of keys for a given State?

No, this is not possible at the moment. You would have to trigger a
query for each key and then gather all results (for example via
Future.sequence [1]).

> At the moment it is not possible to use ListState - will this ever be
> introduced?

This is not exposed via the asQueryableState API, because it is not
possible to ever clear the list in that case. You can write a custom
function and create the list state yourself, like this (pseudo code):

stream.keyBy(...).flatMap({
   ListStateDescriptor listStateDesc = ...
   listStateDesc.setQueryable(name); // make queryable

   ListState state = getRuntimeContext().getListState(listStateDesc);
});

I would only use this if the list is also cleared at some point in
time (otherwise it will grow without bounds).
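Fleshed out a bit (only a sketch - the state name, query name and element types are made up), that could look like this:

    stream
        .keyBy(0)
        .flatMap(new RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {

            private transient ListState<Long> values;

            @Override
            public void open(Configuration parameters) {
                ListStateDescriptor<Long> desc =
                    new ListStateDescriptor<>("values", Long.class);
                desc.setQueryable("values-query");  // exposed to the query client under this name
                values = getRuntimeContext().getListState(desc);
            }

            @Override
            public void flatMap(Tuple2<String, Long> in,
                                Collector<Tuple2<String, Long>> out) throws Exception {
                values.add(in.f1);  // clear() the list at some point, or it grows without bounds
                out.collect(in);
            }
        });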

>
> My first impression is that I would need one of these 2 to be able to
> use Queryable state.
>
> I would then probably need to build on top of the queryable state
> client to allow filtering, pagination etc. of results. Is the intention
> to enrich the client at some point with this (assuming list state
> becomes supported)?

I can imagine supporting queries for multiple keys at once; with list
state I'm not sure.

> The queryable state client needs a job id, is there any recommended
> way of getting ahold of this?

No, this is a major shortcoming. :-( I would like to go over the
queryable state client for the next release and make sure to get some
of these annoyances out of the way, like using the job name instead of
the JobID or requiring another query name for the job if you want to
make it queryable via name.
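As a stop-gap, the ID of a running job can be looked up in the web dashboard or via the CLI, for example:

    bin/flink list -r    # lists running jobs together with their JobIDs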

[1] 
http://www.scala-lang.org/api/current/scala/concurrent/Future$.html#sequence[A,M[X]<:TraversableOnce[X]](in:M[scala.concurrent.Future[A]])(implicitcbf:scala.collection.generic.CanBuildFrom[M[scala.concurrent.Future[A]],A,M[A]],implicitexecutor:scala.concurrent.ExecutionContext):scala.concurrent.Future[M[A]]


Re: Batch stream Sink delay ?

2017-03-16 Thread Fabian Hueske
Hi Paul,

since each operator uses the minimum watermark of all its inputs, you must
ensure that each parallel task is producing data.
If a source does not produce data, it will not increase the timestamps of
its watermarks.

Another challenge, that you might run into is that you need to make sure
that the file (or file chunks if it is split for parallel reading) is read
in the increasing timestamp order.
Otherwise, watermarks will be emitted too early.

If I understood your use case correctly, you are just experimenting
with file input to get a feeling for the API.
I would try to set the parallelism of the file source to 1 to ensure that
the data is read in the same order and that all tasks are producing data.

Hope this helps,
Fabian

2017-03-15 23:54 GMT+01:00 Paul Smith :

> Thanks Fabian, I’m pretty sure you are correct here.  I can see in the
> Metric view that the currentLowWaterMark is set to MIN_VALUE by the looks
> of it, so Watermarks are not being emitted at all until the end.  This
> stays all the way through the job.
>
>
>
> I’m not sure why this is the case.  I’ve verified that my
> TimestampExtractor class is being called, and returning the value I’d
> expect (The timestamp from the log line), and looks legitimate.  My
> WindowFunction which is doing the aggregation is not being called until
> right at the end of the job, but yet the windows comes out ok in the
> destination.
>
>
>
> I am not sure how to debug this one.  Do you or anyone have any other
> suggestions on how to debug why the Windowing is not being triggered?  I
> can’t glean any useful further ideas from the metrics I can see..
>
>
>
> Appreciate the help
>
>
>
> Cheers,
>
>
> Paul
>
>
>
> *From: *Fabian Hueske 
> *Reply-To: *"user@flink.apache.org" 
> *Date: *Tuesday, 14 March 2017 at 7:59 pm
> *To: *"user@flink.apache.org" 
> *Subject: *Re: Batch stream Sink delay ?
>
>
>
> Hi Paul,
>
> This might be an issue with the watermarks. A window operation can only
> compute and emit its results when the watermark time is later than the end
> time of the window.
>
> Each operator keeps track of the maximum timestamp of all its input tasks
> and computes its own time as the minimum of all those maximum timestamps.
>
> If one (or all) watermarks received by the window operator is not later
> than the end time of the window, the window will not be computed.
>
> When a file input is completely processed, Flink sends a Long.MAX_VALUE
> timestamp which might trigger the execution at the end of the job.
>
> I would try to debug the watermarks of your job. The web dashboard
> provides a few metrics for that.
>
> Best, Fabian
>
>
>
> 2017-03-14 2:47 GMT+01:00 Paul Smith :
>
> Using Flink 1.2.
>
>
>
> Hi all, I have a question about Batch processing and Sinks.  I have a
> Flink job that parses a User Request log file that contains performance
> data per request. It accumulates metric data values into 15 minute time
> windows.  Each log line is mapped to multiple records, so that each of the
> metric value can be 'billed' against a few different categories (User,
> Organisation, Project, Action).  The goal of the Flink job is to distil the
> volume of request data into more manageable summaries.  We can then see
> breakdowns of, say, User CPU utilised by distinct categories (e.g. Users)
> and allow us to look for trends/outliers.
>
>
>
> It is working very well as a batch, but with one unexpected behaviour.
>
>
>
> What I can't understand is that the Sink does not appear to get _any_
> records until the rest of the chain has completed over the entire file.
> I've played around with parallelism (1->3), and also verified with logging
> that the Sink isn't seeing any data until the entire previous chain is
> complete.  Is this expected behaviour? I was thinking that as each Time
> Window passed a 'block' of the results would be emitted to the sink.  Since
> we use Event Time characteristics, the batch job ought to emit these chunks
> as each 15 minute segment passes?
>
>
>
> You can see the base job here, with an example of a single log line with
> the metrics here:
>
>
>
> https://gist.github.com/tallpsmith/a2e5212547fb3c7220b0e49846d2f152
>
>
>
> Each Category I've called an 'identifierType' (User, Organisation..), with
> the 'identifier' the value of that (a UserId for example).  I key the
> stream by this pair of records and then Fold the records by the Time window
> summing each metric type's value up.  I have yet to work out the proper use
> case for Fold versus Reduce, I may have got that wrong, but I can't see how
> that changes the flow here.
>
>
>
> The output is a beautiful rolled up summary by 15 minutes by each
> identifierType & identifier.  I have yet to attempt a live Streaming
> version of this, but had thought the Batch version would also be concurrent
> and start emitting 15 minute windows as soon as the stream chunks
> transitions into the next window.   Given the entire job takes about 5
> minutes on my laptop for 

Questions regarding queryable state

2017-03-16 Thread Kathleen Sharp
Hi,

I have some questions regarding the Queryable State feature:

Is it possible to use the QueryClient to get a list of keys for a given State?
At the moment it is not possible to use ListState - will this ever be
introduced?

My first impression is that I would need one of these 2 to be able to
use Queryable state.

I would then probably need to build on top of the queryable state
client to allow filtering, pagination etc. of results. Is the intention
to enrich the client at some point with this (assuming list state
becomes supported)?

The queryable state client needs a job id, is there any recommended
way of getting ahold of this?

Any pointers to good slide decks or documentation welcome :)

Kat


Re: Data+control stream from kafka + window function - not working

2017-03-16 Thread Tzu-Li (Gordon) Tai
Hi Tarandeep,

I haven’t looked at the rest of the code yet, but my first guess is that you 
might not be reading any data from Kafka at all:

private static DataStream<String> readKafkaStream(String topic,
        StreamExecutionEnvironment env) throws IOException {

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");
    properties.setProperty("group.id", "group-0009");
    properties.setProperty("auto.offset.reset", "smallest");
    return env.addSource(new FlinkKafkaConsumer08<>(topic,
            new SimpleStringSchema(), properties));
}

Have you tried using a different “group.id” every time you re-run the job?
Note that the “auto.offset.reset” value is only respected when there aren’t any 
offsets for the group committed in Kafka.
So you might not actually be reading the complete “small_input.csv” dataset, 
unless you use a different group.id every time.
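For experimenting, one simple way to force a full re-read on every run (just a suggestion, assuming the 0.8 consumer properties from your snippet) is to generate a fresh group id:

    properties.setProperty("group.id", "debug-group-" + System.currentTimeMillis());
    properties.setProperty("auto.offset.reset", "smallest"); // honored because the new group has no committed offsets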

Cheers,
Gordon

On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com) wrote:

Hi,

I am using flink-1.2 and reading data stream from Kafka (using 
FlinkKafkaConsumer08). I want to connect this data stream with another stream 
(read control stream) so as to do some filtering on the fly. After filtering, I 
am applying window function (tumbling/sliding event window) along with fold 
function. However, the window function does not get called.

Any help to debug/fix this is greatly appreciated!

Below is reproducible code that one can run in an IDE like IntelliJ or on a Flink 
cluster. You will need to have a running Kafka cluster (local or otherwise).
Create a topic and add test data points-

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper 
localhost:2181 --replication-factor 1 --partitions 1
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
test < small_input.csv

where small_input.csv contains the following lines-

p1,10.0f,2017-03-14 16:01:01
p1,10.0f,2017-03-14 16:01:02
p1,10.0f,2017-03-14 16:01:03
p1,10.0f,2017-03-14 16:01:04
p1,10.0f,2017-03-14 16:01:05
p1,10.0f,2017-03-14 16:01:10
p1,10.0f,2017-03-14 16:01:11
p1,10.0f,2017-03-14 16:01:12
p1,10.0f,2017-03-14 16:01:40
p1,10.0f,2017-03-14 16:01:50

Now you can run the code given below. Note:

1) In this example, I am not reading control stream from Kafka (but issue can 
be reproduced with this code as well)
2) If instead of reading data stream from kafka, I create stream from elements 
(i.e. use getInput function instead of getKafkaInput function), the code works 
and window function is fired.

Thanks,
Tarandeep



import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class Test3 {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//DataStream product = getInput(env);
DataStream product = getKafkaInput(env);
DataStream> control= getControl(env);

DataStream filteredStream = product.keyBy(0)
.connect(control.keyBy(0))
.flatMap(new CoFlatMapFunImpl());

DataStream watermarkedStream = 
filteredStream.assignTimestampsAndWatermarks(
getTimestampAssigner(Time.seconds(1))).setParallelism(3);

watermarkedStream.transform("WatermarkDebugger", 
watermarkedStream.getType(), new Water

Re: Checkpointing with RocksDB as statebackend

2017-03-16 Thread vinay patil
Hi Stephan,

What can be the workaround for this ?

Also, I need one confirmation: is G1 GC used by default when running the
pipeline on YARN? (I see a thread from 2015 where G1 is used by default for
Java 8.)



Regards,
Vinay Patil

On Wed, Mar 15, 2017 at 10:32 PM, Stephan Ewen [via Apache Flink User
Mailing List archive.]  wrote:

> Hi Vinay!
>
> Savepoints also call the same problematic RocksDB function, unfortunately.
>
> We will have a fix next month. We either (1) get a patched RocksDB version
> or we (2) implement a different pattern for ListState in Flink.
>
> (1) would be the better solution, so we are waiting for a response from
> the RocksDB folks. (2) is always possible if we cannot get a fix from
> RocksDB.
>
> Stephan
>
>
> On Wed, Mar 15, 2017 at 5:53 PM, vinay patil <[hidden email]
> > wrote:
>
>> Hi Stephan,
>>
>> Thank you for making me aware of this.
>>
>> Yes I am using a window without reduce function (Apply function). The
>> discussion happening on JIRA is exactly what I am observing, consistent
>> failure of checkpoints after some time and the stream halts.
>>
>> We want to go live in next month, not sure how this will affect in
>> production as we are going to get above 200 million data.
>>
>> As a workaround can I take the savepoint while the pipeline is running ?
>> Let's say if I take savepoint after every 30minutes, will it work ?
>>
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Tue, Mar 14, 2017 at 10:02 PM, Stephan Ewen [via Apache Flink User
>> Mailing List archive.] <[hidden email]
>> > wrote:
>>
>>> The issue in Flink is https://issues.apache.org/jira/browse/FLINK-5756
>>>
>>> On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <[hidden email]
>>> > wrote:
>>>
 Hi Vinay,

 I think the issue is tracked here: https://github.com/faceb
 ook/rocksdb/issues/1988.

 Best,
 Stefan

 Am 14.03.2017 um 15:31 schrieb Vishnu Viswanath <[hidden email]
 >:

 Hi Stephan,

 Is there a ticket number/link to track this, My job has all the
 conditions you mentioned.

 Thanks,
 Vishnu

 On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <[hidden email]
 > wrote:

> Hi Vinay!
>
> We just discovered a bug in RocksDB. The bug affects windows without
> reduce() or fold(), windows with evictors, and ListState.
>
> A certain access pattern in RocksDB starts being so slow after a
> certain size-per-key that it basically brings down the streaming program
> and the snapshots.
>
> We are reaching out to the RocksDB folks and looking for workarounds
> in Flink.
>
> Greetings,
> Stephan
>
>
> On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <[hidden email]
> > wrote:
>
>> @vinay  Can you try to not set the buffer timeout at all? I am
>> actually not sure what would be the effect of setting it to a negative
>> value, that can be a cause of problems...
>>
>>
>> On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <[hidden email]
>> > wrote:
>>
>>> Vinay,
>>>
>>>
>>>
>>> The bucketing sink performs rename operations during the checkpoint
>>> and if it tries to rename a file that is not yet consistent that would
>>> cause a FileNotFound exception which would fail the checkpoint.
>>>
>>>
>>>
>>> Stephan,
>>>
>>>
>>>
>>> Currently my aws fork contains some very specific assumptions about
>>> the pipeline that will in general only hold for my pipeline. This is
>>> because there were still some open questions that  I had about how to 
>>> solve
>>> consistency issues in the general case. I will comment on the Jira issue
>>> with more specifics.
>>>
>>>
>>>
>>> Seth Wiesman
>>>
>>>
>>>
>>> *From: *vinay patil <[hidden email]
>>> >
>>> *Reply-To: *"[hidden email]
>>> " <[hidden
>>> email] >
>>> *Date: *Monday, February 27, 2017 at 1:05 PM
>>> *To: *"[hidden email]
>>> " <[hidden
>>> email] >
>>>
>>>
>>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>>
>>>
>>>
>>> Hi Seth,
>>>
>>> Thank you for your suggestion.
>>>
>>> But if the issue is only related to S3, then why does this happen
>>> when I replace the S3 sink  to HDFS as w

Re: [POLL] Who still uses Java 7 with Flink ?

2017-03-16 Thread Bowen Li
There's always a tradeoff we need to make. I'm in favor of upgrading to
Java 8 to bring in all new Java features.

The common way I've seen other software handle major upgrades like this (and
I agree with it) is to 1) upgrade in the next big release without backward
compatibility and notify everyone, and 2) maintain and patch the current,
old-tech-compatible version at a reasonably limited scope. Building backward
compatibility is too much for an open-source project.



On Wed, Mar 15, 2017 at 7:10 AM, Robert Metzger  wrote:

> I've put it also on our Twitter account:
> https://twitter.com/ApacheFlink/status/842015062667755521
>
> On Wed, Mar 15, 2017 at 2:19 PM, Martin Neumann 
> wrote:
>
> > I think this is easier done in a straw poll than in an email conversation.
> > I created one at: http://www.strawpoll.me/12535073
> > (Note that you have multiple choices.)
> >
> >
> > Though I prefer Java 8, most of the time I have to work on Java 7. A lot
> > of the infrastructure I work on still runs Java 7; one of the companies I
> > built a prototype for a while back only updated to Java 7 two years ago. I
> > doubt we can ditch Java 7 support any time soon if we want to make it easy
> > for companies to use Flink.
> >
> > cheers Martin
> >
> > //PS sorry if this gets sent twice, we just migrated to a new mail system
> > and a lot of things are broken
> >
> > 
> > From: Stephan Ewen 
> > Sent: Wednesday, March 15, 2017 12:30:24 PM
> > To: user@flink.apache.org; d...@flink.apache.org
> > Subject: [POLL] Who still uses Java 7 with Flink ?
> >
> > Hi all!
> >
> > I would like to get a feeling how much Java 7 is still being used among
> > Flink users.
> >
> > At some point, it would be great to drop Java 7 support and make use of
> > Java 8's new features, but first we would need to get a feeling how much
> > Java 7 is still used.
> >
> > Would be happy if users on Java 7 respond here, or even users that have
> > some insights into how widespread they think Java 7 still is.
> >
> > Thanks,
> > Stephan
> >
> >
> >
> >
> >
>