Hi Ali, sorry for the delayed response.
Regarding your question: This is a bit tricky but doable. I assume you have CSV records with IDs 1-1000 and you run your job with a parallelism of 2.
The Rich* variants of the user-defined functions allow you to access a runtime context. This context gives you the subtask ID of your current task and the total number of parallel subtasks. With that information, you can let subtask ID 0 handle the IDs from 1-500 and subtask ID 1 handle 501-1000.
Next, you need to send the elements from your stream to the right subtask. For that, you can use a custom partitioner on the data stream. The partitioner decides, for each element, which subtask it goes to. You can determine the subtask index based on the ID in your data.
Here is a small snippet with the required code:

DataStream<SimpleEntity> stream = dataStream.partitionCustom(new Partitioner<SimpleEntity>() {
    @Override
    public int partition(SimpleEntity simpleEntity, int i) {
        return 0; // return the target subtask index here, derived from the entity's ID
    }
}, "id");

stream.flatMap(new RichFlatMapFunction<SimpleEntity, String>() {
    @Override
    public void flatMap(SimpleEntity simpleEntity, Collector<String> collector) throws Exception {
        int howMany = getRuntimeContext().getNumberOfParallelSubtasks();
        int myId = getRuntimeContext().getIndexOfThisSubtask();
        // ...
    }
});
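If it helps, here is a slightly fuller, untested sketch of the same idea. SimpleEntity, getId() and MAX_ID are just placeholders I made up for the example; the only real assumption is that your IDs are integers in a known range:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class IdRangeRoutingSketch {

    private static final int MAX_ID = 1000; // assumed highest ID in the CSV

    // Assumed minimal record type; replace with your actual class.
    public static class SimpleEntity {
        public int id;
        public int getId() { return id; }
    }

    // Sends each ID to the subtask that owns its contiguous ID range,
    // e.g. with parallelism 2: IDs 1-500 -> subtask 0, IDs 501-1000 -> subtask 1.
    public static class IdRangePartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer id, int numPartitions) {
            int rangeSize = (MAX_ID + numPartitions - 1) / numPartitions; // ceiling division
            return Math.min((id - 1) / rangeSize, numPartitions - 1);
        }
    }

    public static DataStream<String> route(DataStream<SimpleEntity> dataStream) {
        return dataStream
                .partitionCustom(new IdRangePartitioner(),
                        new KeySelector<SimpleEntity, Integer>() {
                            @Override
                            public Integer getKey(SimpleEntity entity) {
                                return entity.getId();
                            }
                        })
                .flatMap(new RichFlatMapFunction<SimpleEntity, String>() {
                    @Override
                    public void flatMap(SimpleEntity entity, Collector<String> out) throws Exception {
                        int howMany = getRuntimeContext().getNumberOfParallelSubtasks();
                        int myId = getRuntimeContext().getIndexOfThisSubtask();
                        // Each subtask only ever sees IDs from its own range, so it only
                        // needs to keep the matching slice of the CSV in memory.
                        out.collect("subtask " + myId + "/" + howMany + " handled id " + entity.getId());
                    }
                });
    }
}

The ceiling division simply cuts the ID space into equal, contiguous ranges, so with a parallelism of 2 you get exactly the 1-500 / 501-1000 split mentioned above.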
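For step 1 of your use case (getting the CSV mapping into memory), here is also a rough, untested sketch of the RichMapFunction open() approach I described further down in this thread. The file path, the imsi,group CSV layout and the Event type are assumptions for illustration only:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichmentSketch {

    // Assumed minimal packet type; replace with your actual class.
    public static class Event {
        public String imsi;
        public String group;
    }

    // Looks up the subscriber group for each packet from an in-memory map
    // that is built once per parallel instance in open().
    public static class GroupEnricher extends RichMapFunction<Event, Event> {

        private final String csvPath; // assumed to be readable from every TaskManager
        private transient Map<String, String> imsiToGroup;

        public GroupEnricher(String csvPath) {
            this.csvPath = csvPath;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Called before the first record arrives, so the lookup table is
            // ready when map() starts processing packets.
            imsiToGroup = new HashMap<>();
            try (BufferedReader reader = new BufferedReader(new FileReader(csvPath))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] fields = line.split(","); // assumed layout: imsi,group
                    imsiToGroup.put(fields[0], fields[1]);
                }
            }
        }

        @Override
        public Event map(Event packet) throws Exception {
            packet.group = imsiToGroup.get(packet.imsi);
            return packet;
        }
    }
}

open() runs once per parallel instance before the first record arrives, so every TaskManager builds its own lookup map and map() only does an in-memory lookup per packet. If you combine it with the partitioning above, each instance could even load just the slice of the CSV that matches its own ID range.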
Let me know if you need more help.

Regards,
Robert

On Mon, Nov 16, 2015 at 5:44 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:

> Hi Robert,
>
> Thanks for the help! I've managed to implement my use case using your suggested approach of combining the streams.
>
> Just a follow up on 2b) below, I'm not clear on this statement "partition (split) the data stream so that the right protocol packets end up at the right machine". How do I know which machine the data is ending up at? My understanding is that the Flink program is agnostic of the cluster nodes.
>
> Maybe it would help if I explained this use case:
> 1. Load a CSV file and split it equally, using the ID in the CSV record, across the Flink cluster to be stored in memory (operator's memory maybe?). This is basically an initialization step.
> 2. Once 1) is done, read events from a socket (for now) and use the ID in the event to add attributes from the matching CSV record to the event. Store the updated events in a file.
>
> Based on those two requirements, what can be accomplished using Flink and what can't be? Is the stuff that can't be done in Flink's roadmap?
>
> Thanks,
> Ali
>
>
> On 2015-11-05, 5:29 PM, "Robert Metzger" <rmetz...@apache.org> wrote:
>
> >Hi Ali,
> >
> >1. You can connect two streams and then use the co-map operator to consume data from both streams. I'm not sure how much data arrives from one or the other stream, but maybe you can store (update) the data in memory.
> >Read more here:
> >https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#datastream-abstraction
> >
> >2 a) No, I think all the taskmanager nodes are listening for data. For making this highly available, I would recommend to let the system which is producing the data write it to Apache Kafka. Then, consume the data from Kafka using Flink.
> >This way you get very good high availability and throughput and you don't have to worry about the sockets.
> >
> >2 b) Sure, you can implement the splitting yourself (each mapper reads N lines of the file) and then partition (split) the data stream so that the right protocol packets end up at the right machine.
> >However, if the entire CSV file fits into the memory of one machine, it's probably faster to not split the stream and use each machine to join the data locally.
> >
> >It's really no problem that you're asking questions, that's what the mailing list is made for.
> >I'm looking forward to the next set of questions ;)
> >
> >Regards,
> >Robert
> >
> >
> >
> >On Thu, Nov 5, 2015 at 9:56 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >
> >> Hi Robert,
> >>
> >> I tried the approach you suggested and it works nicely. Thanks!
> >>
> >> I have a few more questions if you don't mind:
> >>
> >> 1. Is there a way to retrieve in one stream data that's stored in another stream? I have a location stream that I can use to store the latest subscriber location. I have another stream that needs access to the latest subscriber location processed by the location stream. I read a bit on broadcast variables but they're only available for DataSets, not DataStreams. Did I miss a way in Flink to do this?
> >>
> >> 2. We are planning to test this on a Flink cluster of 3 nodes (1 master and 2 slaves).
> >>
> >> a. If I use a socket stream, does each node listen for data on its socket or is it only the job manager node? I assume it's the latter. This is important because I have to figure out how to make the system highly available.
> >> b. Is there a way to split the aforementioned CSV file across the three nodes in the cluster?
> >>
> >> Sorry for bombarding you with questions.
> >>
> >> Thanks,
> >> Ali
> >>
> >>
> >> On 2015-11-05, 10:47 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
> >>
> >> >Hi Ali,
> >> >
> >> >great, the start-local-streaming.sh script sounds right.
> >> >
> >> >I can explain why your first approach didn't work:
> >> >
> >> >You were trying to send the CSV files from the Flink client to the cluster using our RPC system (Akka). When you submit a job to Flink, we serialize all the objects the user created (mappers, sources, ...) and send it to the cluster.
> >> >There is a method StreamExecutionEnvironment.fromElements(..) which allows users to serialize a few objects along with the job submission. But the amount of data you can transfer like this is limited by the Akka frame size. In our case I think the default is 10 megabytes. After that, Akka will probably just drop or reject the deployment message.
> >> >
> >> >I'm pretty sure the approach I've suggested will resolve the issue.
> >> >
> >> >Please let me know if you need further assistance.
> >> >
> >> >Regards,
> >> >Robert
> >> >
> >> >
> >> >
> >> >On Thu, Nov 5, 2015 at 3:39 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >> >
> >> >> I did not load the CSV file using the approach you suggested. I was loading it outside the operators (at the beginning of the main method of my class), since the file will be needed by multiple operators for sure. When the file was small, I saw the job registered and started, but when I used a big CSV file, the job never got registered with the task manager (I tried the 'list' command and got nothing).
> >> >>
> >> >> Here's what I saw with the small(ish) file:
> >> >>
> >> >> # flink run analytics-flink.jar 19001 minisubs.csv output.csv
> >> >> loaded 200000 subscribers from csv file
> >> >> 11/02/2015 16:36:59 Job execution switched to status RUNNING.
> >> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to SCHEDULED
> >> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to DEPLOYING
> >> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream Sink(1/1) switched to RUNNING
> >> >>
> >> >> And here's what I saw with the big file:
> >> >>
> >> >> # flink run analytics-flink.jar 19001 subs.csv output.csv
> >> >> loaded 1173547 subscribers from csv file
> >> >>
> >> >> I'm already using the streaming mode. I'm running a single Flink node right now on CentOS 7 using the 'start-local-streaming.sh' script.
> >> >>
> >> >> Thanks,
> >> >> Ali
> >> >>
> >> >> On 2015-11-05, 10:22 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
> >> >>
> >> >> >Okay.
> >> >> >
> >> >> >you should be able to implement it as you described initially. I would do the transformation in a map() operator of Flink. The RichMapFunction provides you with an open() method which is called before the first record arrives.
> >> >> >In the open() method, I would read the csv file(s) from HDFS or another file system accessible by all nodes.
> >> >> >
> >> >> >Then, you can access the data from the files in the map operator.
> >> >> >
> >> >> >In order to utilize the memory best, I would recommend to start Flink in the "streaming" mode (-st argument on YARN). With that enabled, we provide more memory to streaming operators.
> >> >> >Also, I would only expose one processing slot per TaskManager, this way we ensure that the files are only read once per TaskManager. (Make sure you have only one TaskManager per machine.)
> >> >> >
> >> >> >Why did your previous approach fail? Do you still have the error message?
> >> >> >
> >> >> >Regards,
> >> >> >Robert
> >> >> >
> >> >> >On Thu, Nov 5, 2015 at 3:02 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >> >> >
> >> >> >> Hi Robert,
> >> >> >>
> >> >> >> The CSV file (or files as there will definitely be more than one) can be large (let's say 1 GB). Memory is not an issue though. Each node has at least 64 GB RAM mounted. The CSV files should easily fit in the memory of each node.
> >> >> >>
> >> >> >> Regards,
> >> >> >> Ali
> >> >> >>
> >> >> >>
> >> >> >> On 2015-11-05, 6:30 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
> >> >> >>
> >> >> >> >Hi Ali,
> >> >> >> >
> >> >> >> >I'm excited to hear that EMC is looking into Apache Flink. I think the solution to this problem depends on one question: What is the size of the data in the CSV file compared to the memory you have available in the cluster?
> >> >> >> >Would the mapping table from the file fit into the memory of all nodes running Flink?
> >> >> >> >
> >> >> >> >Regards,
> >> >> >> >Robert
> >> >> >> >
> >> >> >> >PS: Did you subscribe to the mailing list?
> >> >> >> >I've CCed you in case you're not subscribed yet.
> >> >> >> >
> >> >> >> >On Wed, Nov 4, 2015 at 4:54 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >> >> >> >
> >> >> >> >> Hi there,
> >> >> >> >>
> >> >> >> >> I'm trying to design and implement a use case in Flink where I'm receiving protocol packets over a socket. Each packet has the subscriber IMSI in it and a bunch of other data. At the same time, I have a CSV file with a mapping from IMSI -> subscriber group. I need to inject the group into the packet and then send it to the sink.
> >> >> >> >>
> >> >> >> >> I've tried loading the CSV into a memory map and then accessing the map from within the Flink operators, but that only works when the CSV is very small (a few hundred subscribers). I've tried creating another stream for the CSV and connecting the streams, but that doesn't yield anything as I can't have access to objects from both streams at the same time.
> >> >> >> >>
> >> >> >> >> How would you guys approach this?
> >> >> >> >>
> >> >> >> >> Thanks,
> >> >> >> >> Ali