Re: API request to submit job takes over 1hr

2016-06-13 Thread Tzu-Li (Gordon) Tai
Hi Shannon,

Thanks for your investigation of the issue and the JIRA. There's actually a
previous JIRA on this problem already:
https://issues.apache.org/jira/browse/FLINK-4023. Would you be ok with
tracking this issue on FLINK-4023 and closing FLINK-4069 as a duplicate?
As you can see, I've also linked FLINK-4069 from FLINK-4023 so that your
additional info on the problem is referenced there.

A little help with answering your last questions:
1. We're doing the partition distribution across consumers ourselves: the
Kafka consumer connector creates a Kafka client on each subtask, and each
subtask independently determines which partitions it should be in charge of
(see the sketch below). There's more information in the last FAQ section of
this blog post: http://data-artisans.com/kafka-flink-a-practical-how-to/.
As Robert has mentioned, the consumer currently depends on the fixed,
ordered list of partitions sent to all subtasks, so that each of them always
determines the same set of partitions to fetch from across restarts.
2. Following the above description, the consumer currently only subscribes to
the fixed partition list queried in the constructor. So at the moment the
Flink Kafka consumer doesn't handle repartitioning of topics, but it's
definitely on the todo list for the Kafka connector and won't be too hard to
implement once querying in the consumer is resolved (perhaps Robert can
clarify this a bit more).
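
For illustration, here is a minimal sketch of the fixed-list assignment
described in point 1 above. It is not the connector's actual code, just the
idea that an identically ordered partition list plus an index-based rule lets
every subtask derive its share independently (class and method names are made
up):

    import java.util.ArrayList;
    import java.util.List;

    public final class PartitionAssignmentSketch {

        // Every subtask calls this with the same, identically ordered partition
        // list and its own index; the modulo rule yields disjoint, complete
        // assignments that are stable across restarts.
        public static <P> List<P> assignPartitions(
                List<P> allPartitionsInFixedOrder, int numSubtasks, int subtaskIndex) {
            List<P> mine = new ArrayList<>();
            for (int i = 0; i < allPartitionsInFixedOrder.size(); i++) {
                if (i % numSubtasks == subtaskIndex) {
                    mine.add(allPartitionsInFixedOrder.get(i));
                }
            }
            return mine;
        }
    }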

Best,
Gordon





Re: API request to submit job takes over 1hr

2016-06-13 Thread Shannon Carey
Robert,

Thanks for your thoughtful response.


  1.  I understand your concern. User code is not guaranteed to respond to 
thread interrupts. So no matter what you do, you may end up with a stuck 
thread. But I think we can improve the user experience. First, we can update 
the documentation to make it clear that the main() method will be executed 
during job submission, and that jobs should make an effort to avoid doing 
time-consuming work in that main method. Second, I still think it's in your 
best interest to decouple the job submission thread from the HTTP thread. That 
would un-hide the problem, because the end-user could see that their job 
request has been started but is not making it past a certain point (maybe it's 
in one phase/status before main() executes, and in a different status once 
main() completes). Also, if they have made (and failed or aborted) multiple
job submission API requests, it would be obvious that those requests are still
occupying threads. Right now, it's impossible to tell what has happened to the 
request or whether it is occupying a thread without relying on log output 
(which took us a while to get right in AWS EMR YARN) or a stack dump. Ideally, 
the UI should be able to list all the threads that are currently working on job 
submission.
  2.  I see, the main method will execute on the Application Master, right? I 
created https://issues.apache.org/jira/browse/FLINK-4069. Unfortunately, I don't 
understand very well how Kafka brokers & clients cooperate to make sure that 
partitions are distributed across consumers that share a group id (is there 
documentation about that somewhere?)… Also, I'm not sure how Flink deals with 
repartitioning.

-Shannon

From: Robert Metzger <rmetz...@apache.org>
Date: Thursday, June 2, 2016 at 4:19 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: API request to submit job takes over 1hr

Hi Shannon,

thank you for further investigating the issue.
It's fine to keep the discussion on the user@ list. Most devs are on the user 
list as well and we'll probably file some JIRAs.

Regarding your suggestions:
1. Not sure if making the job submission non-blocking is a good idea. We would
probably need to interrupt the submitting thread after a while, which does not
always work (we have found that Kafka and Hadoop, for example, often ignore
interrupts, or even worse get stuck afterwards). This would just hide the
problems or introduce new issues.

2. As you've identified correctly, the real issue here is that the Kafka 
consumer is querying the brokers for metadata from the constructor (= on the 
client) not from the workers in the cluster (in the open() method).
Changing the behavior is on my todo list. If you want, you can file a JIRA for
this. If you also have time to work on it, you can of course open a pull
request. Otherwise, some contributors from the Flink community can take care of
the implementation.
The main reasons why we do the querying centrally are: a) to avoid overloading
the brokers, and b) to send the same list of partitions (in the same order) to
all parallel consumers so they can do a fixed partition assignment (also across
restarts). When we do the querying in the open() method, we need to make sure
that all partitions are assigned, without duplicates (also after restarts in
case of failures).

Regards,
Robert




On Thu, Jun 2, 2016 at 1:44 AM, Shannon Carey <sca...@expedia.com> wrote:
It looks like the problem is due to the stack trace below.

Simply put, connection failure to Kafka when using the default settings causes 
job submission to take over (flink.get-partitions.retry * tries by 
SimpleConsumer * socket.timeout.ms * # of Kafka 
brokers) = (3 * 2 * 30 * (# of Kafka brokers)) seconds. In my case, since I 
have 36 Kafka brokers, it took over 108 minutes. This is beyond the maximum 
idle connection timeout of an AWS ELB of 60 minutes, and beyond the normal 
length of time most people expect an HTTP request to take. During these 108 
minutes and after, aside from examining logs & stack traces, it is not possible 
to determine what is happening with regard to the run job request. It simply 
appears to hang and then fail, typically with a 504 Gateway Timeout status.
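
As a quick sanity check on those numbers, a small throwaway calculation (the
variable names below are descriptive only, not actual Flink or Kafka
configuration keys):

    public class SubmissionDelaySketch {
        public static void main(String[] args) {
            int partitionQueryRetries = 3;   // flink.get-partitions.retry
            int attemptsPerBroker = 2;       // tries made by Kafka's SimpleConsumer
            int socketTimeoutSeconds = 30;   // socket.timeout.ms = 30000
            int brokers = 36;

            int worstCaseSeconds = partitionQueryRetries * attemptsPerBroker
                    * socketTimeoutSeconds * brokers;
            // 3 * 2 * 30 * 36 = 6480 seconds, i.e. 108 minutes
            System.out.println(worstCaseSeconds / 60 + " minutes");
        }
    }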

There are a couple problems that are responsible for this situation. Let me 
know if I should move this discussion to the "devs" list: I am not a member 
there yet. I am happy to submit JIRAs and I would be able to submit a Pull 
Request for the change to FlinkKafkaConsumer08 (and 09) initialization as 
suggested below.

  1.  JarRunHandler is provided with a timeout value, but that timeout value is 
ignored when calling getJobGraphAndClassLoader(). This allows HTTP "run" 
requests to take arbitrary amounts of time during which the status of the 
request and the job is unclear. Depending on the semantics of the work that 
method does, perhaps it could be made 

Re: Gelly scatter/gather

2016-06-13 Thread Vasiliki Kalavri
Hi Alieh,

the VertexUpdateFunction and the MessagingFunction both have a method
"getSuperstepNumber()" which will give you the current iteration number.

-Vasia.
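
A minimal sketch of how that might look inside a scatter-gather iteration. The
class and method names below are recalled from the 1.x Gelly API
(VertexUpdateFunction, MessagingFunction, getSuperstepNumber()); please
double-check them against the Flink version you are using:

    import org.apache.flink.graph.Vertex;
    import org.apache.flink.graph.spargel.MessageIterator;
    import org.apache.flink.graph.spargel.MessagingFunction;
    import org.apache.flink.graph.spargel.VertexUpdateFunction;

    public class SuperstepAwareFunctions {

        // Scatter side: only send messages in odd-numbered supersteps.
        public static final class Messenger extends MessagingFunction<Long, Double, Double, Double> {
            @Override
            public void sendMessages(Vertex<Long, Double> vertex) {
                if (getSuperstepNumber() % 2 == 1) {
                    sendMessageToAllNeighbors(vertex.getValue());
                }
            }
        }

        // Gather side: stop updating after superstep 10 (arbitrary example).
        public static final class Updater extends VertexUpdateFunction<Long, Double, Double> {
            @Override
            public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
                double min = vertex.getValue();
                for (Double m : messages) {
                    min = Math.min(min, m);
                }
                if (getSuperstepNumber() <= 10 && min < vertex.getValue()) {
                    setNewVertexValue(min);
                }
            }
        }
    }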

On 13 June 2016 at 18:06, Alieh Saeedi  wrote:

> Hi
> Is it possible to access iteration number in gelly scatter/gather?
>
> thanks in advance
> 
> 
>


Re: HBase reads and back pressure

2016-06-13 Thread Fabian Hueske
Do the backpressure metrics indicate that the sink function is blocking?

2016-06-13 16:58 GMT+02:00 Christophe Salperwyck <
christophe.salperw...@gmail.com>:

> To continue, I implemented the ws.apply(new SummaryStatistics(), new
> YourFoldFunction(), new YourWindowFunction());
>
> It works fine when there is no sink, but when I put an HBase sink it seems
> that the sink, somehow, blocks the flow. The sink writes very little data
> into HBase and when I limit my input to just few sensors, it works well. Any
> idea?
>
> final SingleOutputStreamOperator aggregates = ws
> .apply(
> new Aggregate(),
> new FoldFunction() {
>
> @Override
> public Aggregate fold(final Aggregate accumulator, final ANA value) throws
> Exception {
> accumulator.addValue(value.getValue());
> return accumulator;
> }
> },
> new WindowFunction() {
>
> @Override
> public void apply(final Tuple key, final TimeWindow window, final
> Iterable input,
> final Collector out) throws Exception {
> for (final Aggregate aggregate : input) {
> aggregate.setM((String) key.getField(0));
> aggregate.setTime(window.getStart());
> out.collect(aggregate);
> }
> }
> });
> aggregates.
> setParallelism(10).
> writeUsingOutputFormat(new OutputFormat() {
> private static final long serialVersionUID = 1L;
> HBaseConnect hBaseConnect;
> Table table;
> final int flushSize = 100;
> List puts = new ArrayList<>();
> @Override
> public void writeRecord(final Aggregate record) throws IOException {
> puts.add(record.buildPut());
> if (puts.size() == flushSize) {
> table.put(puts);
> }
> }
> @Override
> public void open(final int taskNumber, final int numTasks) throws
> IOException {
> hBaseConnect = new HBaseConnect();
> table = hBaseConnect.getHTable("PERF_TEST");
> }
> @Override
> public void configure(final Configuration parameters) {
> // TODO Auto-generated method stub
> }
> @Override
> public void close() throws IOException {
> //last inserts
> table.put(puts);
> table.close();
> hBaseConnect.close();
> }
> });
>
> 2016-06-13 13:47 GMT+02:00 Maximilian Michels :
>
>> Thanks!
>>
>> On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck
>>  wrote:
>> > Hi,
>> > I vote on this issue and I agree this would be nice to have.
>> >
>> > Thx!
>> > Christophe
>> >
>> > 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek :
>> >>
>> >> Hi,
>> >> I'm afraid this is currently a shortcoming in the API. There is this
>> open
>> >> Jira issue to track it:
>> https://issues.apache.org/jira/browse/FLINK-3869. We
>> >> can't fix it before Flink 2.0, though, because we have to keep the API
>> >> stable on the Flink 1.x release line.
>> >>
>> >> Cheers,
>> >> Aljoscha
>> >>
>> >> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck
>> >>  wrote:
>> >>>
>> >>> Thanks for the feedback and sorry that I can't try all this straight
>> >>> away.
>> >>>
>> >>> Is there a way to have a different function than:
>> >>> WindowFunction> TimeWindow>()
>> >>>
>> >>> I would like to return a HBase Put and not a SummaryStatistics. So
>> >>> something like this:
>> >>> WindowFunction()
>> >>>
>> >>> Christophe
>> >>>
>> >>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske :
>> 
>>  OK, this indicates that the operator following the source is a
>>  bottleneck.
>> 
>>  If that's the WindowOperator, it makes sense to try the refactoring
>> of
>>  the WindowFunction.
>>  Alternatively, you can try to run that operator with a higher
>>  parallelism.
>> 
>>  2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
>>  :
>> >
>> > Hi Fabian,
>> >
>> > Thanks for the help, I will try that. The backpressure was on the
>> > source (HBase).
>> >
>> > Christophe
>> >
>> > 2016-06-09 16:38 GMT+02:00 Fabian Hueske :
>> >>
>> >> Hi Christophe,
>> >>
>> >> where does the backpressure appear? In front of the sink operator
>> or
>> >> before the window operator?
>> >>
>> >> In any case, I think you can improve your WindowFunction if you
>> >> convert parts of it into a FoldFunction.
>> >> The FoldFunction would take care of the statistics computation and
>> the
>> >> WindowFunction would only assemble the result record including
>> extracting
>> >> the start time of the window.
>> >>
>> >> Then you could do:
>> >>
>> >> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>> >> YourWindowFunction());
>> >>
>> >> This is more efficient because the FoldFunction is eagerly applied
>> >> when ever a new element is added to a window. Hence, the window
>> does only
>> >> hold a single value (SummaryStatistics) instead of all element
>> added to the
>> >> window. In contrast the WindowFunction is called when the window
>> is finally
>> >> evaluated.
>> >>
>> >> Hope this helps,
>> >> Fabian
>> >>
>> >> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
>> >> :
>> >>>
>> >>> Hi,
>> >>>
>> >>> I am writing a program to read timeseries from

Re: Strange behavior of DataStream.countWindow

2016-06-13 Thread Fabian Hueske
If I understood you correctly, you want to compute windows in parallel
without using a key.
Are you aware that the results of such a computation is not deterministic
and kind of arbitrary?

If that is still OK for you, you can use a mapper to assign the current
parallel index as a key field, i.e., wrap the data in a Tuple2 and then do a keyBy(0). This will keep the data local. The mapper
should extend RichMapFunction. You can access the parallel index via
getRuntimeContext().getParallelSubTaskId().
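
A minimal sketch of such a mapper for a DataStream<String>. Note that in the
Flink versions I have checked, the runtime context accessor is called
getIndexOfThisSubtask(); adjust the name if your version differs:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class AttachSubtaskIndex extends RichMapFunction<String, Tuple2<Integer, String>> {
        @Override
        public Tuple2<Integer, String> map(String value) {
            // The parallel index of this mapper instance becomes the key, so a
            // following keyBy(0) groups per parallel instance.
            return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
        }
    }

The stream can then be keyed and windowed, e.g.
stream.map(new AttachSubtaskIndex()).keyBy(0).countWindow(50).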

Hope this helps.
Cheers, Fabian

2016-06-11 11:53 GMT+02:00 Yukun Guo :

> Thx, now I use element.hashCode() % nPartitions and it works as expected.
>
> But I'm afraid it's not a best practice for just turning a plain (already
> parallelized) DataStream into a KeyedStream? Because it introduces some
> overhead due to physical repartitioning by key, which is unnecessary since
> I don't really care about keys.
>
> On 9 June 2016 at 22:00, Fabian Hueske  wrote:
>
>> Hi Yukun,
>>
>> the problem is that the KeySelector is internally invoked multiple times.
>> Hence it must be deterministic, i.e., it must extract the same key for
>> the same object if invoked multiple times.
>> The documentation is not discussing this aspect and should be extended.
>>
>> Thanks for pointing out this issue.
>>
>> Cheers,
>> Fabian
>>
>>
>> 2016-06-09 13:19 GMT+02:00 Yukun Guo :
>>
>>> I’m playing with the (Window)WordCount example from Flink QuickStart. I
>>> generate a DataStream consisting of 1000 Strings of random digits,
>>> which is windowed with a tumbling count window of 50 elements:
>>>
>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>> import org.apache.flink.api.java.functions.KeySelector;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.util.Collector;
>>>
>>> import java.util.Random;
>>>
>>> public class DigitCount {
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         DataStream<String> text = env.fromElements(
>>>                 "14159265358979323846264338327950288419716939937510",
>>>                 "58209749445923078164062862089986280348253421170679",
>>>                 "82148086513282306647093844609550582231725359408128",
>>>                 "48111745028410270193852110555964462294895493038196",
>>>                 "44288109756659334461284756482337867831652712019091",
>>>                 "45648566923460348610454326648213393607260249141273",
>>>                 "72458700660631558817488152092096282925409171536436",
>>>                 "78925903600113305305488204665213841469519415116094",
>>>                 "33057270365759591953092186117381932611793105118548",
>>>                 "07446237996274956735188575272489122793818301194912",
>>>                 "98336733624406566430860213949463952247371907021798",
>>>                 "60943702770539217176293176752384674818467669405132",
>>>                 "00056812714526356082778577134275778960917363717872",
>>>                 "14684409012249534301465495853710507922796892589235",
>>>                 "42019956112129021960864034418159813629774771309960",
>>>                 "5187072113499837297804995105973173281609631859",
>>>                 "50244594553469083026425223082533446850352619311881",
>>>                 "71010003137838752886587533208381420617177669147303",
>>>                 "59825349042875546873115956286388235378759375195778",
>>>                 "18577805321712268066130019278766111959092164201989"
>>>         );
>>>
>>>         DataStream<Tuple2<Integer, Integer>> digitCount = text
>>>                 .flatMap(new Splitter())
>>>                 .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>>>                     @Override
>>>                     public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
>>>                         return x.f0 % 2;
>>>                     }
>>>                 })
>>>                 .countWindow(50)
>>>                 .sum(1);
>>>
>>>         digitCount.print();
>>>         env.execute();
>>>     }
>>>
>>>     public static final class Splitter implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
>>>         @Override
>>>         public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
>>>             for (String token : value.split("")) {
>>>                 if (token.length() == 0) {
>>>                     continue;
>>>                 }
>>>                 out.collect(Tuple2.of(Integer.parseInt(token), 1));
>>>             }
>>>         }
>>>     }
>>> }
>>>
>>> The code above will produce 19 lines of output which is reasonable as
>>> the 1000 digits will be keyed into 2 partitions where one partition
>>> contains 500+ elements and the other contains slightly fewer than 500
>>> elements, therefore as a result one 50-digit window is i

Gelly scatter/gather

2016-06-13 Thread Alieh Saeedi
Hi
Is it possible to access iteration number in gelly scatter/gather?
thanks in advance

Re: Accessing StateBackend snapshots outside of Flink

2016-06-13 Thread Maximilian Michels
+1 to what Aljoscha said. We should rather fix this programmatically.

On Mon, Jun 13, 2016 at 4:25 PM, Aljoscha Krettek  wrote:
> Hi Josh,
> I think RocksDB does not allow accessing a database instance from more than
> one process concurrently. Even if it were possible I would highly recommend
> not to fiddle with Flink state internals (in RocksDB or elsewhere) from the
> outside. All kinds of things might be going on at any given moment, such as:
> locking of state due to checkpoint, state restore after failure and simple
> state access.
>
> If you are interested in this we can work together on adding proper support
> for TTL (time-to-live) to the Flink state abstraction.
>
> Cheers,
> Aljoscha
>
> On Mon, 13 Jun 2016 at 12:21 Maximilian Michels  wrote:
>>
>> Hi Josh,
>>
>> I'm not a RocksDB expert but the workaround you described should work.
>> Just bear in mind that accessing RocksDB concurrently with a Flink job
>> can result in an inconsistent state. Make sure to perform atomic
>> updates and clear the RocksDB cache for the item.
>>
>> Cheers,
>> Max
>>
>> On Mon, Jun 13, 2016 at 10:14 AM, Josh  wrote:
>> > Hello,
>> > I have a follow-up question to this: since Flink doesn't support state
>> > expiration at the moment (e.g. expiring state which hasn't been updated
>> > for
>> > a certain amount of time), would it be possible to clear up old UDF
>> > states
>> > by:
>> > - store a 'last_updated" timestamp in the state value
>> > - periodically (e.g. monthly) go through all the state values in
>> > RocksDB,
>> > deserialize them using TypeSerializer and read the "last_updated"
>> > property
>> > - delete the key from RocksDB if the state's "last_updated" property is
>> > over
>> > a month ago
>> >
>> > Is there any reason this approach wouldn't work, or anything to be
>> > careful
>> > of?
>> >
>> > Thanks,
>> > Josh
>> >
>> >
>> > On Mon, Apr 18, 2016 at 8:23 AM, Aljoscha Krettek 
>> > wrote:
>> >>
>> >> Hi,
>> >> key refers to the key extracted by your KeySelector. Right now, for
>> >> every
>> >> named state (i.e. the name in the StateDescriptor) there is an
>> >> isolated
>> >> RocksDB instance.
>> >>
>> >> Cheers,
>> >> Aljoscha
>> >>
>> >> On Sat, 16 Apr 2016 at 15:43 Igor Berman  wrote:
>> >>>
>> >>> thanks a lot for the info, seems not too complex
>> >>> I'll try to write simple tool to read this state.
>> >>>
>> >>> Aljoscha, does the key reflects unique id of operator in some way? Or
>> >>> key
>> >>> is just a "name" that passed to ValueStateDescriptor.
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>> On 15 April 2016 at 15:10, Stephan Ewen  wrote:
>> 
>>  One thing to add is that you can always trigger a persistent
>>  checkpoint
>>  via the "savepoints" feature:
>> 
>>  https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
>> 
>> 
>> 
>>  On Fri, Apr 15, 2016 at 10:24 AM, Aljoscha Krettek
>>  
>>  wrote:
>> >
>> > Hi,
>> > for RocksDB we simply use a TypeSerializer to serialize the key and
>> > value to a byte[] array and store that in RocksDB. For a ListState,
>> > we
>> > serialize the individual elements using a TypeSerializer and store
>> > them in a
>> > comma-separated list in RocksDB. The snapshots of RocksDB that we
>> > write to
>> > HDFS are regular backups of a RocksDB database, as described here:
>> > https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F.
>> > You
>> > should be able to read them from HDFS and restore them to a
>> > RocksDB data
>> > base as described in the linked documentation.
>> >
>> > tl;dr As long as you know the type of values stored in the state you
>> > should be able to read them from RocksDB and deserialize the values
>> > using
>> > TypeSerializer.
>> >
>> > One more bit of information: Internally the state is keyed by (key,
>> > namespace) -> value where namespace can be an arbitrary type that
>> > has a
>> > TypeSerializer. We use this to store window state that is both local
>> > to key
>> > and the current window. For state that you store in a user-defined
>> > function
>> > the namespace will always be null and that will be serialized by a
>> > VoidSerializer that simply always writes a "0" byte.
>> >
>> > Cheers,
>> > Aljoscha
>> >
>> > On Fri, 15 Apr 2016 at 00:18 igor.berman 
>> > wrote:
>> >>
>> >> Hi,
>> >> we are evaluating Flink for new solution and several people raised
>> >> concern
>> >> of coupling too much to Flink -
>> >> 1. we understand that if we want to get full fault tolerance and
>> >> best
>> >> performance we'll need to use Flink managed state(probably RocksDB
>> >> backend
>> >> due to volume of state)
>> >> 2. but then if we latter find that Flink doesn't answer our
>> >> needs(for
>> >> any
>> >> reason) - we'll need 

Re: HBase reads and back pressure

2016-06-13 Thread Christophe Salperwyck
To continue, I implemented the ws.apply(new SummaryStatistics(), new
YourFoldFunction(), new YourWindowFunction());

It works fine when there is no sink, but when I put an HBase sink it seems
that the sink, somehow, blocks the flow. The sink writes very little data
into HBase and when I limit my input to just a few sensors, it works well. Any
idea?

final SingleOutputStreamOperator<Aggregate> aggregates = ws
    .apply(
        new Aggregate(),
        new FoldFunction<ANA, Aggregate>() {

            @Override
            public Aggregate fold(final Aggregate accumulator, final ANA value) throws Exception {
                accumulator.addValue(value.getValue());
                return accumulator;
            }
        },
        new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {

            @Override
            public void apply(final Tuple key, final TimeWindow window,
                    final Iterable<Aggregate> input,
                    final Collector<Aggregate> out) throws Exception {
                for (final Aggregate aggregate : input) {
                    aggregate.setM((String) key.getField(0));
                    aggregate.setTime(window.getStart());
                    out.collect(aggregate);
                }
            }
        });

aggregates
    .setParallelism(10)
    .writeUsingOutputFormat(new OutputFormat<Aggregate>() {
        private static final long serialVersionUID = 1L;
        HBaseConnect hBaseConnect;
        Table table;
        final int flushSize = 100;
        List<Put> puts = new ArrayList<>();

        @Override
        public void writeRecord(final Aggregate record) throws IOException {
            puts.add(record.buildPut());
            if (puts.size() == flushSize) {
                table.put(puts);
            }
        }

        @Override
        public void open(final int taskNumber, final int numTasks) throws IOException {
            hBaseConnect = new HBaseConnect();
            table = hBaseConnect.getHTable("PERF_TEST");
        }

        @Override
        public void configure(final Configuration parameters) {
            // TODO Auto-generated method stub
        }

        @Override
        public void close() throws IOException {
            // last inserts
            table.put(puts);
            table.close();
            hBaseConnect.close();
        }
    });

2016-06-13 13:47 GMT+02:00 Maximilian Michels :

> Thanks!
>
> On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck
>  wrote:
> > Hi,
> > I vote on this issue and I agree this would be nice to have.
> >
> > Thx!
> > Christophe
> >
> > 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek :
> >>
> >> Hi,
> >> I'm afraid this is currently a shortcoming in the API. There is this
> open
> >> Jira issue to track it:
> https://issues.apache.org/jira/browse/FLINK-3869. We
> >> can't fix it before Flink 2.0, though, because we have to keep the API
> >> stable on the Flink 1.x release line.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck
> >>  wrote:
> >>>
> >>> Thanks for the feedback and sorry that I can't try all this straight
> >>> away.
> >>>
> >>> Is there a way to have a different function than:
> >>> WindowFunction TimeWindow>()
> >>>
> >>> I would like to return a HBase Put and not a SummaryStatistics. So
> >>> something like this:
> >>> WindowFunction()
> >>>
> >>> Christophe
> >>>
> >>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske :
> 
>  OK, this indicates that the operator following the source is a
>  bottleneck.
> 
>  If that's the WindowOperator, it makes sense to try the refactoring of
>  the WindowFunction.
>  Alternatively, you can try to run that operator with a higher
>  parallelism.
> 
>  2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
>  :
> >
> > Hi Fabian,
> >
> > Thanks for the help, I will try that. The backpressure was on the
> > source (HBase).
> >
> > Christophe
> >
> > 2016-06-09 16:38 GMT+02:00 Fabian Hueske :
> >>
> >> Hi Christophe,
> >>
> >> where does the backpressure appear? In front of the sink operator or
> >> before the window operator?
> >>
> >> In any case, I think you can improve your WindowFunction if you
> >> convert parts of it into a FoldFunction.
> >> The FoldFunction would take care of the statistics computation and
> the
> >> WindowFunction would only assemble the result record including
> extracting
> >> the start time of the window.
> >>
> >> Then you could do:
> >>
> >> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
> >> YourWindowFunction());
> >>
> >> This is more efficient because the FoldFunction is eagerly applied
> >> when ever a new element is added to a window. Hence, the window
> does only
> >> hold a single value (SummaryStatistics) instead of all element
> added to the
> >> window. In contrast the WindowFunction is called when the window is
> finally
> >> evaluated.
> >>
> >> Hope this helps,
> >> Fabian
> >>
> >> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
> >> :
> >>>
> >>> Hi,
> >>>
> >>> I am writing a program to read timeseries from HBase and do some
> >>> daily aggregations (Flink streaming). For now I am just computing
> some
> >>> average so not very consuming but my HBase read get slower and
> slower (I
> >>> have few billions of points to read). The back pressure is almost
> all the
> >>> time close to 1.
> >>>
> >>> I use custom timestamp:
> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Custom Barrier?

2016-06-13 Thread Paul Wilson
Hi,

I've been evaluating Flink and wondering if it was possible to define a
window that is based on characteristics of the data (data driven) but not
contained in the data stream directly.

Consider 'nested events', where lower-level events belong to a wider event
that serves only to define a boundary (or window) over the lower-level events.
I was wondering if there is some way to communicate this super-structure in
the stream?

I know that Flink uses 'barriers' to define snapshot boundaries, but might it
be possible to communicate a 'window end' in a similar fashion?

I guess I could attach an additional value to each event using a stateful
map function and then define the window on that?

e.g. A-Start, 1, 2, 3, A-End, B-Start, 1, 2, 3, B-End
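
In case it helps to make the idea concrete, a rough sketch of that "tag then
window" approach for a stream of Strings like the example above. This is only
an illustration, under the assumption that all events of one boundary arrive
in order on the same parallel channel; the marker parsing is made up:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class BoundaryTagger extends RichMapFunction<String, Tuple2<String, String>> {

        // Plain per-instance field, reset on every end marker.
        private String currentBoundary = "none";

        @Override
        public Tuple2<String, String> map(String event) {
            if (event.endsWith("-Start")) {
                currentBoundary = event.substring(0, event.indexOf('-'));  // "A-Start" -> "A"
            }
            Tuple2<String, String> tagged = Tuple2.of(currentBoundary, event);
            if (event.endsWith("-End")) {
                currentBoundary = "none";
            }
            return tagged;
        }
    }

The tagged stream could then be keyed on field 0 and collected into a global
window with a custom trigger that fires when the "-End" event arrives.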

Regards,
Paul


Re: Accessing StateBackend snapshots outside of Flink

2016-06-13 Thread Aljoscha Krettek
Hi Josh,
I think RocksDB does not allow accessing a database instance from more
than one process concurrently. Even if it were possible I would highly
recommend not to fiddle with Flink state internals (in RocksDB or
elsewhere) from the outside. All kinds of things might be going on at any
given moment, such as: locking of state due to checkpoint, state restore
after failure and simple state access.

If you are interested in this we can work together on adding proper support
for TTL (time-to-live) to the Flink state abstraction.

Cheers,
Aljoscha

On Mon, 13 Jun 2016 at 12:21 Maximilian Michels  wrote:

> Hi Josh,
>
> I'm not a RocksDB expert but the workaround you described should work.
> Just bear in mind that accessing RocksDB concurrently with a Flink job
> can result in an inconsistent state. Make sure to perform atomic
> updates and clear the RocksDB cache for the item.
>
> Cheers,
> Max
>
> On Mon, Jun 13, 2016 at 10:14 AM, Josh  wrote:
> > Hello,
> > I have a follow-up question to this: since Flink doesn't support state
> > expiration at the moment (e.g. expiring state which hasn't been updated
> for
> > a certain amount of time), would it be possible to clear up old UDF
> states
> > by:
> > - store a 'last_updated" timestamp in the state value
> > - periodically (e.g. monthly) go through all the state values in RocksDB,
> > deserialize them using TypeSerializer and read the "last_updated"
> property
> > - delete the key from RocksDB if the state's "last_updated" property is
> over
> > a month ago
> >
> > Is there any reason this approach wouldn't work, or anything to be
> careful
> > of?
> >
> > Thanks,
> > Josh
> >
> >
> > On Mon, Apr 18, 2016 at 8:23 AM, Aljoscha Krettek 
> > wrote:
> >>
> >> Hi,
> >> key refers to the key extracted by your KeySelector. Right now, for
> every
> >> named state (i.e. the name in the StateDescriptor) there is an
> isolated
> >> RocksDB instance.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Sat, 16 Apr 2016 at 15:43 Igor Berman  wrote:
> >>>
> >>> thanks a lot for the info, seems not too complex
> >>> I'll try to write simple tool to read this state.
> >>>
> >>> Aljoscha, does the key reflects unique id of operator in some way? Or
> key
> >>> is just a "name" that passed to ValueStateDescriptor.
> >>>
> >>> thanks in advance
> >>>
> >>>
> >>> On 15 April 2016 at 15:10, Stephan Ewen  wrote:
> 
>  One thing to add is that you can always trigger a persistent
> checkpoint
>  via the "savepoints" feature:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
> 
> 
> 
>  On Fri, Apr 15, 2016 at 10:24 AM, Aljoscha Krettek <
> aljos...@apache.org>
>  wrote:
> >
> > Hi,
> > for RocksDB we simply use a TypeSerializer to serialize the key and
> > value to a byte[] array and store that in RocksDB. For a ListState,
> we
> > serialize the individual elements using a TypeSerializer and store
> them in a
> > comma-separated list in RocksDB. The snapshots of RocksDB that we
> write to
> > HDFS are regular backups of a RocksDB database, as described here:
> > https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F.
> You
> > should be able to read them from HDFS and restore them to a
> RocksDB data
> > base as described in the linked documentation.
> >
> > tl;dr As long as you know the type of values stored in the state you
> > should be able to read them from RocksDB and deserialize the values
> using
> > TypeSerializer.
> >
> > One more bit of information: Internally the state is keyed by (key,
> > namespace) -> value where namespace can be an arbitrary type that
> has a
> > TypeSerializer. We use this to store window state that is both local
> to key
> > and the current window. For state that you store in a user-defined
> function
> > the namespace will always be null and that will be serialized by a
> > VoidSerializer that simply always writes a "0" byte.
> >
> > Cheers,
> > Aljoscha
> >
> > On Fri, 15 Apr 2016 at 00:18 igor.berman 
> wrote:
> >>
> >> Hi,
> >> we are evaluating Flink for new solution and several people raised
> >> concern
> >> of coupling too much to Flink -
> >> 1. we understand that if we want to get full fault tolerance and
> best
> >> performance we'll need to use Flink managed state(probably RocksDB
> >> backend
> >> due to volume of state)
> >> 2. but then if we latter find that Flink doesn't answer our
> needs(for
> >> any
> >> reason) - we'll need to extract this state in some way(since it's
> the
> >> only
> >> source of consistent state)
> >> In general I'd like to be able to take snapshot of backend and try
> to
> >> read
> >> it...do you think it's will be trivial task?
> >> say If I'm holding list state per partitioned key, would it be easy
> to
> >> take
> >> RocksDb file and

[jira] Akshay Shingote shared "FLINK-4059: Requirement of Streaming layer for Complex Event Processing" with you

2016-06-13 Thread Akshay Shingote (JIRA)
Akshay Shingote shared an issue with you




> Requirement of Streaming layer for Complex Event Processing
> ---
>
> Key: FLINK-4059
> URL: https://issues.apache.org/jira/browse/FLINK-4059
> Project: Flink
>  Issue Type: Test
>  Components: CEP
>Affects Versions: 1.0.1
>Reporter: Akshay Shingote
>
> I am trying to get this Flink CEP Example : 
> https://github.com/tillrohrmann/cep-monitoring
> One question which arises here is that they have made one single application 
> which does the work of pattern matching or CEP (i.e., ingest & process are 
> done by one application). They didn't use any streaming layer in between, 
> such as Kafka. I just want to know why no streaming layer is used between 
> the producer & the consumer. Also, I want to know what the advantages and 
> disadvantages would be if I put a streaming layer in between. Thank you.

 Also shared with
  iss...@flink.apache.org



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: HBase reads and back pressure

2016-06-13 Thread Maximilian Michels
Thanks!

On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck
 wrote:
> Hi,
> I vote on this issue and I agree this would be nice to have.
>
> Thx!
> Christophe
>
> 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek :
>>
>> Hi,
>> I'm afraid this is currently a shortcoming in the API. There is this open
>> Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869. We
>> can't fix it before Flink 2.0, though, because we have to keep the API
>> stable on the Flink 1.x release line.
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck
>>  wrote:
>>>
>>> Thanks for the feedback and sorry that I can't try all this straight
>>> away.
>>>
>>> Is there a way to have a different function than:
>>> WindowFunction()
>>>
>>> I would like to return a HBase Put and not a SummaryStatistics. So
>>> something like this:
>>> WindowFunction()
>>>
>>> Christophe
>>>
>>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske :

 OK, this indicates that the operator following the source is a
 bottleneck.

 If that's the WindowOperator, it makes sense to try the refactoring of
 the WindowFunction.
 Alternatively, you can try to run that operator with a higher
 parallelism.

 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
 :
>
> Hi Fabian,
>
> Thanks for the help, I will try that. The backpressure was on the
> source (HBase).
>
> Christophe
>
> 2016-06-09 16:38 GMT+02:00 Fabian Hueske :
>>
>> Hi Christophe,
>>
>> where does the backpressure appear? In front of the sink operator or
>> before the window operator?
>>
>> In any case, I think you can improve your WindowFunction if you
>> convert parts of it into a FoldFunction.
>> The FoldFunction would take care of the statistics computation and the
>> WindowFunction would only assemble the result record including extracting
>> the start time of the window.
>>
>> Then you could do:
>>
>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>> YourWindowFunction());
>>
>> This is more efficient because the FoldFunction is eagerly applied
>> when ever a new element is added to a window. Hence, the window does only
>> hold a single value (SummaryStatistics) instead of all element added to 
>> the
>> window. In contrast the WindowFunction is called when the window is 
>> finally
>> evaluated.
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
>> :
>>>
>>> Hi,
>>>
>>> I am writing a program to read timeseries from HBase and do some
>>> daily aggregations (Flink streaming). For now I am just computing some
>>> average so not very consuming but my HBase read get slower and slower (I
>>> have few billions of points to read). The back pressure is almost all 
>>> the
>>> time close to 1.
>>>
>>> I use custom timestamp:
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> so I implemented a custom extractor based on:
>>> AscendingTimestampExtractor
>>>
>>> At the beginning I have 5M reads/s and after 15 min I have just 1M
>>> read/s then it get worse and worse. Even when I cancel the job, data are
>>> still being written in HBase (I did a sink similar to the example - 
>>> with a
>>> cache of 100s of HBase Puts to be a bit more efficient).
>>>
>>> When I don't put a sink it seems to stay on 1M reads/s.
>>>
>>> Do you have an idea why ?
>>>
>>> Here is a bit of code if needed:
>>> final WindowedStream ws = hbaseDS.keyBy(0)
>>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
>>> .keyBy(0)
>>> .timeWindow(Time.days(1));
>>>
>>> final SingleOutputStreamOperator puts = ws.apply(new
>>> WindowFunction() {
>>>
>>> @Override
>>> public void apply(final Tuple key, final TimeWindow window, final
>>> Iterable input,
>>> final Collector out) throws Exception {
>>>
>>> final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>> for (final ANA ana : input) {
>>> summaryStatistics.addValue(ana.getValue());
>>> }
>>> final Put put = buildPut((String) key.getField(0), window.getStart(),
>>> summaryStatistics);
>>> out.collect(put);
>>> }
>>> });
>>>
>>> And how I started Flink on YARN :
>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>> -Dtaskmanager.network.numberOfBuffers=4096
>>>
>>> Thanks for any feedback!
>>>
>>> Christophe
>>
>>
>

>>>
>


Re: HBase reads and back pressure

2016-06-13 Thread Christophe Salperwyck
Hi,
I vote on this issue and I agree this would be nice to have.

Thx!
Christophe

2016-06-13 12:26 GMT+02:00 Aljoscha Krettek :

> Hi,
> I'm afraid this is currently a shortcoming in the API. There is this open
> Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869.
> We can't fix it before Flink 2.0, though, because we have to keep the API
> stable on the Flink 1.x release line.
>
> Cheers,
> Aljoscha
>
> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck <
> christophe.salperw...@gmail.com> wrote:
>
>> Thanks for the feedback and sorry that I can't try all this straight away.
>>
>> Is there a way to have a different function than:
>> WindowFunction()
>>
>> I would like to return a HBase Put and not a SummaryStatistics. So
>> something like this:
>> WindowFunction()
>>
>> Christophe
>>
>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske :
>>
>>> OK, this indicates that the operator following the source is a
>>> bottleneck.
>>>
>>> If that's the WindowOperator, it makes sense to try the refactoring of
>>> the WindowFunction.
>>> Alternatively, you can try to run that operator with a higher
>>> parallelism.
>>>
>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <
>>> christophe.salperw...@gmail.com>:
>>>
 Hi Fabian,

 Thanks for the help, I will try that. The backpressure was on the
 source (HBase).

 Christophe

 2016-06-09 16:38 GMT+02:00 Fabian Hueske :

> Hi Christophe,
>
> where does the backpressure appear? In front of the sink operator or
> before the window operator?
>
> In any case, I think you can improve your WindowFunction if you
> convert parts of it into a FoldFunction.
> The FoldFunction would take care of the statistics computation and the
> WindowFunction would only assemble the result record including extracting
> the start time of the window.
>
> Then you could do:
>
> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
> YourWindowFunction());
>
> This is more efficient because the FoldFunction is eagerly applied
> when ever a new element is added to a window. Hence, the window does only
> hold a single value (SummaryStatistics) instead of all element added to 
> the
> window. In contrast the WindowFunction is called when the window is 
> finally
> evaluated.
>
> Hope this helps,
> Fabian
>
> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
> christophe.salperw...@gmail.com>:
>
>> Hi,
>>
>> I am writing a program to read timeseries from HBase and do some
>> daily aggregations (Flink streaming). For now I am just computing some
>> average so not very consuming but my HBase read get slower and slower (I
>> have few billions of points to read). The back pressure is almost all the
>> time close to 1.
>>
>> I use custom timestamp:
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> so I implemented a custom extractor based on:
>> AscendingTimestampExtractor
>>
>> At the beginning I have 5M reads/s and after 15 min I have just 1M
>> read/s then it get worse and worse. Even when I cancel the job, data are
>> still being written in HBase (I did a sink similar to the example - with 
>> a
>> cache of 100s of HBase Puts to be a bit more efficient).
>>
>> When I don't put a sink it seems to stay on 1M reads/s.
>>
>> Do you have an idea why ?
>>
>> Here is a bit of code if needed:
>> final WindowedStream ws = hbaseDS.keyBy(0)
>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
>> .keyBy(0)
>> .timeWindow(Time.days(1));
>>
>> final SingleOutputStreamOperator puts = ws.apply(new
>> WindowFunction() {
>>
>> @Override
>> public void apply(final Tuple key, final TimeWindow window, final
>> Iterable input,
>> final Collector out) throws Exception {
>>
>> final SummaryStatistics summaryStatistics = new SummaryStatistics();
>> for (final ANA ana : input) {
>> summaryStatistics.addValue(ana.getValue());
>> }
>> final Put put = buildPut((String) key.getField(0), window.getStart(),
>> summaryStatistics);
>> out.collect(put);
>> }
>> });
>>
>> And how I started Flink on YARN :
>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>> -Dtaskmanager.network.numberOfBuffers=4096
>>
>> Thanks for any feedback!
>>
>> Christophe
>>
>
>

>>>
>>


Re: HBase reads and back pressure

2016-06-13 Thread Christophe Salperwyck
Hi Max,

In fact the Put would be the output of my WindowFunction. I saw that Aljoscha
replied; it seems I will need to create another intermediate class to handle
that, but that is fine.
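
One way such an intermediate class could look (a guess for illustration, not
Christophe's actual code): it carries the running statistics plus the key and
window start, and only turns into an HBase Put at the end. The column layout
below is invented:

    import java.io.Serializable;
    import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class Aggregate implements Serializable {
        private final SummaryStatistics stats = new SummaryStatistics();
        private String metric;
        private long windowStart;

        public void addValue(double v) { stats.addValue(v); }
        public void setM(String metric) { this.metric = metric; }
        public void setTime(long windowStart) { this.windowStart = windowStart; }

        // Built once, in the sink, from the accumulated statistics.
        public Put buildPut() {
            Put put = new Put(Bytes.toBytes(metric + "-" + windowStart));
            put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("mean"), Bytes.toBytes(stats.getMean()));
            put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("count"), Bytes.toBytes(stats.getN()));
            return put;
        }
    }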

Thx for help!
Christophe

2016-06-13 12:25 GMT+02:00 Maximilian Michels :

> Hi Christophe,
>
> A fold function has two inputs: The state and a record to update the
> state with. So you can update the SummaryStatistics (state) with each
> Put (input).
>
> Cheers,
> Max
>
> On Mon, Jun 13, 2016 at 11:04 AM, Christophe Salperwyck
>  wrote:
> > Thanks for the feedback and sorry that I can't try all this straight
> away.
> >
> > Is there a way to have a different function than:
> > WindowFunction()
> >
> > I would like to return a HBase Put and not a SummaryStatistics. So
> something
> > like this:
> > WindowFunction()
> >
> > Christophe
> >
> > 2016-06-09 17:47 GMT+02:00 Fabian Hueske :
> >>
> >> OK, this indicates that the operator following the source is a
> bottleneck.
> >>
> >> If that's the WindowOperator, it makes sense to try the refactoring of
> the
> >> WindowFunction.
> >> Alternatively, you can try to run that operator with a higher
> parallelism.
> >>
> >> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
> >> :
> >>>
> >>> Hi Fabian,
> >>>
> >>> Thanks for the help, I will try that. The backpressure was on the
> source
> >>> (HBase).
> >>>
> >>> Christophe
> >>>
> >>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske :
> 
>  Hi Christophe,
> 
>  where does the backpressure appear? In front of the sink operator or
>  before the window operator?
> 
>  In any case, I think you can improve your WindowFunction if you
> convert
>  parts of it into a FoldFunction.
>  The FoldFunction would take care of the statistics computation and the
>  WindowFunction would only assemble the result record including
> extracting
>  the start time of the window.
> 
>  Then you could do:
> 
>  ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>  YourWindowFunction());
> 
>  This is more efficient because the FoldFunction is eagerly applied
> when
>  ever a new element is added to a window. Hence, the window does only
> hold a
>  single value (SummaryStatistics) instead of all element added to the
> window.
>  In contrast the WindowFunction is called when the window is finally
>  evaluated.
> 
>  Hope this helps,
>  Fabian
> 
>  2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
>  :
> >
> > Hi,
> >
> > I am writing a program to read timeseries from HBase and do some
> daily
> > aggregations (Flink streaming). For now I am just computing some
> average so
> > not very consuming but my HBase read get slower and slower (I have
> few
> > billions of points to read). The back pressure is almost all the
> time close
> > to 1.
> >
> > I use custom timestamp:
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> > so I implemented a custom extractor based on:
> > AscendingTimestampExtractor
> >
> > At the beginning I have 5M reads/s and after 15 min I have just 1M
> > read/s then it get worse and worse. Even when I cancel the job, data
> are
> > still being written in HBase (I did a sink similar to the example -
> with a
> > cache of 100s of HBase Puts to be a bit more efficient).
> >
> > When I don't put a sink it seems to stay on 1M reads/s.
> >
> > Do you have an idea why ?
> >
> > Here is a bit of code if needed:
> > final WindowedStream ws = hbaseDS.keyBy(0)
> > .assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
> > .keyBy(0)
> > .timeWindow(Time.days(1));
> >
> > final SingleOutputStreamOperator puts = ws.apply(new
> > WindowFunction() {
> >
> > @Override
> > public void apply(final Tuple key, final TimeWindow window, final
> > Iterable input,
> > final Collector out) throws Exception {
> >
> > final SummaryStatistics summaryStatistics = new SummaryStatistics();
> > for (final ANA ana : input) {
> > summaryStatistics.addValue(ana.getValue());
> > }
> > final Put put = buildPut((String) key.getField(0), window.getStart(),
> > summaryStatistics);
> > out.collect(put);
> > }
> > });
> >
> > And how I started Flink on YARN :
> > flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
> > -Dtaskmanager.network.numberOfBuffers=4096
> >
> > Thanks for any feedback!
> >
> > Christophe
> 
> 
> >>>
> >>
> >
>


Re: Arrays values in keyBy

2016-06-13 Thread Ufuk Celebi
Would make sense to update the Javadocs for the next release.

On Mon, Jun 13, 2016 at 11:19 AM, Aljoscha Krettek  wrote:
> Yes, this is correct. Right now we're basically using .hashCode() for
> keying. (Which can be problematic in some cases.)
>
> Beam, for example, clearly specifies that the encoded form of a value should
> be used for all comparisons/hashing. This is more well defined but can lead
> to slow performance in some cases.
>
> On Sat, 11 Jun 2016 at 00:04 Elias Levy  wrote:
>>
>> It would be useful if the documentation warned what type of equality it
>> expected of values used as keys in keyBy.  I just got bit in the ass by
>> converting a field from a string to a byte array.  All of the sudden the
>> windows were no longer aggregating.  So it seems Flink is not doing a deep
>> compare of arrays when comparing keys.
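
For anyone hitting the same issue, a minimal sketch of one workaround: key on
a value with proper equals()/hashCode() (here a Base64 String of the bytes)
instead of the raw array. The event type is made up:

    import java.io.Serializable;
    import java.util.Base64;
    import org.apache.flink.api.java.functions.KeySelector;

    public class ByteArrayKeying {

        public static class MyEvent implements Serializable {
            public byte[] rawKey;
            public long value;
        }

        public static class ByteArrayKey implements KeySelector<MyEvent, String> {
            @Override
            public String getKey(MyEvent event) {
                // Equal byte arrays encode to equal Strings, so keyBy groups as expected.
                return Base64.getEncoder().encodeToString(event.rawKey);
            }
        }
    }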


Re: NotSerializableException

2016-06-13 Thread Aljoscha Krettek
Nope, I think there is neither a fix nor an open issue for this right now.
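
For reference, a minimal sketch of the static-nested-class workaround Stephan
mentions in the quoted thread below. The record type and field are invented;
the point is only that a static nested KeySelector keeps no hidden reference
to its enclosing object, so Java serialization does not try to ship the outer
class:

    import java.io.Serializable;
    import org.apache.flink.api.java.functions.KeySelector;

    public class Keys {

        public static class MyRecord implements Serializable {
            public String id;
        }

        public static final class IdKey implements KeySelector<MyRecord, String> {
            @Override
            public String getKey(MyRecord record) {
                return record.id != null ? record.id : "";
            }
        }
    }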

On Mon, 13 Jun 2016 at 11:31 Maximilian Michels  wrote:

> Is there an issue or a fix for proper use of the ClosureCleaner in
> CoGroup.where()?
>
> On Fri, Jun 10, 2016 at 8:07 AM, Aljoscha Krettek 
> wrote:
> > Hi,
> > yes, I was talking about a Flink bug. I forgot to mention the work-around
> > that Stephan mentioned.
> >
> > On Thu, 9 Jun 2016 at 20:38 Stephan Ewen  wrote:
> >>
> >> You can also make the KeySelector a static inner class. That should work
> >> as well.
> >>
> >> On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh 
> >> wrote:
> >>>
> >>> Thank you Aljoscha and Fabian for your replies.
> >>>
> >>> @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm
> >>> afraid this is a bug", I am assuming you are referring to Flink engine
> >>> itself.
> >>>
> >>> @Fabian: thanks for the optimization tip.
> >>>
> >>> This is how I have got it working (with a hack): In my dataset, the
> join
> >>> field/key can be null otherwise .where(fieldName) works and I don't get
> >>> not-serializable exception. So I applied a MapFunction to DataSet and
> put a
> >>> dummy value in the join field/key where it was null. Then In the join
> >>> function, I change it back to null.
> >>>
> >>> Best,
> >>> Tarandeep
> >>>
> >>> On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek 
> >>> wrote:
> 
>  Hi,
>  the problem is that the KeySelector is an anonymous inner class and as
>  such as a reference to the outer RecordFilterer object. Normally,
> this would
>  be rectified by the closure cleaner but the cleaner is not used in
>  CoGroup.where(). I'm afraid this is a bug.
> 
>  Best,
>  Aljoscha
> 
> 
>  On Thu, 9 Jun 2016 at 14:06 Fabian Hueske  wrote:
> >
> > Hi Tarandeep,
> >
> > the exception suggests that Flink tries to serialize RecordsFilterer
> as
> > a user function (this happens via Java Serialization).
> > I said suggests because the code that uses RecordsFilterer is not
> > included.
> >
> > To me it looks like RecordsFilterer should not be used as a user
> > function. It is a helper class to construct a DataSet program, so it
> should
> > not be shipped for execution.
> > You would use such a class as follows:
> >
> > DataSet records = ...
> > DataSet filterIDs = ...
> >
> > RecordsFilterer rf = new RecordsFilterer();
> > DataSet> result = rf.addFilterFlag(records,
> > filterIDs, "myField");
> >
> > Regarding the join code, I would suggest an optimization.
> > Instead of using CoGroup, I would use distinct and an OuterJoin like
> > this:
> >
> > DataSet distIds = filtereredIds.distinct();
> > DataSet result = records
> >   .leftOuterJoin(distIds)
> >   .where(KEYSELECTOR)
> >   .equalTo("*") // use full string as key
> >   .with(JOINFUNC) // set Bool to false if right == null, true
> otherwise
> >
> > Best, Fabian
> >
> > 2016-06-09 2:28 GMT+02:00 Tarandeep Singh :
> >>
> >> Hi,
> >>
> >> I am getting NoSerializableException in this class-
> >>
> >> 
> >>
> >> public class RecordsFilterer {
> >>
> >> public DataSet> addFilterFlag(DataSet
> >> dataset, DataSet filteredIds, String fieldName) {
> >> return dataset.coGroup(filteredIds)
> >> .where(new KeySelector() {
> >> @Override
> >> public String getKey(T t) throws Exception {
> >> String s = (String) t.get(fieldName);
> >> return s != null ? s :
> >> UUID.randomUUID().toString();
> >> }
> >> })
> >> .equalTo((KeySelector) s -> s)
> >> .with(new CoGroupFunction >> Tuple2>() {
> >> @Override
> >> public void coGroup(Iterable records,
> >> Iterable ids,
> >> Collector>
> >> collector) throws Exception {
> >> boolean filterFlag = false;
> >> for (String id : ids) {
> >> filterFlag = true;
> >> }
> >>
> >> for (T record : records) {
> >> collector.collect(new
> Tuple2<>(filterFlag,
> >> record));
> >> }
> >> }
> >> });
> >>
> >> }
> >> }
> >>
> >>
> >> What I am trying to do is write a generic code that will join Avro
> >> records (of different types) with String records and there is a
> match add a
> >> filter flag. This way I can use the same code for different Avro
> record
> >> types. But I am getting this exception-
> >>
> >>

Re: HBase reads and back pressure

2016-06-13 Thread Aljoscha Krettek
Hi,
I'm afraid this is currently a shortcoming in the API. There is this open
Jira issue to track it: https://issues.apache.org/jira/browse/FLINK-3869.
We can't fix it before Flink 2.0, though, because we have to keep the API
stable on the Flink 1.x release line.

Cheers,
Aljoscha

On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck <
christophe.salperw...@gmail.com> wrote:

> Thanks for the feedback and sorry that I can't try all this straight away.
>
> Is there a way to have a different function than:
> WindowFunction()
>
> I would like to return a HBase Put and not a SummaryStatistics. So
> something like this:
> WindowFunction()
>
> Christophe
>
> 2016-06-09 17:47 GMT+02:00 Fabian Hueske :
>
>> OK, this indicates that the operator following the source is a
>> bottleneck.
>>
>> If that's the WindowOperator, it makes sense to try the refactoring of
>> the WindowFunction.
>> Alternatively, you can try to run that operator with a higher parallelism.
>>
>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <
>> christophe.salperw...@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the help, I will try that. The backpressure was on the source
>>> (HBase).
>>>
>>> Christophe
>>>
>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske :
>>>
 Hi Christophe,

 where does the backpressure appear? In front of the sink operator or
 before the window operator?

 In any case, I think you can improve your WindowFunction if you convert
 parts of it into a FoldFunction.
 The FoldFunction would take care of the statistics computation and the
 WindowFunction would only assemble the result record including extracting
 the start time of the window.

 Then you could do:

 ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
 YourWindowFunction());

 This is more efficient because the FoldFunction is eagerly applied when
 ever a new element is added to a window. Hence, the window does only hold a
 single value (SummaryStatistics) instead of all element added to the
 window. In contrast the WindowFunction is called when the window is finally
 evaluated.

 Hope this helps,
 Fabian

 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
 christophe.salperw...@gmail.com>:

> Hi,
>
> I am writing a program to read timeseries from HBase and do some daily
> aggregations (Flink streaming). For now I am just computing some average 
> so
> not very consuming but my HBase read get slower and slower (I have few
> billions of points to read). The back pressure is almost all the time 
> close
> to 1.
>
> I use custom timestamp:
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> so I implemented a custom extractor based on:
> AscendingTimestampExtractor
>
> At the beginning I have 5M reads/s and after 15 min I have just 1M
> read/s then it get worse and worse. Even when I cancel the job, data are
> still being written in HBase (I did a sink similar to the example - with a
> cache of 100s of HBase Puts to be a bit more efficient).
>
> When I don't put a sink it seems to stay on 1M reads/s.
>
> Do you have an idea why ?
>
> Here is a bit of code if needed:
> final WindowedStream ws = hbaseDS.keyBy(0)
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
> .keyBy(0)
> .timeWindow(Time.days(1));
>
> final SingleOutputStreamOperator puts = ws.apply(new
> WindowFunction() {
>
> @Override
> public void apply(final Tuple key, final TimeWindow window, final
> Iterable input,
> final Collector out) throws Exception {
>
> final SummaryStatistics summaryStatistics = new SummaryStatistics();
> for (final ANA ana : input) {
> summaryStatistics.addValue(ana.getValue());
> }
> final Put put = buildPut((String) key.getField(0), window.getStart(),
> summaryStatistics);
> out.collect(put);
> }
> });
>
> And how I started Flink on YARN :
> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
> -Dtaskmanager.network.numberOfBuffers=4096
>
> Thanks for any feedback!
>
> Christophe
>


>>>
>>
>


Re: HBase reads and back pressure

2016-06-13 Thread Maximilian Michels
Hi Christophe,

A fold function has two inputs: The state and a record to update the
state with. So you can update the SummaryStatistics (state) with each
Put (input).
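
A minimal sketch of that shape, folding plain Double values into a
SummaryStatistics accumulator (using Double instead of the ANA type from the
thread so the example stands on its own):

    import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
    import org.apache.flink.api.common.functions.FoldFunction;

    public class StatsFold implements FoldFunction<Double, SummaryStatistics> {
        @Override
        public SummaryStatistics fold(SummaryStatistics accumulator, Double value) {
            // First argument: the state; second argument: the incoming record.
            accumulator.addValue(value);
            return accumulator;
        }
    }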

Cheers,
Max

On Mon, Jun 13, 2016 at 11:04 AM, Christophe Salperwyck
 wrote:
> Thanks for the feedback and sorry that I can't try all this straight away.
>
> Is there a way to have a different function than:
> WindowFunction()
>
> I would like to return a HBase Put and not a SummaryStatistics. So something
> like this:
> WindowFunction()
>
> Christophe
>
> 2016-06-09 17:47 GMT+02:00 Fabian Hueske :
>>
>> OK, this indicates that the operator following the source is a bottleneck.
>>
>> If that's the WindowOperator, it makes sense to try the refactoring of the
>> WindowFunction.
>> Alternatively, you can try to run that operator with a higher parallelism.
>>
>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
>> :
>>>
>>> Hi Fabian,
>>>
>>> Thanks for the help, I will try that. The backpressure was on the source
>>> (HBase).
>>>
>>> Christophe
>>>
>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske :

 Hi Christophe,

 where does the backpressure appear? In front of the sink operator or
 before the window operator?

 In any case, I think you can improve your WindowFunction if you convert
 parts of it into a FoldFunction.
 The FoldFunction would take care of the statistics computation and the
 WindowFunction would only assemble the result record including extracting
 the start time of the window.

 Then you could do:

 ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
 YourWindowFunction());

 This is more efficient because the FoldFunction is eagerly applied
 whenever a new element is added to a window. Hence, the window only holds a
 single value (SummaryStatistics) instead of all elements added to the
 window.
 In contrast, the WindowFunction is called when the window is finally
 evaluated.

 Hope this helps,
 Fabian

 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck
 :
>
> Hi,
>
> I am writing a program to read time series from HBase and do some daily
> aggregations (Flink streaming). For now I am just computing some averages,
> so it is not very demanding, but my HBase reads get slower and slower (I
> have a few billion points to read). The back pressure is almost all the
> time close to 1.
>
> I use custom timestamps:
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> so I implemented a custom extractor based on:
> AscendingTimestampExtractor
>
> At the beginning I have 5M reads/s and after 15 min I have just 1M
> reads/s, then it gets worse and worse. Even when I cancel the job, data is
> still being written to HBase (I built a sink similar to the example, with
> a cache of hundreds of HBase Puts to be a bit more efficient).
>
> When I don't add a sink, it seems to stay at 1M reads/s.
>
> Do you have an idea why?
>
> Here is a bit of code if needed:
> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
> .keyBy(0)
> .timeWindow(Time.days(1));
>
> final SingleOutputStreamOperator<Put, ?> puts = ws.apply(new
> WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>
> @Override
> public void apply(final Tuple key, final TimeWindow window, final
> Iterable<ANA> input,
> final Collector<Put> out) throws Exception {
>
> final SummaryStatistics summaryStatistics = new SummaryStatistics();
> for (final ANA ana : input) {
> summaryStatistics.addValue(ana.getValue());
> }
> final Put put = buildPut((String) key.getField(0), window.getStart(),
> summaryStatistics);
> out.collect(put);
> }
> });
>
> And how I started Flink on YARN:
> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
> -Dtaskmanager.network.numberOfBuffers=4096
>
> Thanks for any feedback!
>
> Christophe


>>>
>>
>


Re: Accessing StateBackend snapshots outside of Flink

2016-06-13 Thread Maximilian Michels
Hi Josh,

I'm not a RocksDB expert but the workaround you described should work.
Just bear in mind that accessing RocksDB concurrently with a Flink job
can result in an inconsistent state. Make sure to perform atomic
updates and clear the RocksDB cache for the item.
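
A very rough sketch of such an offline sweep (StateValue, the serializer
wiring and the paths are placeholders - I have not tried this against an
actual Flink backup, and the input-view wrapper class name may differ
between Flink versions):

// iterate a restored RocksDB backup and drop entries whose value is stale
static void expireOldState(String dbPath,
                           TypeSerializer<StateValue> valueSerializer,
                           long cutoffMillis) throws Exception {
    RocksDB.loadLibrary();
    RocksDB db = RocksDB.open(dbPath);
    RocksIterator it = db.newIterator();
    for (it.seekToFirst(); it.isValid(); it.next()) {
        // the value bytes were written by the state's TypeSerializer
        StateValue state = valueSerializer.deserialize(
            new DataInputViewStreamWrapper(new ByteArrayInputStream(it.value())));
        if (state.lastUpdated < cutoffMillis) {
            db.delete(it.key()); // "remove" on older RocksDB JNI versions
        }
    }
    // iterator/db cleanup omitted for brevity
}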

Cheers,
Max

On Mon, Jun 13, 2016 at 10:14 AM, Josh  wrote:
> Hello,
> I have a follow-up question to this: since Flink doesn't support state
> expiration at the moment (e.g. expiring state which hasn't been updated for
> a certain amount of time), would it be possible to clear up old UDF states
> by:
> - store a "last_updated" timestamp in the state value
> - periodically (e.g. monthly) go through all the state values in RocksDB,
> deserialize them using TypeSerializer and read the "last_updated" property
> - delete the key from RocksDB if the state's "last_updated" property is over
> a month ago
>
> Is there any reason this approach wouldn't work, or anything to be careful
> of?
>
> Thanks,
> Josh
>
>
> On Mon, Apr 18, 2016 at 8:23 AM, Aljoscha Krettek 
> wrote:
>>
>> Hi,
>> key refers to the key extracted by your KeySelector. Right now, for every
>> named state (i.e. the name in the StateDescriptor) there is an isolated
>> RocksDB instance.
>>
>> Cheers,
>> Aljoscha
>>
>> On Sat, 16 Apr 2016 at 15:43 Igor Berman  wrote:
>>>
>>> thanks a lot for the info, seems not too complex
>>> I'll try to write a simple tool to read this state.
>>>
>>> Aljoscha, does the key reflect the unique id of the operator in some way?
>>> Or is the key just a "name" that is passed to the ValueStateDescriptor?
>>>
>>> thanks in advance
>>>
>>>
>>> On 15 April 2016 at 15:10, Stephan Ewen  wrote:

 One thing to add is that you can always trigger a persistent checkpoint
 via the "savepoints" feature:
 https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html



 On Fri, Apr 15, 2016 at 10:24 AM, Aljoscha Krettek 
 wrote:
>
> Hi,
> for RocksDB we simply use a TypeSerializer to serialize the key and
> value to a byte[] array and store that in RocksDB. For a ListState, we
> serialize the individual elements using a TypeSerializer and store them 
> in a
> comma-separated list in RocksDB. The snapshots of RocksDB that we write to
> HDFS are regular backups of a RocksDB database, as described here:
> https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F. You
> should be able to read them from HDFS and restore them to a RocksDB
> database as described in the linked documentation.
>
> tl;dr As long as you know the type of values stored in the state you
> should be able to read them from RocksDB and deserialize the values using
> TypeSerializer.
>
> One more bit of information: Internally the state is keyed by (key,
> namespace) -> value where namespace can be an arbitrary type that has a
> TypeSerializer. We use this to store window state that is both local to 
> key
> and the current window. For state that you store in a user-defined 
> function
> the namespace will always be null and that will be serialized by a
> VoidSerializer that simply always writes a "0" byte.
>
> Cheers,
> Aljoscha
>
> On Fri, 15 Apr 2016 at 00:18 igor.berman  wrote:
>>
>> Hi,
>> we are evaluating Flink for a new solution and several people raised
>> concerns about coupling too much to Flink -
>> 1. we understand that if we want to get full fault tolerance and the best
>> performance we'll need to use Flink managed state (probably the RocksDB
>> backend due to the volume of state)
>> 2. but then if we later find that Flink doesn't answer our needs (for any
>> reason) - we'll need to extract this state in some way (since it's the
>> only source of consistent state)
>> In general I'd like to be able to take a snapshot of the backend and try
>> to read it... do you think it will be a trivial task?
>> Say, if I'm holding list state per partitioned key, would it be easy to
>> take the RocksDB file and open it?
>>
>> any thoughts regarding how I can convince people on our team?
>>
>> thanks in advance!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-StateBackend-snapshots-outside-of-Flink-tp6116.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.


>>>
>


Re: Application log on Yarn FlinkCluster

2016-06-13 Thread Maximilian Michels
Hi Theofilos,

Flink doesn't send the local client output to the Yarn cluster. I
think this will only change once we move the entire execution of the
Job to the cluster framework. All output of the actual Flink job
should be within the JobManager or TaskManager logs.
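
In other words, anything you want in the aggregated YARN log has to be
logged from code that runs on the cluster, e.g. via SLF4J inside an
operator instead of System.out in main(). A minimal sketch (the mapper
itself is made up, not the IterateExample code):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingMapper extends RichMapFunction<Long, Long> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingMapper.class);

    @Override
    public Long map(Long value) {
        // runs on the TaskManagers, so this ends up in the TaskManager logs
        // that YARN log aggregation collects
        LOG.info("Processing value {}", value);
        return value + 1;
    }
}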

There is something wrong with the network communication if the Client
doesn't return from `runBlocking(..)`. Would be interesting to take a
look at the logs to find out why that could be.

Cheers,
Max


On Sat, Jun 11, 2016 at 1:53 PM, Theofilos Kakantousis  wrote:
> Hi Robert,
>
> Thanks for the prompt reply. I'm using the IterateExample from Flink
> examples. In the yarn log I get entries for the YarnJobManager and
> ExecutionGraph, but I was wondering if there is a way to push all the
> logging that the client produces into the yarn log. Including the System.out
> calls. Is there a way to modify the example to use a logging framework to
> achieve it?
>
> Also when I submit the program using the Client runBlocking method, although
> I see in the taskmanager and jobmanager log that the application has
> finished, the runBlocking method does not return. Should I call it in a
> separate thread?
>
> Cheers,
> Theofilos
>
> On 2016-06-10 22:12, Robert Metzger wrote:
>
> Hi Theofilos,
>
> how exactly are you writing the application output?
> Are you using a logging framework?
> Are you writing the log statements from the open(), map(), invoke() methods
> or from some constructors? (I'm asking since different parts are executed on
> the cluster and locally).
>
> On Fri, Jun 10, 2016 at 4:00 PM, Theofilos Kakantousis  wrote:
>>
>> Hi all,
>>
>> Flink 1.0.3
>> Hadoop 2.4.0
>>
>> When running a job on a Flink Cluster on Yarn, the application output is
>> not included in the Yarn log. Instead, it is only printed in the stdout from
>> where I run my program.  For the jobmanager, I'm using the log4j.properties
>> file from the flink/conf directory. Yarn log aggregation is enabled and the
>> YarnJobManager log is printed in the yarn log. The application is submitted
>> by a Flink Client to the FlinkYarnCluster using a PackagedProgram.
>>
>> Is this expected behavior and if so, is there a way to include the
>> application output in the Yarn aggregated log? Thanks!
>>
>> Cheers,
>> Theofilos
>>
>
>


Re: How to maintain the state of a variable in a map transformation.

2016-06-13 Thread Maximilian Michels
Hi Ravikumar,

In short: no, you can't use closures to maintain a global state. If
you want to keep a single, always-consistent global state, you'll have
to use parallelism 1 or an external data store to hold it.

Is it possible to break up your global state into a set of local
states which can be combined in the end? That way, you can take
advantage of distributed parallel processing.
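
For example (a toy sketch with a made-up Point type and partial sums, not
your regression code), each parallel instance accumulates a local partial
result and a final reduce merges them:

DataSet<double[]> partials = points.mapPartition(
    new MapPartitionFunction<Point, double[]>() {
        @Override
        public void mapPartition(Iterable<Point> values, Collector<double[]> out) {
            double[] local = new double[]{0.0, 0.0}; // local state of this instance
            for (Point p : values) {
                local[0] += p.x * p.y;
                local[1] += p.x * p.x;
            }
            out.collect(local);
        }
    });

DataSet<double[]> combined = partials.reduce(new ReduceFunction<double[]>() {
    @Override
    public double[] reduce(double[] a, double[] b) {
        return new double[]{a[0] + b[0], a[1] + b[1]}; // merge two local states
    }
});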

Cheers,
Max

On Fri, Jun 10, 2016 at 8:28 AM, Ravikumar Hawaldar
 wrote:
> Hi Fabian,  Thank you for your help.
>
> I want my Flink application to be distributed, and I also want the ability
> to update the variable [the coefficients of the LinearRegression].
>
> How would you do it in that case?
>
> The problem with iterations is that they expect a DataSet of the same type
> to be fed back, and my variable is just a double[]. Otherwise I would have
> to wrap every record together with a double[] inside a Tuple2 and then try
> out iterations, but I am sure this won't work either.
>
> Can I use closure or lambdas to maintain global state?
>
>
> Regards,
> Ravikumar
>
> On 9 June 2016 at 20:17, Fabian Hueske  wrote:
>>
>> Hi,
>>
>> 1) Yes, that is correct. If you set the parallelism of an operator to 1 it
>> is only executed on a single node. It depends on your application, if you
>> need a global state or whether multiple local states are OK.
>> 2) Flink programs follow the concept of a data flow. There is no
>> communication between parallel instances of a task, i.e., all four tasks of
>> a MapOperator with parallelism 4 cannot talk to each other. You might want
>> to take a look at Flink's iteration operators. With these you can feed data
>> back into a previous operator [1].
>> 4) Yes, that should work.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html
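
As a rough illustration of [1], a DataSet<double[]> can itself be the
iterated data set (toy update step, not the actual regression):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

IterativeDataSet<double[]> loop =
    env.fromElements(new double[]{0.0, 0.0}).iterate(100);

DataSet<double[]> updated = loop.map(new MapFunction<double[], double[]>() {
    @Override
    public double[] map(double[] w) {
        // stand-in for the real gradient/update step
        return new double[]{w[0] + 0.1, w[1] + 0.1};
    }
});

DataSet<double[]> coefficients = loop.closeWith(updated);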
>>
>> 2016-06-09 15:01 GMT+02:00 Ravikumar Hawaldar
>> :
>>>
>>> Hi Fabian, Thank you for your answers,
>>>
>>> 1) If there is only a single instance of that function, then it will
>>> defeat the purpose of distribution - correct me if I am wrong. So if I run
>>> with parallelism 1 on a cluster, does that mean it will execute on only one
>>> node?
>>>
>>> 2) I mean to say, when a map operator returns a variable, is there any
>>> other function which takes that updated variable and returns it to all
>>> instances of the map?
>>>
>>> 3) Question Cleared.
>>>
>>> 4) My question was whether I can use the same ExecutionEnvironment for all
>>> Flink programs in a module.
>>>
>>> 5) Question Cleared.
>>>
>>>
>>> Regards
>>> Ravikumar
>>>
>>>
>>>
>>> On 9 June 2016 at 17:58, Fabian Hueske  wrote:

 Hi Ravikumar,

 I'll try to answer your questions:
 1) If you set the parallelism of a map function to 1, there will be only
 a single instance of that function regardless of whether it is executed
 locally or remotely in a cluster.
 2) Flink does also support aggregations, (reduce, groupReduce, combine,
 ...). However, I do not see how this would help with a stateful map
 function.
 3) In Flink DataSet programs you usually construct the complete program
 and call execute() after you have defined your sinks. There are two
 exceptions: print() and collect() which both add special sinks and
 immediately execute your program. print() prints the result to the stdout 
 of
 the submitting client and collect() fetches a dataset as collection.
 4) I am not sure I understood your question. When you obtain an
 ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvironment()
 the
 type of the returned environment depends on the context in which the 
 program
 was executed. It can be a local environment if it is executed from within 
 an
 IDE or a RemoteExecutionEnvironment if the program is executed via the CLI
 client and shipped to a remote cluster.
 5) A map operator processes records one after the other, i.e., as a
 sequence. If you need a certain order, you can call DataSet.sortPartition()
 to locally sort the partition.

 Hope that helps,
 Fabian

 2016-06-09 12:23 GMT+02:00 Ravikumar Hawaldar
 :
>
> Hi Till, Thank you for your answer, I have couple of questions
>
> 1) Setting parallelism on a single map function locally is fine, but when
> distributed will it work like local execution?
>
> 2) Is there any other way apart from setting parallelism? Like Spark's
> aggregate function?
>
> 3) Is it necessary to call the execute function after the transformations?
> Or does execution start as soon as it encounters an action (similar to
> Spark)?
>
> 4) Can I create a global execution environment (either local or
> distributed) for different Flink programs in a module?
>
> 5) How can I make the records come in sequence for a map or any other
> operator?
>
>
> Regards,
> Ravikumar
>
>
> On 8 June 2016 at 21:14, Till Rohrmann  wrote:
>>
>> Hi Ravikumar,
>>
>> Flink

Re: NotSerializableException

2016-06-13 Thread Maximilian Michels
Is there an issue or a fix for proper use of the ClosureCleaner in
CoGroup.where()?

On Fri, Jun 10, 2016 at 8:07 AM, Aljoscha Krettek  wrote:
> Hi,
> yes, I was talking about a Flink bug. I forgot to mention the work-around
> that Stephan mentioned.
>
> On Thu, 9 Jun 2016 at 20:38 Stephan Ewen  wrote:
>>
>> You can also make the KeySelector a static inner class. That should work
>> as well.
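
For reference, that workaround could look roughly like this (the field-name
constructor, the GenericRecord bound and the class name are assumptions, not
Tarandeep's actual code):

// static nested class inside RecordsFilterer: no reference to the enclosing
// instance, so only the selector itself has to be serializable
public static class FieldKeySelector<T extends GenericRecord>
        implements KeySelector<T, String> {

    private final String fieldName;

    public FieldKeySelector(String fieldName) {
        this.fieldName = fieldName;
    }

    @Override
    public String getKey(T record) {
        Object value = record.get(fieldName);
        // random fallback so records with a null key never match any id
        return value != null ? value.toString() : UUID.randomUUID().toString();
    }
}

// usage: dataset.coGroup(filteredIds).where(new FieldKeySelector<T>(fieldName)) ...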
>>
>> On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh 
>> wrote:
>>>
>>> Thank you Aljoscha and Fabian for your replies.
>>>
>>> @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm
>>> afraid this is a bug", I am assuming you are referring to the Flink engine
>>> itself.
>>>
>>> @Fabian: thanks for the optimization tip.
>>>
>>> This is how I have got it working (with a hack): in my dataset, the join
>>> field/key can be null; otherwise .where(fieldName) works and I don't get the
>>> not-serializable exception. So I applied a MapFunction to the DataSet and put
>>> a dummy value in the join field/key where it was null. Then, in the join
>>> function, I change it back to null.
>>>
>>> Best,
>>> Tarandeep
>>>
>>> On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek 
>>> wrote:

 Hi,
 the problem is that the KeySelector is an anonymous inner class and as
 such has a reference to the outer RecordsFilterer object. Normally, this
 would
 be rectified by the closure cleaner but the cleaner is not used in
 CoGroup.where(). I'm afraid this is a bug.

 Best,
 Aljoscha


 On Thu, 9 Jun 2016 at 14:06 Fabian Hueske  wrote:
>
> Hi Tarandeep,
>
> the exception suggests that Flink tries to serialize RecordsFilterer as
> a user function (this happens via Java Serialization).
> I said suggests because the code that uses RecordsFilterer is not
> included.
>
> To me it looks like RecordsFilterer should not be used as a user
> function. It is a helper class to construct a DataSet program, so it 
> should
> not be shipped for execution.
> You would use such a class as follows:
>
> DataSet<T> records = ...
> DataSet<String> filterIDs = ...
>
> RecordsFilterer<T> rf = new RecordsFilterer<>();
> DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records,
> filterIDs, "myField");
>
> Regarding the join code, I would suggest an optimization.
> Instead of using CoGroup, I would use distinct and an OuterJoin like
> this:
>
> DataSet<String> distIds = filterIDs.distinct();
> DataSet<Tuple2<Boolean, T>> result = records
>   .leftOuterJoin(distIds)
>   .where(KEYSELECTOR)
>   .equalTo("*") // use full string as key
>   .with(JOINFUNC) // set Bool to false if right == null, true otherwise
>
> Best, Fabian
>
> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh :
>>
>> Hi,
>>
>> I am getting a NotSerializableException in this class-
>>
>> 
>>
>> public class RecordsFilterer<T extends GenericRecord> {
>>
>> public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T>
>> dataset, DataSet<String> filteredIds, String fieldName) {
>> return dataset.coGroup(filteredIds)
>> .where(new KeySelector<T, String>() {
>> @Override
>> public String getKey(T t) throws Exception {
>> String s = (String) t.get(fieldName);
>> return s != null ? s :
>> UUID.randomUUID().toString();
>> }
>> })
>> .equalTo((KeySelector<String, String>) s -> s)
>> .with(new CoGroupFunction<T, String,
>> Tuple2<Boolean, T>>() {
>> @Override
>> public void coGroup(Iterable<T> records,
>> Iterable<String> ids,
>> Collector<Tuple2<Boolean, T>>
>> collector) throws Exception {
>> boolean filterFlag = false;
>> for (String id : ids) {
>> filterFlag = true;
>> }
>>
>> for (T record : records) {
>> collector.collect(new Tuple2<>(filterFlag,
>> record));
>> }
>> }
>> });
>>
>> }
>> }
>>
>>
>> What I am trying to do is write generic code that will join Avro
>> records (of different types) with String records and, if there is a
>> match, add a filter flag. This way I can use the same code for different
>> Avro record types. But I am getting this exception-
>>
>> Exception in thread "main"
>> org.apache.flink.optimizer.CompilerException: Error translating node 'Map
>> "Key Extractor" : MAP [[ GlobalProperties 
>> [partitioning=RANDOM_PARTITIONED]
>> ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': 
>> Could
>> not write the user code wrapper class
>> org.apache.flink.api.common.operators.util.UserCod

Re: Arrays values in keyBy

2016-06-13 Thread Aljoscha Krettek
Yes, this is correct. Right now we're basically using .hashCode() for
keying. (Which can be problematic in some cases.)

Beam, for example, clearly specifies that the encoded form of a value
should be used for all comparisons/hashing. This is better defined but
can lead to slow performance in some cases.
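
So for a byte[] key the practical workaround is to key on a value-equal
representation of the bytes, for example (hex encoding picked arbitrarily,
stream type made up):

// key a stream on a value-equal representation of a byte[] field instead of
// the array itself, which is hashed by identity
static KeyedStream<Tuple2<byte[], Long>, String> keyByBytes(
        DataStream<Tuple2<byte[], Long>> events) {
    return events.keyBy(new KeySelector<Tuple2<byte[], Long>, String>() {
        @Override
        public String getKey(Tuple2<byte[], Long> value) {
            return DatatypeConverter.printHexBinary(value.f0); // javax.xml.bind
        }
    });
}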

On Sat, 11 Jun 2016 at 00:04 Elias Levy  wrote:

> It would be useful if the documentation warned what type of equality it
> expected of values used as keys in keyBy.  I just got bit in the ass by
> converting a field from a string to a byte array.  All of a sudden the
> windows were no longer aggregating.  So it seems Flink is not doing a deep
> compare of arrays when comparing keys.
>


Re: HBase reads and back pressure

2016-06-13 Thread Christophe Salperwyck
Thanks for the feedback and sorry that I can't try all this straight away.

Is there a way to use a WindowFunction whose output type differs from the
fold type?

I would like to return an HBase Put and not a SummaryStatistics. So
something like a WindowFunction that takes the SummaryStatistics as input
and emits a Put.

Christophe

2016-06-09 17:47 GMT+02:00 Fabian Hueske :

> OK, this indicates that the operator following the source is a bottleneck.
>
> If that's the WindowOperator, it makes sense to try the refactoring of the
> WindowFunction.
> Alternatively, you can try to run that operator with a higher parallelism.
>
> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <
> christophe.salperw...@gmail.com>:
>
>> Hi Fabian,
>>
>> Thanks for the help, I will try that. The backpressure was on the source
>> (HBase).
>>
>> Christophe
>>
>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske :
>>
>>> Hi Christophe,
>>>
>>> where does the backpressure appear? In front of the sink operator or
>>> before the window operator?
>>>
>>> In any case, I think you can improve your WindowFunction if you convert
>>> parts of it into a FoldFunction.
>>> The FoldFunction would take care of the statistics computation and the
>>> WindowFunction would only assemble the result record including extracting
>>> the start time of the window.
>>>
>>> Then you could do:
>>>
>>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>>> YourWindowFunction());
>>>
>>> This is more efficient because the FoldFunction is eagerly applied
>>> whenever a new element is added to a window. Hence, the window only holds a
>>> single value (SummaryStatistics) instead of all elements added to the
>>> window. In contrast, the WindowFunction is called when the window is
>>> finally evaluated.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
>>> christophe.salperw...@gmail.com>:
>>>
 Hi,

 I am writing a program to read time series from HBase and do some daily
 aggregations (Flink streaming). For now I am just computing some averages,
 so it is not very demanding, but my HBase reads get slower and slower (I
 have a few billion points to read). The back pressure is almost all the
 time close to 1.

 I use custom timestamps:
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

 so I implemented a custom extractor based on:
 AscendingTimestampExtractor

 At the beginning I have 5M reads/s and after 15 min I have just 1M
 reads/s, then it gets worse and worse. Even when I cancel the job, data is
 still being written to HBase (I built a sink similar to the example, with
 a cache of hundreds of HBase Puts to be a bit more efficient).

 When I don't add a sink, it seems to stay at 1M reads/s.

 Do you have an idea why?

 Here is a bit of code if needed:
 final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
 .keyBy(0)
 .timeWindow(Time.days(1));

 final SingleOutputStreamOperator<Put, ?> puts = ws.apply(new
 WindowFunction<ANA, Put, Tuple, TimeWindow>() {

 @Override
 public void apply(final Tuple key, final TimeWindow window, final
 Iterable<ANA> input,
 final Collector<Put> out) throws Exception {

 final SummaryStatistics summaryStatistics = new SummaryStatistics();
 for (final ANA ana : input) {
 summaryStatistics.addValue(ana.getValue());
 }
 final Put put = buildPut((String) key.getField(0), window.getStart(),
 summaryStatistics);
 out.collect(put);
 }
 });

 And how I started Flink on YARN:
 flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
 -Dtaskmanager.network.numberOfBuffers=4096

 Thanks for any feedback!

 Christophe

>>>
>>>
>>
>


Re: Accessing StateBackend snapshots outside of Flink

2016-06-13 Thread Josh
Hello,
I have a follow-up question to this: since Flink doesn't support state
expiration at the moment (e.g. expiring state which hasn't been updated for
a certain amount of time), would it be possible to clear up old UDF states
by:
- store a "last_updated" timestamp in the state value
- periodically (e.g. monthly) go through all the state values in RocksDB,
deserialize them using TypeSerializer and read the "last_updated" property
- delete the key from RocksDB if the state's "last_updated" property is
over a month ago

Is there any reason this approach wouldn't work, or anything to be careful
of?
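
For context, the wrapped state value I have in mind is just something like
this (names are placeholders):

// state value that carries its own last-update time, so an offline sweep can
// judge staleness without extra bookkeeping
public class TimestampedValue<T> implements java.io.Serializable {

    public T value;
    public long lastUpdated; // epoch millis, set on every state update

    public TimestampedValue() {} // default constructor for POJO serialization

    public TimestampedValue(T value, long lastUpdated) {
        this.value = value;
        this.lastUpdated = lastUpdated;
    }
}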

Thanks,
Josh


On Mon, Apr 18, 2016 at 8:23 AM, Aljoscha Krettek 
wrote:

> Hi,
> key refers to the key extracted by your KeySelector. Right now, for every
> named state (i.e. the name in the StateDescriptor) there is an isolated
> RocksDB instance.
>
> Cheers,
> Aljoscha
>
> On Sat, 16 Apr 2016 at 15:43 Igor Berman  wrote:
>
>> thanks a lot for the info, seems not too complex
>> I'll try to write a simple tool to read this state.
>>
>> Aljoscha, does the key reflect the unique id of the operator in some way?
>> Or is the key just a "name" that is passed to the ValueStateDescriptor?
>>
>> thanks in advance
>>
>>
>> On 15 April 2016 at 15:10, Stephan Ewen  wrote:
>>
>>> One thing to add is that you can always trigger a persistent checkpoint
>>> via the "savepoints" feature:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
>>>
>>>
>>>
>>> On Fri, Apr 15, 2016 at 10:24 AM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,
 for RocksDB we simply use a TypeSerializer to serialize the key and
 value to a byte[] array and store that in RocksDB. For a ListState, we
 serialize the individual elements using a TypeSerializer and store them in
 a comma-separated list in RocksDB. The snapshots of RocksDB that we write
 to HDFS are regular backups of a RocksDB database, as described here:
 https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F. You
 should be able to read them from HDFS and restore them to a RocksDB
 database as described in the linked documentation.

 tl;dr As long as you know the type of values stored in the state you
 should be able to read them from RocksDB and deserialize the values using
 TypeSerializer.

 One more bit of information: Internally the state is keyed by (key,
 namespace) -> value where namespace can be an arbitrary type that has a
 TypeSerializer. We use this to store window state that is both local to key
 and the current window. For state that you store in a user-defined function
 the namespace will always be null and that will be serialized by a
 VoidSerializer that simply always writes a "0" byte.

 Cheers,
 Aljoscha

 On Fri, 15 Apr 2016 at 00:18 igor.berman  wrote:

> Hi,
> we are evaluating Flink for a new solution and several people raised
> concerns about coupling too much to Flink -
> 1. we understand that if we want to get full fault tolerance and the best
> performance we'll need to use Flink managed state (probably the RocksDB
> backend due to the volume of state)
> 2. but then if we later find that Flink doesn't answer our needs (for any
> reason) - we'll need to extract this state in some way (since it's the
> only source of consistent state)
> In general I'd like to be able to take a snapshot of the backend and try
> to read it... do you think it will be a trivial task?
> Say, if I'm holding list state per partitioned key, would it be easy to
> take the RocksDB file and open it?
>
> any thoughts regarding how I can convince people on our team?
>
> thanks in advance!
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-StateBackend-snapshots-outside-of-Flink-tp6116.html
> Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.
>

>>>
>>