Capacity Planning For Large State in YARN Cluster

2017-10-26 Thread Ashish Pokharel
Hi Everyone,

We have hit a roadblock moving an app to production scale and were hoping to get 
some guidance. The application is a pretty common stream-processing use case, but 
it does require maintaining a large number of keyed states. We are processing 2 
streams - one is a daily burst (normally around 50 million events, but it could 
go up to 100 million in a one-hour burst) and the other is a constant stream of 
around 70-80 million events per hour. We are doing a low-level join using a 
CoProcess function between the two keyed streams. The CoProcess function needs to 
refresh (upsert) state from the daily burst stream and decorate the constantly 
streaming data with values from the state built from the bursty stream. All of 
the logic works pretty well in a standalone dev environment, where we throw about 
500k events of bursty traffic at the state and about 2-3 million events of the 
data stream. We have 1 TM with 16 GB memory, 1 JM with 8 GB memory and 16 slots 
(1 per core) on the server. We have been taking savepoints in case we need to 
restart the app with code changes etc., and the app recovers from state very 
well. Based on the savepoints, the total volume of state in the production flow 
should be around 25-30 GB. 

At this point, however, we are trying to deploy the app at production scale. The 
app also has a flag that can be set at startup time to ignore the data stream so 
we can simply initialize state. So basically we are trying to see if we can 
initialize the state first and take a savepoint as a test. At this point we are 
using 10 TMs with 4 slots and 8 GB memory each (the idea was to allocate around 
3 times the estimated state size to start with), but the TMs keep getting killed 
by YARN with a GC Overhead Limit Exceeded error. We have gone through quite a few 
blogs/docs on Flink managed memory, off-heap vs heap memory, disk spill-over, 
state backends etc. We did try to tweak the managed-memory configs in multiple 
ways (off/on heap, fraction, network buffers etc.) but can't seem to figure out a 
good way to fine-tune the app to avoid these issues. Ideally, we would hold state 
in memory (we do have enough capacity in the production environment for it) for 
performance reasons and spill over to disk (which I believe Flink should provide 
out of the box?). It feels like 3x the anticipated state volume in cluster memory 
should have been enough to just initialize the state. So instead of just 
continuing to increase memory (which may or may not help, as the error concerns 
GC overhead), we wanted to get some input from experts on best practices and a 
better approach to planning this application. 

Appreciate your input in advance!
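For readers following along, this is roughly the shape of the upsert-and-decorate
CoProcess function described above - a minimal sketch only; ReferenceRecord,
EventRecord and EnrichedEvent are placeholder types, and the keyBy/connect wiring
is omitted:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// ReferenceRecord (daily burst stream), EventRecord (constant stream) and
// EnrichedEvent are illustrative placeholder types, not from the original job.
public class EnrichFunction
        extends CoProcessFunction<ReferenceRecord, EventRecord, EnrichedEvent> {

    // one entry per key, held in the configured state backend
    private transient ValueState<ReferenceRecord> reference;

    @Override
    public void open(Configuration parameters) {
        reference = getRuntimeContext().getState(
                new ValueStateDescriptor<>("reference", ReferenceRecord.class));
    }

    @Override
    public void processElement1(ReferenceRecord value, Context ctx,
                                Collector<EnrichedEvent> out) throws Exception {
        // daily burst: upsert the keyed state
        reference.update(value);
    }

    @Override
    public void processElement2(EventRecord event, Context ctx,
                                Collector<EnrichedEvent> out) throws Exception {
        // constant stream: decorate with whatever state has been built so far
        ReferenceRecord ref = reference.value();   // may be null before the first burst
        out.collect(new EnrichedEvent(event, ref));
    }
}

Because the reference data lives in keyed state, switching the job to the
RocksDBStateBackend keeps that state off the JVM heap (and spilled to local
disk), which is the usual first step when large state causes GC-overhead
failures.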

Data sources and slices

2017-10-26 Thread David Dreyfus
Hello,

If I am on a cluster with 2 task managers with 64 CPUs each, I can configure
128 slots in accordance with the documentation. If I set parallelism to 128
and read a 64 MB file (one datasource with a single file), will flink really
create 128 slices of roughly 500 KB each? Or, will it check the default
blocksize of the host it is reading from and allocate only as many slices as
there are blocks? 

If the file is on S3:
1. Does a single thread copy it to local disk and then have 128 slices
consume it?
2. Does a single thread read the file from S3 and consume it, treating
it as one slice?
3. Does flink talk to S3 and make a multi-part read to local storage and
then read from local storage in 128 slices?

If a datasource has a large number of files, does each slot read one file at
a time with a single thread, or does each slot read one part of each file
such that 128 slots consume each file one at a time?

More generally, does flink try to allocate files to slots such that each
slot reads the same volume with as long a sequential read as possible? 

How does it distinguish between reading from the local HDFS and S3, given
that they might have vastly different performance characteristics?

Thanks,
David




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Not enough free slots to run the job

2017-10-26 Thread David Dreyfus
Hello,

I know this is an older thread, but ...

If some slots are left empty it doesn't necessarily mean that machine
resources are wasted. Some managed memory might be unavailable, but CPU,
heap memory, network, and disk are shared across slots. To the extent there
are multiple operators executing within a slot, multiple threads are
executing and consuming those resources. It's not clear what the actual
performance degradation would be, if any. Correct?

David 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Execute multiple jobs in parallel (threading): java.io.OptionalDataException

2017-10-26 Thread David Dreyfus
Hello,

I am trying to submit multiple jobs to flink from within my Java program.
I am running into an exception that may be related:
java.io.OptionalDataException.

Should I be able to create multiple plans/jobs within a single session and
execute them concurrently?
If so, is there a working example you can point me at?

Do I share the same ExecutionEnvironment? 
It looks like calls to getExecutionEnvironment() return the same one.

I have a number of different transformations on my data I'd like to make.
I'd rather not create one very large job; instead, I'd like the separate jobs
to be processed in parallel.
My cluster has enough resources that performing each job sequentially would
be very wasteful.

Thank you,
David

Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
java.io.OptionalDataException
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at java.util.HashMap.readObject(HashMap.java:1407)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at
org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Tasks, slots, and partitioned joins

2017-10-26 Thread Fabian Hueske
Hi David,

please find my answers below:

1. For high utilization, all slots should be filled. Each slot processes a
slice of the program on a slice of the data. In case of
partitioning or changed parallelism, the data is shuffled accordingly.
2. That's a good question. I think the default logic is to go round-robin
on the TMs as you suggested, but I'm not 100% sure. There are a couple of
exceptions and special cases, IIRC.
3. No, I would still use Flink's join operator to do the join. When you
read both files with the same split, you'd have a single source for both
inputs. You could do something like:

/-- Filter "/source1" --\
Source -<  >-Join-->
\-- Filter "/source2" --/

If all operators have the same parallelism and the source has the right
split properties configured, all data should stay local and the join would
work without partitioning the data.
You could go even further if the data in the files is sorted on the join
key. Then you could read in zig-zag fashion from both files and give sorted
split properties. In theory, the join would happen without a sort (haven't
tried this though).
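As a rough sketch of that plan (the pair-reading input format is assumed, and
the Tuple3 layout of path/key/value is only illustrative):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// assume a custom input format that reads a pair of files per split and
// emits Tuple3<path, key, value>; "pairInputFormat" is a placeholder
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String, Long, String>> all = env.createInput(pairInputFormat);

DataSet<Tuple3<String, Long, String>> left  = all.filter(r -> r.f0.startsWith("/source1"));
DataSet<Tuple3<String, Long, String>> right = all.filter(r -> r.f0.startsWith("/source2"));

// field 1 is the join key; with matching split properties and equal
// parallelism the join should not need to repartition the data
DataSet<Tuple2<Long, String>> joined = left.join(right)
        .where(1).equalTo(1)
        .with(new JoinFunction<Tuple3<String, Long, String>,
                               Tuple3<String, Long, String>,
                               Tuple2<Long, String>>() {
            @Override
            public Tuple2<Long, String> join(Tuple3<String, Long, String> l,
                                             Tuple3<String, Long, String> r) {
                return Tuple2.of(l.f1, l.f2 + "|" + r.f2);  // combine as needed
            }
        });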

4.a Yes that is true. FileInputFormat has a flag to prevent files from
being split up into multiple splits.
4.b You might be able to hack this with a custom InputSplitAssigner. The
SplitDataProperties partition methods have a partitioner ID field. IIRC,
this is used to determine equal partitioning for joins.
However, as I said, you need to make sure that the files with the same keys
are read by the same subtask. You could do that with a custom
InputSplitAssigner.
My proposal to read both files with the same key in the same input split
(with a single source) tried to get around this issue by forcing the data of
both files into the same subtask.
4.c. The concept of a partition is a bit different in Flink and not bound
to InputSplits. All data arriving at a parallel instance of an operator is
considered to be in the same partition.
So both FlatMap and MapPartition call open() just once. In MapPartition,
the mapPartition() method is also called once, while flatMap() is called
for each record.

Cheers, Fabian




2017-10-26 15:04 GMT+02:00 David Dreyfus :

> Hi Fabian,
>
> Thank you for the great, detailed answers.
> 1. So, each parallel slice of the DAG is placed into one slot. The key to
> high utilization is many slices of the source data (or the various methods
> of repartitioning it). Yes?
> 2. In batch processing, are slots filled round-robin on task managers, or
> do
> I need to tune the number of slots to load the cluster evenly?
> 3. Are you suggesting that I perform the join in my custom data source?
> 4. Looking at this sample from
> org.apache.flink.optimizer.PropertyDataSourceTest
>
>   DataSource<Tuple2<Long, String>> data =
> env.readCsvFile("/some/path").types(Long.class, String.class);
>
>   data.getSplitDataProperties()
> .splitsPartitionedBy(0);
>
> 4.a Does this code assume that one split == one file from /some/path? If
> readCsvFile splits each file, the guarantee that all keys in each part of
> the file share the same partition would be violated, right?
> 4.b Is there a way to mark a partition number so that sources that share
> partition numbers are read in parallel and joined? If I have 10,000 pairs,
> I
> want partition 1 read from the sources at the same time.
> 4.c Does a downstream flatmap function get an open() call for each new
> partition? Or, do I chain MapPartition directly to the datasource?
>
> Thank you,
> David
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Passing Configuration & State

2017-10-26 Thread Fabian Hueske
Hi Navneeth,

configuring a user function with a Configuration object and reading the
parameters in the open() method of a RichFunction is no longer recommended.
In fact, that only works for the DataSet API and has not been added for the
DataStream API. The open() method with the Configuration parameter still
exists because the DataSet and DataStream APIs share these interfaces.

The recommended way to configure user functions is via constructor
parameters while defining the streaming application. The function object
that you create there is serialized and shipped to the workers.
Hence, you can do something like:

DataStream input = ...
DataStream output = input.map(new MyMapFunction(param1, param2));

where param1 and param2 are stored in MyMapFunction, serialized and shipped
with the MyMapFunction instance.
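A minimal sketch of what such a MyMapFunction could look like (the field types
are illustrative; the only requirement is that the fields are serializable,
since they travel with the function object):

import org.apache.flink.api.common.functions.MapFunction;

public class MyMapFunction implements MapFunction<String, String> {

    // constructor parameters become fields and are serialized with the function
    private final String param1;
    private final int param2;

    public MyMapFunction(String param1, int param2) {
        this.param1 = param1;
        this.param2 = param2;
    }

    @Override
    public String map(String value) {
        return param1 + "/" + value + "/" + param2;
    }
}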

Regarding sharing state objects inside a user function: you can access
the state object from the RuntimeContext.
However, you must make sure that the state is only accessed within function
calls of the user function, i.e., you should not leak it to other operators
through a singleton that holds state objects.
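A minimal sketch of that pattern (illustrative types; the function must be
applied on a keyed stream, e.g. input.keyBy(...).flatMap(new CountingFunction())):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountingFunction extends RichFlatMapFunction<String, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        // obtain the state handle once; use it only inside flatMap()
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1L;
        count.update(updated);
        out.collect(Tuple2.of(value, updated));
    }
}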

Hope this helps,
Fabian

2017-10-26 16:54 GMT+02:00 Navneeth Krishnan :

> Hi All,
>
> I have developed a streaming pipeline in java and I need to pass some of
> the configuration parameters that are passed during program startup to user
> functions. I used the below link as reference.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_
> practices.html
>
> I have tried withParameters & setGlobalJobParameters but that doesn't
> seem to work. The parameters are blank inside my user function when
> deployed in a cluster. I have also tried passing the parameters inside the
> constructor of my user function and this seem to work on local but not in
> cluster mode, again the parameters are blank.
>
> Is there a recommended way to pass the program parameters to user function
> classes?
>
> Also, I have scenario where the state created inside a user function has
> to passed around to multiple classes. Is there a state registry or
> something from which I can retrieve a registered state and use or should I
> implement my own?
>
> Thanks in advance.
>
> Regards,
> Navneeth
>


Re: PrometheusReporter error

2017-10-26 Thread cslotterback
Hello 김동원,

We are experiencing the same issue you did when trying to use the 1.4
Prometheus reporter with 1.3:


[...]
Error while registering metric.
java.lang.IllegalArgumentException: Collector already registered that
provides name: flink_taskmanager_Status_JVM_CPU_Load
[...]
-

The JIRA bug associated with this remains open; how were you able to stop
Flink from loading the taskmanager metrics multiple times?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Passing Configuration & State

2017-10-26 Thread Navneeth Krishnan
Hi All,

I have developed a streaming pipeline in java and I need to pass some of
the configuration parameters that are passed during program startup to user
functions. I used the below link as reference.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html

I have tried withParameters & setGlobalJobParameters but that doesn't seem
to work. The parameters are blank inside my user function when deployed in
a cluster. I have also tried passing the parameters inside the constructor
of my user function and this seems to work locally but not in cluster mode;
again, the parameters are blank.

Is there a recommended way to pass the program parameters to user function
classes?

Also, I have a scenario where the state created inside a user function has
to be passed around to multiple classes. Is there a state registry or
something from which I can retrieve a registered state and use it, or should
I implement my own?

Thanks in advance.

Regards,
Navneeth


Re: Local combiner on each mapper in Flink

2017-10-26 Thread Le Xu
Thanks for the help!  I’ll try out the ProcessFunction then.

Le

> On Oct 26, 2017, at 8:03 AM, Kien Truong  wrote:
> 
> Hi,
> For Streaming API, use a ProcessFunction as Fabian's suggestion. 
> You can pretty much do anything with a ProcessFunction :)
> 
> Best regards,
> 
> Kien
> 
> 
> On 10/26/2017 8:01 PM, Le Xu wrote:
>> Hi Kien:
>> 
>> Is there a similar API for DataStream as well?
>> 
>> Thanks!
>> 
>> Le
>> 
>> 
>>> On Oct 26, 2017, at 7:58 AM, Kien Truong wrote:
>>> 
>>> Hi,
>>> 
>>> For batch API, you can use GroupReduceFunction, which give you the same 
>>> benefit as a MapReduce combiner.
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
>>>  
>>> Regards,
>>> Kien
>>> 
>>> 
>>> On 10/26/2017 7:37 PM, Le Xu wrote:
 Thanks guys! That makes more sense now. 
 
 So does it mean once I start use a window operator, all operations on my 
 WindowedStream would be global (across all 
 partitions)? In that case, WindowedStream.aggregate (or sum) would apply 
 to all data after shuffling instead of each partition. 
 
 If I understand this correctly, once I want to perform some sort of 
 counting within each partition for different words (such as word count), I 
 should really avoid using keyBy but keep some sort of counting map for 
 each word while also keep track of the current time stamp, inside each 
 mapper.
 
 Le
 
 
 
 
> On Oct 26, 2017, at 3:17 AM, Fabian Hueske wrote:
> 
> Hi,
> 
> in a MapReduce context, combiners are used to reduce the amount of data 
> 1) to shuffle and fully sort (to group the data by key) and 2) to reduce 
> the impact of skewed data.
> 
> The question is, why do you need a combiner in your use case.
> - To reduce the data to shuffle: You should not use a window operator to 
> preaggregate because keyBy implies a shuffle. Instead you could implement 
> a ProcessFunction with operator state. In this solution you need to 
> implement the windowing logic yourself, i.e., group data in window based 
> on their timestamp. Ensure you don't run out of memory (operator state is 
> kept on the heap), etc. So this solution needs quite a bit of manual 
> tuning.
> - To reduce the impact of skewed data: You can use a window aggregation 
> if you don't mind the shuffle. However, you should add an additional 
> artificial key attribute to spread out the computation of the same 
> original key to more grouping key. Note that Flink assigns grouping keys 
> by hash partitioning to workers. This works well for many distinct keys, 
> but might cause issues in case of low key cardinality. Also note that the 
> state size grows and effectiveness reduces with an increasing cardinality 
> of the artificial key.
> 
> Hope this helps,
> Fabian
> 
> 2017-10-26 3:32 GMT+02:00 Kurt Young:
> Do you mean you want to keep the origin window as well as doing some 
> combine operations inside window in the same time?
> What kind of data do you expect the following operator will receive?
> 
> Best,
> Kurt
> 
> On Thu, Oct 26, 2017 at 5:29 AM, Le Xu wrote:
> Thank Kurt I'm trying out WindowedStream aggregate right now. Just 
> wondering, is there any way for me to preserve the window after 
> aggregation. More specifically, originally i have something like:
> 
> WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = 
> dataStream
> .keyBy(0) //id 
> .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
> 
> and then for the reducer I can do:
>  
> windowStream.apply(...) 
> 
> and expect the window information is preserved.
> 
> If I were to do use aggregate on window stream, I would end up with 
> something like:
> 
> DataStream<Tuple2<String, Long>> windowStream = dataStream
> .keyBy(0) //id 
> .timeWindow(Time.of(windowSize, 
> TimeUnit.MILLISECONDS)).aggregate
>   (new AggregateFunction<Tuple2<String, Long>, 
> Accumulator, Tuple2<String, Long>>() {
> @Override
> public Accumulator createAccumulator() {
> return null;
> }
> 
> @Override
> public void add(Tuple2<String, Long> stringLong, 
> Accumulator o)   {
> 
> }
> 
> @Override
> public Tuple2

Re: Tasks, slots, and partitioned joins

2017-10-26 Thread David Dreyfus
Hi Fabian,

Thank you for the great, detailed answers. 
1. So, each parallel slice of the DAG is placed into one slot. The key to
high utilization is many slices of the source data (or the various methods
of repartitioning it). Yes?
2. In batch processing, are slots filled round-robin on task managers, or do
I need to tune the number of slots to load the cluster evenly?
3. Are you suggesting that I perform the join in my custom data source?
4. Looking at this sample from
org.apache.flink.optimizer.PropertyDataSourceTest

  DataSource> data = 
env.readCsvFile("/some/path").types(Long.class, String.class); 
 
  data.getSplitDataProperties() 
.splitsPartitionedBy(0); 

4.a Does this code assume that one split == one file from /some/path? If
readCsvFile splits each file, the guarantee that all keys in each part of
the file share the same partition would be violated, right?
4.b Is there a way to mark a partition number so that sources that share
partition numbers are read in parallel and joined? If I have 10,000 pairs, I
want partition 1 read from the sources at the same time.
4.c Does a downstream flatmap function get an open() call for each new
partition? Or, do I chain MapPartition directly to the datasource?

Thank you,
David



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Local combiner on each mapper in Flink

2017-10-26 Thread Kien Truong

Hi,

For Streaming API, use a ProcessFunction as Fabian's suggestion.

You can pretty much do anything with a ProcessFunction :)


Best regards,

Kien
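A minimal sketch of the ProcessFunction-based pre-aggregation Fabian and Kien
describe (the count-based flush and types are illustrative only; a real combiner
would flush based on element timestamps and keep the buffer in operator state
for fault tolerance, as Fabian notes below):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Pre-aggregates word counts before the keyBy that does the final aggregation.
public class PreAggregator extends ProcessFunction<String, Tuple2<String, Long>> {

    private static final int MAX_BUFFERED = 10_000;  // illustrative flush threshold

    private transient Map<String, Long> counts;
    private transient int buffered;

    @Override
    public void open(Configuration parameters) {
        counts = new HashMap<>();
    }

    @Override
    public void processElement(String word, Context ctx,
                               Collector<Tuple2<String, Long>> out) {
        counts.merge(word, 1L, Long::sum);
        if (++buffered >= MAX_BUFFERED) {
            // emit the partial counts and start a new buffer
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                out.collect(Tuple2.of(e.getKey(), e.getValue()));
            }
            counts.clear();
            buffered = 0;
        }
    }
}

The partial counts emitted here can then be keyed and aggregated downstream as
before; the keyBy only has to shuffle the pre-aggregated records.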


On 10/26/2017 8:01 PM, Le Xu wrote:

Hi Kien:

Is there a similar API for DataStream as well?

Thanks!

Le


On Oct 26, 2017, at 7:58 AM, Kien Truong wrote:


Hi,

For batch API, you can use GroupReduceFunction, which give you the 
same benefit as a MapReduce combiner.


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions

Regards,

Kien


On 10/26/2017 7:37 PM, Le Xu wrote:

Thanks guys! That makes more sense now.

So does it mean once I start use a window operator, all operations 
on my WindowedStream would be global (across all partitions)? In 
that case, WindowedStream.aggregate (or sum) would apply to all data 
after shuffling instead of each partition.


If I understand this correctly, once I want to perform some sort of 
counting within each partition for different words (such as word 
count), I should really avoid using keyBy but keep some sort of 
counting map for each word while also keep track of the current time 
stamp, inside each mapper.


Le




On Oct 26, 2017, at 3:17 AM, Fabian Hueske wrote:


Hi,

in a MapReduce context, combiners are used to reduce the amount of 
data 1) to shuffle and fully sort (to group the data by key) and 2) 
to reduce the impact of skewed data.


The question is, why do you need a combiner in your use case.
- To reduce the data to shuffle: You should not use a window 
operator to preaggregate because keyBy implies a shuffle. Instead 
you could implement a ProcessFunction with operator state. In this 
solution you need to implement the windowing logic yourself, i.e., 
group data in window based on their timestamp. Ensure you don't run 
out of memory (operator state is kept on the heap), etc. So this 
solution needs quite a bit of manual tuning.
- To reduce the impact of skewed data: You can use a window 
aggregation if you don't mind the shuffle. However, you should add 
an additional artificial key attribute to spread out the 
computation of the same original key to more grouping key. Note 
that Flink assigns grouping keys by hash partitioning to workers. 
This works well for many distinct keys, but might cause issues in 
case of low key cardinality. Also note that the state size grows 
and effectiveness reduces with an increasing cardinality of the 
artificial key.


Hope this helps,
Fabian

2017-10-26 3:32 GMT+02:00 Kurt Young:


Do you mean you want to keep the origin window as well as doing
some combine operations inside window in the same time?
What kind of data do you expect the following operator will
receive?

Best,
Kurt

On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sharonx...@gmail.com> wrote:

Thank Kurt I'm trying out WindowedStream aggregate right
now. Just wondering, is there any way for me to preserve
the window after aggregation. More specifically, originally
i have something like:

WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow>
windowStream = dataStream
    .keyBy(0) //id
.timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))

and then for the reducer I can do:
windowStream.apply(...)

and expect the window information is preserved.

If I were to do use aggregate on window stream, I would end
up with something like:

DataStream<Tuple2<String, Long>> windowStream = dataStream
      .keyBy(0) //id
.timeWindow(Time.of(windowSize,
TimeUnit.MILLISECONDS)).aggregate
(new AggregateFunction<Tuple2<String, Long>, Accumulator,
Tuple2<String, Long>>() {
          @Override
          public Accumulator createAccumulator() {
              return null;
          }

          @Override
public void add(Tuple2<String, Long> stringLong,
Accumulator o) {

          }

          @Override
public Tuple2<String, Long> getResult(Accumulator
o) {
              return null;
          }

          @Override
          public Accumulator merge(Accumulator o,
Accumulator acc1) {
              return null;
          }
      });

Because it looks like aggregate would only transfer
WindowedStream to a DataStream. But for a global
aggregation phase (a reducer), should I extract the window
again?


Thanks! I apologize if that sounds like a very intuitive
questions.


Le






On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <ykt...@gmail.com> wrote:

I think you can use WindowedStream.aggreate

Best,
Kurt

On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sharonx...@gmail.com> wrote:

Re: Local combiner on each mapper in Flink

2017-10-26 Thread Le Xu
Hi Kien:

Is there a similar API for DataStream as well?

Thanks!

Le


> On Oct 26, 2017, at 7:58 AM, Kien Truong  wrote:
> 
> Hi,
> 
> For batch API, you can use GroupReduceFunction, which give you the same 
> benefit as a MapReduce combiner.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
>  
> Regards,
> Kien
> 
> 
> On 10/26/2017 7:37 PM, Le Xu wrote:
>> Thanks guys! That makes more sense now. 
>> 
>> So does it mean once I start use a window operator, all operations on my 
>> WindowedStream would be global (across all partitions)? In that case, 
>> WindowedStream.aggregate (or sum) would apply to all data after shuffling 
>> instead of each partition. 
>> 
>> If I understand this correctly, once I want to perform some sort of counting 
>> within each partition for different words (such as word count), I should 
>> really avoid using keyBy but keep some sort of counting map for each word 
>> while also keep track of the current time stamp, inside each mapper.
>> 
>> Le
>> 
>> 
>> 
>> 
>>> On Oct 26, 2017, at 3:17 AM, Fabian Hueske wrote:
>>> 
>>> Hi,
>>> 
>>> in a MapReduce context, combiners are used to reduce the amount of data 1) 
>>> to shuffle and fully sort (to group the data by key) and 2) to reduce the 
>>> impact of skewed data.
>>> 
>>> The question is, why do you need a combiner in your use case.
>>> - To reduce the data to shuffle: You should not use a window operator to 
>>> preaggregate because keyBy implies a shuffle. Instead you could implement a 
>>> ProcessFunction with operator state. In this solution you need to implement 
>>> the windowing logic yourself, i.e., group data in window based on their 
>>> timestamp. Ensure you don't run out of memory (operator state is kept on 
>>> the heap), etc. So this solution needs quite a bit of manual tuning.
>>> - To reduce the impact of skewed data: You can use a window aggregation if 
>>> you don't mind the shuffle. However, you should add an additional   
>>> artificial key attribute to spread out the computation of the same 
>>> original key to more grouping key. Note that Flink assigns grouping keys by 
>>> hash partitioning to workers. This works well for many distinct keys, but 
>>> might cause issues in case of low key cardinality. Also note that the state 
>>> size grows and effectiveness reduces with an increasing cardinality of the 
>>> artificial key.
>>> 
>>> Hope this helps,
>>> Fabian
>>> 
>>> 2017-10-26 3:32 GMT+02:00 Kurt Young:
>>> Do you mean you want to keep the origin window as well as doing some 
>>> combine operations inside window in the same time?
>>> What kind of data do you expect the following operator will receive?
>>> 
>>> Best,
>>> Kurt
>>> 
>>> On Thu, Oct 26, 2017 at 5:29 AM, Le Xu wrote:
>>> Thank Kurt I'm trying out WindowedStream aggregate right now. Just 
>>> wondering, is there any way for me to preserve the window after 
>>> aggregation. More specifically, originally i have something like:
>>> 
>>> WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = 
>>> dataStream
>>> .keyBy(0) //id 
>>> .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
>>> 
>>> and then for the reducer I can do:
>>>  
>>> windowStream.apply(...) 
>>> 
>>> and expect the window information is preserved.
>>> 
>>> If I were to do use aggregate on window stream, I would end up with 
>>> something like:
>>> 
>>> DataStream<Tuple2<String, Long>> windowStream = dataStream
>>> .keyBy(0) //id 
>>> .timeWindow(Time.of(windowSize, 
>>> TimeUnit.MILLISECONDS)).aggregate
>>> (new AggregateFunction<Tuple2<String, Long>, 
>>> Accumulator, Tuple2<String, Long>>() {
>>> @Override
>>> public Accumulator createAccumulator() {
>>> return null;
>>> }
>>> 
>>> @Override
>>> public void add(Tuple2<String, Long> stringLong, 
>>> Accumulator o) {
>>> 
>>> }
>>> 
>>> @Override
>>> public Tuple2<String, Long> getResult(Accumulator o) {
>>> return null;
>>> }
>>> 
>>> @Override
>>> public Accumulator merge(Accumulator o, Accumulator 
>>> acc1) {
>>> return null;
>>> }
>>> });
>>> 
>>> Because it looks like aggregate would only transfer WindowedStream to a 
>>> DataStream. But for a global aggregation phase (a reducer), should I 
>>> extract the window again?
>>> 
>>> 
>>> Thanks! I apologize if that sounds like a very intuitive questions.
>>> 
>>> 
>>> Le
>>> 
>>> 
>>> 

Re: Local combiner on each mapper in Flink

2017-10-26 Thread Kien Truong

Hi,

For the batch API, you can use GroupReduceFunction, which gives you the same 
benefit as a MapReduce combiner.


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions

Regards,

Kien


On 10/26/2017 7:37 PM, Le Xu wrote:

Thanks guys! That makes more sense now.

So does it mean once I start use a window operator, all operations on 
my WindowedStream would be global (across all partitions)? In that 
case, WindowedStream.aggregate (or sum) would apply to all data after 
shuffling instead of each partition.


If I understand this correctly, once I want to perform some sort of 
counting within each partition for different words (such as word 
count), I should really avoid using keyBy but keep some sort of 
counting map for each word while also keep track of the current time 
stamp, inside each mapper.


Le




On Oct 26, 2017, at 3:17 AM, Fabian Hueske wrote:


Hi,

in a MapReduce context, combiners are used to reduce the amount of 
data 1) to shuffle and fully sort (to group the data by key) and 2) 
to reduce the impact of skewed data.


The question is, why do you need a combiner in your use case.
- To reduce the data to shuffle: You should not use a window operator 
to preaggregate because keyBy implies a shuffle. Instead you could 
implement a ProcessFunction with operator state. In this solution you 
need to implement the windowing logic yourself, i.e., group data in 
window based on their timestamp. Ensure you don't run out of memory 
(operator state is kept on the heap), etc. So this solution needs 
quite a bit of manual tuning.
- To reduce the impact of skewed data: You can use a window 
aggregation if you don't mind the shuffle. However, you should add an 
additional artificial key attribute to spread out the computation of 
the same original key to more grouping key. Note that Flink assigns 
grouping keys by hash partitioning to workers. This works well for 
many distinct keys, but might cause issues in case of low key 
cardinality. Also note that the state size grows and effectiveness 
reduces with an increasing cardinality of the artificial key.


Hope this helps,
Fabian

2017-10-26 3:32 GMT+02:00 Kurt Young:


Do you mean you want to keep the origin window as well as doing
some combine operations inside window in the same time?
What kind of data do you expect the following operator will receive?

Best,
Kurt

On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sharonx...@gmail.com> wrote:

Thank Kurt I'm trying out WindowedStream aggregate right now.
Just wondering, is there any way for me to preserve the
window after aggregation. More specifically, originally i
have something like:

WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow>
windowStream = dataStream
.keyBy(0) //id
.timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))

and then for the reducer I can do:
windowStream.apply(...)

and expect the window information is preserved.

If I were to do use aggregate on window stream, I would end
up with something like:

DataStream<Tuple2<String, Long>> windowStream = dataStream
.keyBy(0) //id
.timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS)).aggregate
(new AggregateFunction<Tuple2<String, Long>, Accumulator,
Tuple2<String, Long>>() {
@Override
public Accumulator createAccumulator() {
  return null;
                    }

@Override
public void add(Tuple2<String, Long> stringLong, Accumulator o) {

                    }

@Override
public Tuple2<String, Long> getResult(Accumulator o) {
  return null;
                    }

@Override
public Accumulator merge(Accumulator o, Accumulator acc1) {
  return null;
                    }
                });

Because it looks like aggregate would only transfer
WindowedStream to a DataStream. But for a global aggregation
phase (a reducer), should I extract the window again?


Thanks! I apologize if that sounds like a very intuitive
questions.


Le






On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <ykt...@gmail.com> wrote:

I think you can use WindowedStream.aggreate

Best,
Kurt

On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sharonx...@gmail.com> wrote:

Thanks Kurt. Maybe I wasn't clear before, I was
wondering if Flink has implementation of combiner in
DataStream (to use after keyBy and windowing).

Thanks again!

Le

On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young <ykt...@gmail.com> wrote:

Hi,

The document you are looking at is pretty old,
you

Re: Local combiner on each mapper in Flink

2017-10-26 Thread Le Xu
Thanks guys! That makes more sense now. 

So does it mean that once I start using a window operator, all operations on my 
WindowedStream would be global (across all partitions)? In that case, 
WindowedStream.aggregate (or sum) would apply to all data after shuffling 
instead of to each partition. 

If I understand this correctly, if I want to perform some sort of counting 
within each partition for different words (such as word count), I should really 
avoid using keyBy and instead keep some sort of counting map for each word, while 
also keeping track of the current timestamp, inside each mapper.

Le
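For reference, a minimal sketch of the "artificial key" variant from Fabian's
reply quoted below, assuming an input DataStream<String> named words; the
fan-out factor, window size and tuple layout are illustrative only:

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

final int FANOUT = 16;  // illustrative fan-out factor

// 1) salt each word with a random sub-key
DataStream<Tuple3<String, Integer, Long>> salted = words
        .map(new MapFunction<String, Tuple3<String, Integer, Long>>() {
            @Override
            public Tuple3<String, Integer, Long> map(String word) {
                return Tuple3.of(word, ThreadLocalRandom.current().nextInt(FANOUT), 1L);
            }
        });

// 2) pre-aggregate per (word, salt) -- hot words are spread over FANOUT keys
DataStream<Tuple2<String, Long>> partial = salted
        .keyBy(0, 1)
        .timeWindow(Time.seconds(10))
        .sum(2)
        .map(new MapFunction<Tuple3<String, Integer, Long>, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Tuple3<String, Integer, Long> t) {
                return Tuple2.of(t.f0, t.f2);  // drop the salt
            }
        });

// 3) final aggregation per word
DataStream<Tuple2<String, Long>> counts = partial
        .keyBy(0)
        .timeWindow(Time.seconds(10))
        .sum(1);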




> On Oct 26, 2017, at 3:17 AM, Fabian Hueske  wrote:
> 
> Hi,
> 
> in a MapReduce context, combiners are used to reduce the amount of data 1) to 
> shuffle and fully sort (to group the data by key) and 2) to reduce the impact 
> of skewed data.
> 
> The question is, why do you need a combiner in your use case.
> - To reduce the data to shuffle: You should not use a window operator to 
> preaggregate because keyBy implies a shuffle. Instead you could implement a 
> ProcessFunction with operator state. In this solution you need to implement 
> the windowing logic yourself, i.e., group data in window based on their 
> timestamp. Ensure you don't run out of memory (operator state is kept on the 
> heap), etc. So this solution needs quite a bit of manual tuning.
> - To reduce the impact of skewed data: You can use a window aggregation if 
> you don't mind the shuffle. However, you should add an additional artificial 
> key attribute to spread out the computation of the same original key to more 
> grouping key. Note that Flink assigns grouping keys by hash partitioning to 
> workers. This works well for many distinct keys, but might cause issues in 
> case of low key cardinality. Also note that the state size grows and 
> effectiveness reduces with an increasing cardinality of the artificial key.
> 
> Hope this helps,
> Fabian
> 
> 2017-10-26 3:32 GMT+02:00 Kurt Young:
> Do you mean you want to keep the origin window as well as doing some combine 
> operations inside window in the same time?
> What kind of data do you expect the following operator will receive?
> 
> Best,
> Kurt
> 
> On Thu, Oct 26, 2017 at 5:29 AM, Le Xu wrote:
> Thank Kurt I'm trying out WindowedStream aggregate right now. Just wondering, 
> is there any way for me to preserve the window after aggregation. More 
> specifically, originally i have something like:
> 
> WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = 
> dataStream
> .keyBy(0) //id 
> .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
> 
> and then for the reducer I can do:
>  
> windowStream.apply(...) 
> 
> and expect the window information is preserved.
> 
> If I were to do use aggregate on window stream, I would end up with something 
> like:
> 
> DataStream<Tuple2<String, Long>> windowStream = dataStream
> .keyBy(0) //id 
> .timeWindow(Time.of(windowSize, 
> TimeUnit.MILLISECONDS)).aggregate
>   (new AggregateFunction<Tuple2<String, Long>, 
> Accumulator, Tuple2<String, Long>>() {
> @Override
> public Accumulator createAccumulator() {
> return null;
> }
> 
> @Override
> public void add(Tuple2<String, Long> stringLong, 
> Accumulator o)   {
> 
> }
> 
> @Override
> public Tuple2<String, Long> getResult(Accumulator o) {
> return null;
> }
> 
> @Override
> public Accumulator merge(Accumulator o, Accumulator acc1) 
> {
> return null;
> }
> });
> 
> Because it looks like aggregate would only transfer WindowedStream to a 
> DataStream. But for a global aggregation phase (a reducer), should I extract 
> the window again?
> 
> 
> Thanks! I apologize if that sounds like a very intuitive questions.
> 
> 
> Le
> 
> 
> 
> 
> 
> 
> On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young wrote:
> I think you can use WindowedStream.aggreate
> 
> Best,
> Kurt
> 
> On Tue, Oct 24, 2017 at 1:45 PM, Le Xu wrote:
> Thanks Kurt. Maybe I wasn't clear before, I was wondering if Flink has 
> implementation of combiner in DataStream (to use after keyBy and windowing).
> 
> Thanks again!
> 
> Le
> 
> On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young wrote:
> Hi,
> 
> The document you are looking at is pretty old, you can check the newest 
> version here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
>  
> 
> 
> Regarding to your question, you can use combineGroup 
> 
> B

Re: Checkpoint was declined (tasks not ready)

2017-10-26 Thread bartektartanus
I think we could try option number one, as it seems to be easier to
implement. Currently I'm cloning the Flink repo to fix this and test that
solution against our currently non-working code. Unfortunately, it takes
forever to download all the dependencies. Anyway, I hope that I will
eventually manage to create a pull request (today). To which branch? Is
master ok?
 
Bartek



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: StreamTransformation object

2017-10-26 Thread Tony Wei
Hi Andrea,

The `learn` operator is defined in this method [1]. If you need to set its
slot sharing group, you should add `slotSharingGroup(...)` after line 97
[2], or add a new API to get the result from `inferenceStreamBuilder.build()`.

Best Regards,
Tony Wei

[1]
https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L148
[2]
https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L97

2017-10-26 17:36 GMT+08:00 AndreaKinn :

> Can you be clearer about this part?
>
> I'm really appreciating your help
>
>
> Tony Wei wrote
> > you need to refactor `HTMStream` to expose
> > `InferenceStreamBuilder.build()`.
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: StreamTransformation object

2017-10-26 Thread AndreaKinn
Can you be clearer about this part?

I'm really appreciating your help


Tony Wei wrote
> you need to refactor `HTMStream` to expose
> `InferenceStreamBuilder.build()`.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: State snapshotting when source is finite

2017-10-26 Thread Flavio Pompermaier
Done: https://issues.apache.org/jira/browse/FLINK-7930

Best,
Flavio

On Thu, Oct 26, 2017 at 10:52 AM, Till Rohrmann 
wrote:

> Hi Flavio,
>
> this kind of feature is indeed useful and currently not supported by
> Flink. I think, however, that this feature is a bit trickier to implement,
> because Tasks cannot currently initiate checkpoints/savepoints on their
> own. This would entail some changes to the lifecycle of a Task and an extra
> communication step with the JobManager. However, nothing impossible to do.
>
> Please open a JIRA issue with the description of the problem where we can
> continue the discussion.
>
> Cheers,
> Till
>
> On Thu, Oct 26, 2017 at 9:58 AM, Fabian Hueske  wrote:
>
>> Hi Flavio,
>>
>> Thanks for bringing up this topic.
>> I think running periodic jobs with state that gets restored and persisted
>> in a savepoint is a very valid use case and would fit the stream is a
>> superset of batch story quite well.
>> I'm not sure if this behavior is already supported, but think this would
>> be a desirable feature.
>>
>> I'm looping in Till and Aljoscha who might have some thoughts on this as
>> well.
>> Depending on the discussion we should open a JIRA for this feature.
>>
>> Cheers, Fabian
>>
>> 2017-10-25 10:31 GMT+02:00 Flavio Pompermaier :
>>
>>> Hi to all,
>>> in my current use case I'd like to improve one step of our batch
>>> pipeline.
>>> There's one specific job that ingest a tabular dataset (of Rows) and
>>> explode it into a set of RDF statements (as Tuples).  The objects we output
>>> are a containers of those Tuples (grouped by a field).
>>> Flink stateful streaming could be a perfect fit here because we
>>> incrementally increase the state of those containers but we don't have to
>>> spend a lot of time performing some GET operation to an external Key-value
>>> store.
>>> The big problem here is that the sources are finite and the state of the
>>> job gets lost once the job ends, while I was expecting that Flink was
>>> snapshotting the state of its operators before exiting.
>>>
>>> This idea was inspired by https://data-artisans.com/b
>>> log/queryable-state-use-case-demo#no-external-store, whit the
>>> difference that one can resume the state of the stateful application only
>>> when required.
>>> Do you think that it could be possible to support such a use case (that
>>> we can summarize as "periodic batch jobs that pick up where they left")?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>


Re: StreamTransformation object

2017-10-26 Thread Tony Wei
Hi Andrea,

In this way, you will only set a slot sharing group on the select operator,
and the learn operator will remain in the default group.
If you want to set the learn operator as well, I am afraid that you need to
refactor `HTMStream` to expose `InferenceStreamBuilder.build()`.

Best Regards,
Tony Wei

2017-10-26 17:01 GMT+08:00 AndreaKinn :

> Mmm looks good. This solution would be great.
> In this way am I setting a slotSharing group for both learn and select
> method and not only on select?
> I believed I need to call slotSharingGroup exactly on the return type of
> learn.
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: StreamTransformation object

2017-10-26 Thread AndreaKinn
Mmm, looks good. This solution would be great.
In this way, am I setting a slot sharing group for both the learn and select
methods and not only on select? 
I believed I needed to call slotSharingGroup exactly on the return type of
learn.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Tasks, slots, and partitioned joins

2017-10-26 Thread Fabian Hueske
Hi David,

Flink's DataSet API schedules one slice of a program to a task slot. A
program slice is one parallel instance of each operator of a program.
When all operators of your program run with a parallelism of 1, you end up
with only 1 slice that runs on a single slot.
Flink's DataSet API leverages data parallelism (running parallel instances
of the same operator on different workers working on different data
partitions) instead of task parallelism (running different operators on
different workers).

Regarding your task, I would implement a custom InputFormat which extends
the FileInputFormat. The FileInputFormat.open() [1] method is called with a
FileInputSplit [2] which contains the file path. You can put the path aside
and add it as an additional field when emitting records in the nextRecord()
method.
This way, you only need two sources (one for /source1 and one for /source2)
and can join the records on a composite key of filename and join key. This
should balance the load evenly over a larger number of keys.
However, you would lose the advantage of pre-partitioned files because all
data of source1 would be joined with all data of source2.
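A minimal sketch of such an input format (the Tuple2<fileName, line> record
type is illustrative; extending DelimitedInputFormat, a FileInputFormat
subclass, avoids re-implementing line splitting):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileInputSplit;

public class FileTaggingInputFormat extends DelimitedInputFormat<Tuple2<String, String>> {

    private String currentFile;

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        currentFile = split.getPath().getName();  // put the path aside
    }

    @Override
    public Tuple2<String, String> readRecord(Tuple2<String, String> reuse, byte[] bytes,
                                             int offset, int numBytes) throws IOException {
        String line = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        return Tuple2.of(currentFile, line);  // attach the file name to every record
    }
}

The two sources created from this format can then be joined on a composite key
of (file name, join key) as described above.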

There is a low-level interface to leverage pre-partitioned files. With
SplitDataProperties [3] you can specify that the data produced by a
DataSource [4] is partitioned by InputSplit.
If you implement the source in a way that a single split contains the
information to read both files, you can avoid an additional shuffle and
join locally. This is a manual, low-level optimization where you need to know
what you are doing. I'm not sure if this is documented anywhere except for the
Javadocs.

Hope this helps,
Fabian

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L684
[2]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java#L34
[3]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java#L101
[4]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java#L117






2017-10-26 4:13 GMT+02:00 David Dreyfus :

> Hello -
>
> I have a large number of pairs of files. For purposes of discussion:
> /source1/{1..10000} and /source2/{1..10000}.
>
> I want to join the files pair-wise: /source1/1 joined to /source2/1,
> /source1/2 joined to /source2/2, and so on.
> I then want to union the results of the pair-wise joins and perform an
> aggregate.
>
> I create a simple flink job that has four sources, two joins, and two sinks
> to produce intermediate results. This represents two unrelated chains.
>
> I notice that when running this job with parallelism = 1 on a standalone
> machine with one task manager and 3 slots, only one slot gets used.
>
> My concern is that when I scale up to a YARN cluster, flink will continue
> to
> use one slot on one machine instead of using all slots on all machines.
>
> Prior reading suggests all the data source subtasks are added to a default
> resource group. Downstream tasks (joins and sinks) want to be colocated
> with
> the data sources. The result is all of my tasks are executed in one slot.
>
> Flink Stream (DataStream) offers the slotSharingGroup() function. This
> doesn't seem available to the DataSet user.
>
> *Q1:* How do I force Flink to distribute work evenly across task managers
> and the slots allocated to them? If this shouldn't be a concern, please
> elaborate.
>
> When I scale up the number of unrelated chains I notice that flink seems to
> start all of them at the same time, which results in thrashing and errors -
> lots of IO and errors regarding hash buffers.
>
> *Q2:* Is there any method for controlling the scheduling of tasks so that
> some finish before others start? My work around is to execute multiple,
> sequential batches with results going into an intermediate directory, and
> then a final job that aggregates the results. I would certainly prefer one
> job that might avoid the intermediate write.
>
> If I treat /source1 as one data source and /source2 as the second, and then
> join the two, flink will shuffle and partition the files on the join key.
> The /source1 and /source2 files represent this partitioning. They are
> reused
> multiple times; thus, I shuffle and save the results creating /source1 and
> /source2.
>
> *Q3:* Does flink have a method by which I can mark individual files (or
> directories) as belonging to a particular partition so that when I try to
> join them, the unnecessary shuffle and repartition is avoided?
>
> Thank you,
> David
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: State snapshotting when source is finite

2017-10-26 Thread Till Rohrmann
Hi Flavio,

this kind of feature is indeed useful and currently not supported by Flink.
I think, however, that this feature is a bit trickier to implement, because
Tasks cannot currently initiate checkpoints/savepoints on their own. This
would entail some changes to the lifecycle of a Task and an extra
communication step with the JobManager. However, nothing impossible to do.

Please open a JIRA issue with the description of the problem where we can
continue the discussion.

Cheers,
Till

On Thu, Oct 26, 2017 at 9:58 AM, Fabian Hueske  wrote:

> Hi Flavio,
>
> Thanks for bringing up this topic.
> I think running periodic jobs with state that gets restored and persisted
> in a savepoint is a very valid use case and would fit the stream is a
> superset of batch story quite well.
> I'm not sure if this behavior is already supported, but think this would
> be a desirable feature.
>
> I'm looping in Till and Aljoscha who might have some thoughts on this as
> well.
> Depending on the discussion we should open a JIRA for this feature.
>
> Cheers, Fabian
>
> 2017-10-25 10:31 GMT+02:00 Flavio Pompermaier :
>
>> Hi to all,
>> in my current use case I'd like to improve one step of our batch pipeline.
>> There's one specific job that ingest a tabular dataset (of Rows) and
>> explode it into a set of RDF statements (as Tuples).  The objects we output
>> are a containers of those Tuples (grouped by a field).
>> Flink stateful streaming could be a perfect fit here because we
>> incrementally increase the state of those containers but we don't have to
>> spend a lot of time performing some GET operation to an external Key-value
>> store.
>> The big problem here is that the sources are finite and the state of the
>> job gets lost once the job ends, while I was expecting that Flink was
>> snapshotting the state of its operators before exiting.
>>
>> This idea was inspired by https://data-artisans.com/b
>> log/queryable-state-use-case-demo#no-external-store, whit the difference
>> that one can resume the state of the stateful application only when
>> required.
>> Do you think that it could be possible to support such a use case (that
>> we can summarize as "periodic batch jobs that pick up where they left")?
>>
>> Best,
>> Flavio
>>
>
>


Re: StreamTransformation object

2017-10-26 Thread Tony Wei
Hi Andrea,

I roughly read that external library[1], and I think the return object of
"select" function could be casted as `SingleOutputStreamOperator` type [2].
How about trying the following code?

DataStream<Tuple7<...>> LCxAccResult =
    HTM.learn(LCxAccStream, new Harness.AnomalyNetwork())
       .select(new InferenceSelectFunction<Harness.KafkaRecord, Tuple7<...>>() {...});
((SingleOutputStreamOperator<?>) LCxAccResult).slotSharingGroup("...");

Best Regards,
Tony Wei

[1]
https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L99
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#returns-org.apache.flink.api.common.typeinfo.TypeInformation-

2017-10-26 16:31 GMT+08:00 AndreaKinn :

> Sorry Tony it is my fault, I was wrong the first post. Actually now my
> situation is the following:
>
>
> DataStream<Tuple7<...>>
> LCxAccResult = HTM.learn(LCxAccStream, new Harness.AnomalyNetwork())
> .select(new InferenceSelectFunction<
> Harness.KafkaRecord,
> Tuple7<...>>() {...}
>
>
> so actually the return value of "Learn" is a HTMStream object and the
> return
> value of "Select" is a DataStream where I need to implement
> slotSharingGroup
> on Learn. So I think I can't set SingleOutputStreamOperator as return value
> of learn, I believe (I hope not since I have not a clue how to do it :D) I
> need to define slotSharingGroup directly in HTMStream class, as in the
> first
> post.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: HBase config settings go missing within Yarn.

2017-10-26 Thread Niels Basjes
I have an idea how we can reduce the impact of this class of problem:
if we can detect that we are running in a distributed environment, then in
order to use HBase you MUST have an hbase-site.xml.

I'll see if I can make a proof of concept.

Niels

On Wed, Oct 25, 2017 at 11:27 AM, Till Rohrmann 
wrote:

> Hi Niels,
>
> good to see that you solved your problem.
>
> I’m not entirely sure how Pig does it, but I assume that there must be
> some kind of HBase support where the HBase specific files are explicitly
> send to the cluster or that it copies the environment variables. For Flink
> supporting this kind of behaviour is not really feasible because there are
> simply too many potential projects to support out there.
>
> The Flink idiomatic way would be either to read the config on the client,
> put it in the closure of the operator and then send it in serialized form
> to the cluster. Or you set the correct environment variables to start your
> Flink job cluster with by using env.java.opts or extending the class path
> information as you did.
>
> The following code shows the closure approach.
>
> public class Main {
>   private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>
>   public static void main(String[] args) throws Exception {
> printZookeeperConfig();
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
> env.createInput(new HBaseSource(HBaseConfiguration.create())).print();
> env.execute("HBase config problem");
>   }
>
>   public static void printZookeeperConfig() {
> String zookeeper = 
> HBaseConfiguration.create().get("hbase.zookeeper.quorum");
> LOG.info("> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
>   }
>
>   public static class HBaseSource extends AbstractTableInputFormat<String> {
>
> // HBase configuration read on the client
> private final org.apache.hadoop.conf.Configuration hConf;
>
> public HBaseSource(org.apache.hadoop.conf.Configuration hConf) {
>   this.hConf = Preconditions.checkNotNull(hConf);
> }
>
> @Override
> public void configure(org.apache.flink.configuration.Configuration 
> parameters) {
>   table = createTable();
>   if (table != null) {
> scan = getScanner();
>   }
> }
>
> private HTable createTable() {
>   printZookeeperConfig();
>
>   try {
> return new HTable(hConf, getTableName());
>   } catch (Exception e) {
> LOG.error("Error instantiating a new HTable instance", e);
>   }
>   return null;
> }
>
> @Override
> public String getTableName() {
>   return "bugs:flink";
> }
>
> @Override
> protected String mapResultToOutType(Result result) {
>   return new 
> String(result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)));
> }
>
> @Override
> protected Scan getScanner() {
>   return new Scan();
> }
>   }
> }
>
> Cheers,
> Till
> ​
>
> On Tue, Oct 24, 2017 at 11:51 AM, Niels Basjes  wrote:
>
>> I changed my cluster config (on all nodes) to include the HBase config
>> dir in the classpath.
>> Now everything works as expected.
>>
>> This may very well be a misconfiguration of my cluster.
>> However ...
>> My current assessment:
>> Tools like Pig use the HBase config which has been specified on the LOCAL
>> machine. This allows running on a cluster even if HBase is not defined on
>> that cluster.
>> Apparently Flink currently uses the HBase config which has been specified
>> on the REMOTE machine. This limits jobs to ONLY using the HBase that is
>> defined on the cluster.
>>
>> At this point I'm unsure which is the right approach.
>>
>> Niels Basjes
>>
>> On Tue, Oct 24, 2017 at 11:29 AM, Niels Basjes  wrote:
>>
>>> Minor correction: The HBase jar files are on the classpath, just in a
>>> different order.
>>>
>>> On Tue, Oct 24, 2017 at 11:18 AM, Niels Basjes  wrote:
>>>
 I did some more digging.

 I added extra code to print both the environment variables and the
 classpath that is used by the HBaseConfiguration to load the resource 
 files.
 I call this both locally and during startup of the job (i.e. these logs
 arrive in the jobmanager.log on the cluster)

 Summary of what I found locally:

 Environment
 2017-10-24 08:50:15,612 INFO  com.bol.bugreports.Main
  - HADOOP_CONF_DIR = /etc/hadoop/conf/
 2017-10-24 08:50:15,613 INFO  com.bol.bugreports.Main
  - HBASE_CONF_DIR = /etc/hbase/conf/
 2017-10-24 08:50:15,613 INFO  com.bol.bugreports.Main
  - FLINK_CONF_DIR = /usr/local/flink-1.3.2/conf
 2017-10-24 08:50:15,613 INFO  com.bol.bugreports.Main
  - HIVE_CONF_DIR = /etc/hive/conf/
 2017-10-24 08:50:15,613 INFO  com.bol.bugreports.Main
  - YARN_CONF_DIR = /etc/hadoop/conf/

 ClassPath
 2017-10-24 08:50:15,614 INFO  com.bol

Re: StreamTransformation object

2017-10-26 Thread AndreaKinn
Sorry Tony, it is my fault, I was wrong in the first post. Actually now my
situation is the following:

DataStream>
LCxAccResult = HTM.learn(LCxAccStream, new Harness.AnomalyNetwork())
.select(new 
InferenceSelectFunction>() {...}


So actually the return value of "learn" is an HTMStream object and the return
value of "select" is a DataStream, and I need to apply slotSharingGroup
on learn. So I think I can't set SingleOutputStreamOperator as the return value
of learn (I hope not, since I have no clue how to do it :D); I believe I need
to define slotSharingGroup directly in the HTMStream class, as in the first
post.





Re: Use a round-robin kafka partitioner

2017-10-26 Thread kla
Thanks for your comment.
Even if I write the KafkaPartitioner, I still have to somehow pass the
*kafka.producer.Partitioner*, which is not so easy.

So I have found the easiest solution for this: you just pass a /null/
value:

outputStream.addSink(new FlinkKafkaProducer010<>(
        producerProperties.getProperty(TOPIC),
        new EventSerializationSchema(),
        producerProperties,
        null));

Which means that *FlinkKafkaProducer* will automatically use Kafka's
default partitioner.
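(For completeness: if an explicit round-robin partitioner is preferred over
Kafka's default, a minimal sketch against Flink's FlinkKafkaPartitioner could
look like the following. This assumes a connector version that ships
FlinkKafkaPartitioner; it is not taken from this thread.)

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Sketch: each sink subtask cycles through the partitions of the target topic,
// ignoring the record key entirely.
public class RoundRobinKafkaPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int next = -1;

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        next = (next + 1) % partitions.length;
        return partitions[next];
    }
}

(It would then be passed in place of the null constructor argument above.)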





Re: Local combiner on each mapper in Flink

2017-10-26 Thread Fabian Hueske
Hi,

in a MapReduce context, combiners are used to reduce the amount of data 1)
to shuffle and fully sort (to group the data by key) and 2) to reduce the
impact of skewed data.

The question is why you need a combiner in your use case:
- To reduce the data to shuffle: You should not use a window operator to
preaggregate because keyBy implies a shuffle. Instead you could implement a
ProcessFunction with operator state. In this solution you need to implement
the windowing logic yourself, i.e., group data into windows based on their
timestamps, ensure you don't run out of memory (operator state is kept on
the heap), etc. So this solution needs quite a bit of manual tuning.
- To reduce the impact of skewed data: You can use a window aggregation if
you don't mind the shuffle. However, you should add an additional
artificial key attribute to spread out the computation of the same original
key over more grouping keys (see the sketch below). Note that Flink assigns
grouping keys to workers by hash partitioning. This works well for many
distinct keys, but might cause issues in case of low key cardinality. Also
note that the state size grows and the effectiveness decreases with an
increasing cardinality of the artificial key.
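A minimal sketch of that artificial-key idea (all names, the salt size and the
use of sum() are illustrative, not from this thread); it assumes event time so
that the partial results land in the same window at the second level:

import java.util.Random;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SaltedPreAggregation {

  // Spreads every (possibly hot) key over `salts` grouping keys, pre-aggregates
  // per (key, salt) and window, then combines the partial sums per original key.
  public static DataStream<Tuple3<String, Integer, Long>> sumWithSalt(
      DataStream<Tuple2<String, Long>> input, final int salts, Time window) {

    DataStream<Tuple3<String, Integer, Long>> salted = input
        .map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>>() {
          private final Random rnd = new Random();

          @Override
          public Tuple3<String, Integer, Long> map(Tuple2<String, Long> value) {
            return Tuple3.of(value.f0, rnd.nextInt(salts), value.f1);
          }
        });

    // First level: partial sums per (original key, salt).
    DataStream<Tuple3<String, Integer, Long>> partial = salted
        .keyBy(0, 1)
        .timeWindow(window)
        .sum(2);

    // Second level: combine the partial sums per original key
    // (the salt field of the result is arbitrary and can be dropped).
    return partial
        .keyBy(0)
        .timeWindow(window)
        .sum(2);
  }
}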

Hope this helps,
Fabian
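(Regarding the question further down about keeping the window after an
aggregate: a minimal sketch, assuming the WindowedStream.aggregate overload
that additionally takes a WindowFunction, and using the AggregateFunction
interface as shown in the thread, where add() mutates the accumulator. All
names are illustrative.)

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowedSum {

  // Sums the Long field per key and emits (key, windowEnd, sum),
  // so the window information is not lost after the aggregation.
  public static DataStream<Tuple3<String, Long, Long>> sumPerWindow(
      DataStream<Tuple2<String, Long>> input, Time size) {

    return input
        .keyBy(0)
        .timeWindow(size)
        .aggregate(
            new AggregateFunction<Tuple2<String, Long>, Tuple1<Long>, Long>() {
              @Override
              public Tuple1<Long> createAccumulator() { return new Tuple1<>(0L); }

              @Override
              public void add(Tuple2<String, Long> value, Tuple1<Long> acc) { acc.f0 += value.f1; }

              @Override
              public Long getResult(Tuple1<Long> acc) { return acc.f0; }

              @Override
              public Tuple1<Long> merge(Tuple1<Long> a, Tuple1<Long> b) { a.f0 += b.f0; return a; }
            },
            new WindowFunction<Long, Tuple3<String, Long, Long>, Tuple, TimeWindow>() {
              @Override
              public void apply(Tuple key, TimeWindow window, Iterable<Long> preAggregated,
                  Collector<Tuple3<String, Long, Long>> out) {
                // The incrementally aggregated sum arrives here together with the window metadata.
                String originalKey = key.getField(0);
                out.collect(Tuple3.of(originalKey, window.getEnd(), preAggregated.iterator().next()));
              }
            });
  }
}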

2017-10-26 3:32 GMT+02:00 Kurt Young :

> Do you mean you want to keep the original window as well as do some
> combine operations inside the window at the same time?
> What kind of data do you expect the following operator to receive?
>
> Best,
> Kurt
>
> On Thu, Oct 26, 2017 at 5:29 AM, Le Xu  wrote:
>
>> Thanks Kurt, I'm trying out WindowedStream aggregate right now. Just
>> wondering, is there any way for me to preserve the window after
>> aggregation? More specifically, originally I have something like:
>>
>> WindowedStream, Tuple, TimeWindow> windowStream =
>> dataStream
>> .keyBy(0) //id
>> .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
>>
>> and then for the reducer I can do:
>>
>> windowStream.apply(...)
>>
>> and expect the window information is preserved.
>>
>> If I were to do use aggregate on window stream, I would end up with
>> something like:
>>
>> DataStream> windowStream = dataStream
>> .keyBy(0) //id
>> .timeWindow(Time.of(windowSize,
>> TimeUnit.MILLISECONDS)).aggregate
>> (new AggregateFunction, Accumulator, Tuple2> Long>>() {
>> @Override
>> public Accumulator createAccumulator() {
>> return null;
>> }
>>
>> @Override
>> public void add(Tuple2 stringLong,
>> Accumulator o) {
>>
>> }
>>
>> @Override
>> public Tuple2 getResult(Accumulator o) {
>> return null;
>> }
>>
>> @Override
>> public Accumulator merge(Accumulator o, Accumulator
>> acc1) {
>> return null;
>> }
>> });
>>
>> Because it looks like aggregate would only transform a WindowedStream into a
>> DataStream. But for a global aggregation phase (a reducer), should I
>> extract the window again?
>>
>>
>> Thanks! I apologize if that sounds like a very basic question.
>>
>>
>> Le
>>
>>
>>
>>
>>
>>
>> On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young  wrote:
>>
>>> I think you can use WindowedStream.aggreate
>>>
>>> Best,
>>> Kurt
>>>
>>> On Tue, Oct 24, 2017 at 1:45 PM, Le Xu  wrote:
>>>
 Thanks Kurt. Maybe I wasn't clear before: I was wondering if Flink has an
 implementation of a combiner in DataStream (to use after keyBy and
 windowing).

 Thanks again!

 Le

 On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young  wrote:

> Hi,
>
> The document you are looking at is pretty old, you can check the
> newest version here: https://ci.apache.org/pr
> ojects/flink/flink-docs-release-1.3/dev/batch/dataset_transf
> ormations.html
>
> Regarding to your question, you can use combineGroup
>
> Best,
> Kurt
>
> On Mon, Oct 23, 2017 at 5:22 AM, Le Xu  wrote:
>
>> Hello!
>>
>> I'm new to Flink and I'm wondering if there is an explicit local
>> combiner for each mapper that I can use to perform a local reduce on each
>> mapper? I looked at https://ci.apache.org/proje
>> cts/flink/flink-docs-release-0.8/dataset_transformations.html but
>> couldn't find anything that matches.
>>
>>
>> Thanks!
>>
>> Le
>>
>
>

>>>
>>
>


Re: State snapshotting when source is finite

2017-10-26 Thread Fabian Hueske
Hi Flavio,

Thanks for bringing up this topic.
I think running periodic jobs with state that gets restored and persisted
in a savepoint is a very valid use case and would fit the "stream is a
superset of batch" story quite well.
I'm not sure if this behavior is already supported, but I think this would be
a desirable feature.

I'm looping in Till and Aljoscha who might have some thoughts on this as
well.
Depending on the discussion we should open a JIRA for this feature.

Cheers, Fabian
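(A minimal sketch of the kind of stateful accumulation described in the quoted
mail below, with all names illustrative; the open question in the thread is
whether a job like this, fed by a finite source, could have its keyed state
captured in a savepoint before it finishes, so that the next run picks up
where the previous one left off.)

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Illustrative only: accumulates a per-key "container" of statements in keyed
// state instead of upserting them into an external key-value store.
public class StatementAccumulator
    extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>> {

  private transient ValueState<String> container;

  @Override
  public void open(Configuration parameters) {
    container = getRuntimeContext().getState(
        new ValueStateDescriptor<>("container", String.class));
  }

  @Override
  public void flatMap(Tuple2<String, String> statement, Collector<Tuple2<String, String>> out)
      throws Exception {
    String current = container.value();
    String updated = current == null ? statement.f1 : current + "\n" + statement.f1;
    container.update(updated);
    out.collect(Tuple2.of(statement.f0, updated));
  }
}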

2017-10-25 10:31 GMT+02:00 Flavio Pompermaier :

> Hi to all,
> in my current use case I'd like to improve one step of our batch pipeline.
> There's one specific job that ingests a tabular dataset (of Rows) and
> explodes it into a set of RDF statements (as Tuples). The objects we output
> are containers of those Tuples (grouped by a field).
> Flink stateful streaming could be a perfect fit here because we
> incrementally increase the state of those containers without having to
> spend a lot of time performing GET operations against an external key-value
> store.
> The big problem here is that the sources are finite and the state of the
> job gets lost once the job ends, while I was expecting that Flink would
> snapshot the state of its operators before exiting.
>
> This idea was inspired by https://data-artisans.com/
> blog/queryable-state-use-case-demo#no-external-store, with the difference
> that one can resume the state of the stateful application only when
> required.
> Do you think that it could be possible to support such a use case (that we
> can summarize as "periodic batch jobs that pick up where they left")?
>
> Best,
> Flavio
>