Re: Is there a way to change the port for the Flink monitoring page?

2015-09-17 Thread Chiwan Park
Hi Felix,

You can change the listening port of the JobManager web frontend by setting 
`jobmanager.web.port` in the configuration (conf/flink-conf.yaml).
I attached a link to the documentation [1] about this.
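
For example, a single line like the following in conf/flink-conf.yaml moves
the web frontend to another port (the port value below is only an
illustration; pick any free port):

jobmanager.web.port: 8089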

Regards,
Chiwan Park

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html#jobmanager-web-frontend


> On Sep 17, 2015, at 8:31 AM, Felix Cheung  wrote:
> 
> 
> I'm using Flink from Zeppelin in local mode, and Zeppelin is using port 8080.





Re: Kinesis Connector

2015-09-17 Thread Márton Balassi
Hi Giancarlo,

I am not aware of anyone working on such a project. However, it would be
a valuable contribution. If you start the effort, please keep us
notified; I would also suggest filing a JIRA ticket for it.

Best,

Marton

On Thu, Sep 17, 2015 at 12:54 PM, Giancarlo Pagano wrote:

> Hi,
>
> Is there any project already working on a Kinesis connector for Flink or
> any plan to add a Kinesis connector to the main Flink distribution in the
> future?
>
> Thanks,
> Giancarlo


Re: Kinesis Connector

2015-09-17 Thread Stephan Ewen
Hi Giancarlo!

I am not aware of any existing Kinesis connector. It would definitely be
something to put on the roadmap for the near future. This is a stream
source we should support, similarly to Kafka.

I am not super familiar with Kinesis, but it seems to offer an abstraction
similar to Kafka's, especially with the ability to read the streams from
specific positions. That way, it should be possible to follow the same
design as the Kafka connector (even simpler, if they don't have the tricky
offset-committing part of Kafka).
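
To make the idea a bit more concrete, a very rough sketch of what such a
source could look like is below. The class name, the constructor
parameters, and the fetch logic are purely hypothetical, the package path
and SourceFunction shape are assumed and may differ between Flink
versions, and the actual AWS calls are only indicated as comments:

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Hypothetical Kinesis source, following the Kafka connector's design:
// read records from a stream starting at a given position and emit them.
class KinesisSource(streamName: String, startingPosition: String)
    extends SourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceContext[String]): Unit = {
    // Obtain a shard iterator for `streamName` at `startingPosition`
    // (e.g. TRIM_HORIZON or a stored sequence number) via the AWS SDK here.
    while (running) {
      // Fetch the next batch of records from Kinesis via the AWS SDK here.
      val records: Seq[String] = Seq.empty // placeholder for the fetched batch
      records.foreach(ctx.collect)
      Thread.sleep(1000) // back off between fetches in this sketch
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}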

Greetings,
Stephan


On Thu, Sep 17, 2015 at 12:54 PM, Giancarlo Pagano wrote:

> Hi,
>
> Is there any project already working on a Kinesis connector for Flink or
> any plan to add a Kinesis connector to the main Flink distribution in the
> future?
>
> Thanks,
> Giancarlo


Kinesis Connector

2015-09-17 Thread Giancarlo Pagano
Hi,

Is there any project already working on a Kinesis connector for Flink or any 
plan to add a Kinesis connector to the main Flink distribution in the future?

Thanks,
Giancarlo

Joining Windowed Data Streams

2015-09-17 Thread Philipp Goetze

Hey community,

Is there a possibility to join two windowed data streams, instead of 
joining two data streams on a window?


For example, if one wants to implement Q1 of SRBench, one would load the 
data, create one window definition, and then combine filters and joins on 
this window. But how would you do these joins in Flink Streaming?
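
For reference, joining two data streams on a window (the approach
mentioned above) could be written roughly as in the sketch below. The
stream names, case classes, key fields, and window size are made-up
placeholders, and the exact class and method names may differ between
Flink versions:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class Observation(stationId: String, value: Double)
case class Station(id: String, name: String)

object WindowJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Toy inputs standing in for the SRBench data.
    val observations: DataStream[Observation] = env.fromElements(Observation("s1", 1.0))
    val stations: DataStream[Station] = env.fromElements(Station("s1", "Station 1"))

    // Join the two streams on the station key over a one-minute window.
    val joined = observations
      .join(stations)
      .where(_.stationId)
      .equalTo(_.id)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .apply { (obs, st) => (st.name, obs.value) }

    joined.print()
    env.execute("window join sketch")
  }
}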


Best Regards,
Philipp


Re: Distribute DataSet to subset of nodes

2015-09-17 Thread Fabian Hueske
Hi Stefan,

I think I have a solution for your problem :-)

1) Distribute both parts of the small data to each machine (you have done
that)
2) Your mapper should have a parallelism of 10, the tasks with ID 0 to 4
(get ID via RichFunction.getRuntimeContext().getIndexOfThisSubtask()) read
the first half, tasks 5 to 9 read the second half.
3) Give the large input into a FlatMapper which sends out two records for
each incoming record and assigns the first outgoing record a task ID in
range 0 to 4 and the second outgoing record an ID in range 5 to 9.
4) Have a custom partitioner (DataSet.partitionCustom()) after the
duplicating mapper, which partitions the records based on the assigned task
ID before they go into the mapper with the other, smaller data set. A record
with assigned task ID 0 will be sent to the mapper task with subtask index
0.

This setup is not very nice, but should work.
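
A rough sketch of these four steps in the Scala DataSet API is below. The
input path, the record type (plain Strings), and the way each half of the
small data is loaded are assumptions for illustration only:

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.{Partitioner, RichMapFunction}
import org.apache.flink.configuration.Configuration
import scala.util.Random

val env = ExecutionEnvironment.getExecutionEnvironment

// The large input (path is hypothetical).
val largeInput: DataSet[String] = env.readTextFile("hdfs:///path/to/large/input")

// Step 3: emit each record twice, one copy tagged with a task ID in 0-4,
// the other with a task ID in 5-9.
val duplicated: DataSet[(Int, String)] = largeInput.flatMap { rec =>
  Seq((Random.nextInt(5), rec), (5 + Random.nextInt(5), rec))
}

// Step 4: partition on the assigned task ID so that ID i goes to subtask i.
val partitioned = duplicated.partitionCustom(new Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int = key % numPartitions
}, 0)

// Step 2: each of the 10 subtasks loads "its" half of the small data in open().
val result = partitioned.map(new RichMapFunction[(Int, String), String] {
  private var smallData: Seq[String] = _

  override def open(conf: Configuration): Unit = {
    val idx = getRuntimeContext.getIndexOfThisSubtask
    val fileName = if (idx < 5) "1st" else "2nd"
    smallData = scala.io.Source.fromFile("/home/data/" + fileName).getLines().toSeq
  }

  override def map(value: (Int, String)): String = {
    // combine the record with the locally loaded half of the small data here
    value._2
  }
}).setParallelism(10)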

Let me know, if you need more detail.

Cheers, Fabian

2015-09-16 21:44 GMT+02:00 Stefan Bunk :

> Hi Fabian,
>
> the local file problem would, however, not exist if I just copy both halves
> to all nodes, right?
>
> Let's say I have a file `1st` and a file `2nd`, which I copy to all nodes.
> Now with your approach from above, I do:
>
> // helper broadcast datasets to know on which half to operate
> val data1stHalf = env.fromElements("1st")
> val data2ndHalf = env.fromElements("2nd")
>
> val mapped1 = data.flatMap(yourMap)
>   .withBroadcastSet(data1stHalf, "fileName").setParallelism(5)
> val mapped2 = data.flatMap(yourMap)
>   .withBroadcastSet(data2ndHalf, "fileName").setParallelism(5)
> val result = mapped1.union(mapped2)
>
> Then, in my custom operator implementation of flatMap, I check the helper
> broadcast data to know which file to load:
> override def open(params: Configuration): Unit = {
>   val fileName = getRuntimeContext.getBroadcastVariable[String]("fileName").get(0)
>   // read the file from the local filesystem which I copied there earlier
>   this.data = loadFromFileIntoDatastructure("/home/data/" + fileName)
> }
> override def flatMap(document: Input, out: Collector[Output]): Unit = {
>   // do sth. with this.data and the input
>   out.collect(this.data.process(document))
> }
>
> I think this should work, or do you see another problem here?
>
> Which brings us to the other question:
> Both halves are so large that one half of the data fits in the
> user-remaining memory on a node, but not both halves. So my program would
> probably crash with an out-of-memory error if the scheduler trusts one node
> so much that it executes two flatMaps there ;-).
>
> You are saying that it is not guaranteed that all 10 nodes are used, but
> how likely is it that one node is given two flatMaps and another one is
> basically idling? I have no idea of the internals, but I guess there is
> some heuristic inside which decides how to distribute. In the normal setup
> where all 10 nodes are up, the connection is good, all nodes have the same
> resources available, and the input data is evenly distributed in HDFS, the
> default should be to distribute to all 10 nodes, right?
>
> I am not running in production, so for me it would be OK if this works
> out most of the time.
>
> Cheers
> Stefan
>
>
> On 15 September 2015 at 23:40, Fabian Hueske  wrote:
>
>> Hi Stefan,
>>
>> the problem is that you cannot directly influence the scheduling of tasks
>> to nodes to ensure that you can read the data that you put in the local
>> filesystems of your nodes. HDFS provides a shared file system, which means
>> that each node can read data from anywhere in the cluster.
>> I assumed the data is small enough to broadcast because you want to keep
>> it in memory.
>>
>> Regarding your question: it is not guaranteed that two different tasks,
>> each with parallelism 5, will be distributed to all 10 nodes (even if you
>> have only 10 processing slots).
>> What would work is to have one map task with parallelism 10 and a Flink
>> setup with 10 task managers on 10 machines with only one processing slot
>> per TM. However, you won't be able to replicate the data to both sets of
>> mappers, because you cannot know which task instance will be executed on
>> which machine (you cannot distinguish the tasks of the two task sets).
>>
>> As I said, reading from local file system in a cluster and forcing task
>> scheduling to specific nodes is quite tricky.
>> Cheers, Fabian
>>
>> 2015-09-15 23:15 GMT+02:00 Stefan Bunk :
>>
>>> Hi Fabian,
>>>
>>> I think we might have a misunderstanding here. I have already copied the
>>> first file to five nodes, and the second file to five other nodes, outside
>>> of Flink. In the open() method of the operator, I just read that file via
>>> normal Java means. I do not see why this is tricky or how HDFS would help
>>> here.
>>> Then, I have a normal Flink DataSet, which I want to run through the
>>> operator (using the previously read data in the flatMap implementation). As
>>> I run the