Hi Ali,

sorry for the delayed response.

Regarding your question:

This is a bit tricky but doable.
I assume you have CSV records with IDs 1 - 1000 and you run your job
with a parallelism of 2.

The Rich* variants of the user-defined functions give you access to a
runtime context. This context tells you the subtask ID (index) of the current
task and the total number of parallel subtasks.
With that information, you can let subtaskID=0 handle the IDs from 1 -
499 and subtaskID=1 handle 500 - 1000.
Next, you need to send the elements from your stream to the right
subtask. For that, you can use a custom partitioner on the data stream.
The partitioner decides for each element which subtask it goes to. You
can determine the subtask ID based on the ID in your data.

This is a small snippet with the required code:

DataStream<SimpleEntity> stream = dataStream.partitionCustom(new Partitioner<Integer>() {
   @Override
   public int partition(Integer id, int numPartitions) {
      // The partitioner receives the key selected by "id", not the whole record
      // (assumption: SimpleEntity.id is an Integer field).
      return 0; // return the appropriate subtask index here
   }
}, "id");
stream.flatMap(new RichFlatMapFunction<SimpleEntity, String>() {
   @Override
   public void flatMap(SimpleEntity simpleEntity, Collector<String> collector) throws Exception {
      int howMany = getRuntimeContext().getNumberOfParallelSubtasks(); // total parallelism
      int myId = getRuntimeContext().getIndexOfThisSubtask();          // 0-based index of this subtask
      // ...
   }
});
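
In case it helps, here is a minimal sketch of what the body of partition()
could compute. It is plain Java without any Flink dependency. The class name
RangePartitionSketch, the method subtaskFor, and the minId/maxId bounds are
hypothetical, and it assumes you know the ID range (1 - 1000 here) up front.
Note that an even split puts the boundary at 500/501 rather than 499/500.

```java
public class RangePartitionSketch {

    // Maps an ID in [minId, maxId] to a subtask index in [0, numPartitions),
    // giving each subtask a contiguous, roughly equal slice of the ID range.
    public static int subtaskFor(int id, int minId, int maxId, int numPartitions) {
        if (id < minId || id > maxId) {
            throw new IllegalArgumentException("ID out of range: " + id);
        }
        long span = (long) maxId - minId + 1; // number of distinct IDs
        long offset = (long) id - minId;      // 0-based position of this ID
        return (int) (offset * numPartitions / span);
    }

    public static void main(String[] args) {
        // IDs 1 - 1000 with a parallelism of 2, as assumed in this mail
        System.out.println(subtaskFor(1, 1, 1000, 2));
        System.out.println(subtaskFor(500, 1, 1000, 2));
        System.out.println(subtaskFor(501, 1, 1000, 2));
        System.out.println(subtaskFor(1000, 1, 1000, 2));
    }
}
```

Inside the partitioner you would call something like this with the key and
the number of partitions that Flink passes in. In the flatMap you can run the
same calculation against getIndexOfThisSubtask() to check which slice of the
CSV the current subtask is responsible for.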


Let me know if you need more help,

Regards,
Robert

On Mon, Nov 16, 2015 at 5:44 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:

> Hi Robert,
>
> Thanks for the help! I’ve managed to implement my use case using your
> suggested approach of combining the streams.
>
> Just a follow up on 2b) below, I’m not clear on this statement "partition
> (split) the data stream so that the right protocol packets end up at the
> right machine”. How do I know which machine the data is ending up at? My
> understanding is that the Flink program is agnostic of the cluster nodes.
>
> Maybe it would help if I explained this use case:
> 1. Load a CSV file and split it equally, using the ID in the CSV record,
> across the Flink cluster to be stored in memory (operator’s memory
> maybe?). This is basically an initialization step.
> 2. Once 1) is done, read events from a socket (for now) and use the ID in
> the event to add attributes from the matching CSV record to the event.
> Store the updated events in a file.
>
> Based on those two requirements, what can be accomplished using Flink and
> what can’t be? Is the stuff that can’t be done in Flink’s roadmap?
>
> Thanks,
> Ali
>
>
> On 2015-11-05, 5:29 PM, "Robert Metzger" <rmetz...@apache.org> wrote:
>
> >Hi Ali,
> >
> >1. You can connect two streams and then use the co-map operator to consume
> >data from both streams. I'm not sure how much data arrives from one or the
> >other stream, but maybe you can store (update) the data in memory.
> >Read more here
> >
> >https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#datastream-abstraction
> >
> >2 a) No, I think all the taskmanager nodes are listening to data. For
> >making this highly available, I would recommend to let the system which is
> >producing the data write it to Apache Kafka. Then, consume the data from
> >Kafka using Flink.
> >This way you get very good high availability and throughput and you don't
> >have to worry about the sockets.
> >
> >2 b) Sure, you can implement the splitting yourself (each mapper reads N
> >lines of the file) and then partition (split) the data stream so that the
> >right protocol packets end up at the right machine.
> >However, if the entire CSV file fits into the memory of one machine,
> >it's probably faster to not split the stream and use each machine to join
> >the data locally.
> >
> >It's really no problem that you're asking questions, that's what the
> >mailing list is made for.
> >I'm looking forward to the next set of questions ;)
> >
> >Regards,
> >Robert
> >
> >
> >
> >On Thu, Nov 5, 2015 at 9:56 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >
> >> Hi Robert,
> >>
> >> I tried the approach you suggested and it works nicely. Thanks!
> >>
> >> I have a few more questions if you don’t mind:
> >>
> >> 1. Is there a way to retrieve in one stream data that's stored in
> >>another
> >> stream? I have a location stream that I can use to store the latest
> >> subscriber location. I have another stream that needs access to the
> >>latest
> >> subscriber location processed by the location stream. I read a bit on
> >> broadcast variables but they’re only available for DataSets, not
> >> DataStreams. Did I miss a way in Flink to do this?
> >>
> >> 2. We are planning to test this on a Flink cluster of 3 nodes (1 master
> >> and 2 slaves).
> >>
> >>    a. If I use a socket stream, does each node listen for data on its
> >> socket or is it only the job manager node? I assume it’s the latter. This
> >> is important because I have to figure out how to make the system highly
> >> available.
> >>    b. Is there a way to split the afore-mentioned CSV file across the
> >> three nodes in the cluster?
> >>
> >> Sorry for bombarding you with questions.
> >>
> >> Thanks,
> >> Ali
> >>
> >>
> >> On 2015-11-05, 10:47 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
> >>
> >> >Hi Ali,
> >> >
> >> >great, the start-local-streaming.sh script sounds right.
> >> >
> >> >I can explain why your first approach didn't work:
> >> >
> >> >You were trying to send the CSV files from the Flink client to the
> >>cluster
> >> >using our RPC system (Akka). When you submit a job to Flink, we
> >>serialize
> >> >all the objects the user created (mappers, sources, ...) and send it to
> >> >the
> >> >cluster.
> >> >There is a method StreamExecutionEnvironment.fromElements(..) which
> >>allows
> >> >users to serialize a few objects along with the job submission. But the
> >> >amount of data you can transfer like this is limited by the Akka frame
> >> >size. In our case I think the default is 10 megabytes.
> >> >After that, Akka will probably just drop or reject the deployment
> >>message.
> >> >
> >> >I'm pretty sure the approach I've suggested will resolve the issue.
> >> >
> >> >Please let me know if you need further assistance.
> >> >
> >> >Regards,
> >> >Robert
> >> >
> >> >
> >> >
> >> >On Thu, Nov 5, 2015 at 3:39 PM, Kashmar, Ali <ali.kash...@emc.com>
> >>wrote:
> >> >
> >> >> I did not load the CSV file using the approach you suggested. I was
> >> >> loading it outside the operators (at the beginning of the main
> >>method of
> >> >> my class), since the file will be needed by multiple operators for
> >>sure.
> >> >> When the file was small, I saw the job registered and started, but
> >>when
> >> >>I
> >> >> used a big CSV file, the job never got registered with the task
> >>manager
> >> >>(I
> >> >> tried the ‘list' command and got nothing).
> >> >>
> >> >> Here’s what I saw with the small(ish) file:
> >> >>
> >> >> # flink run analytics-flink.jar 19001 minisubs.csv output.csv
> >> >> loaded 200000 subscribers from csv file
> >> >> 11/02/2015 16:36:59 Job execution switched to status RUNNING.
> >> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map ->
> >>Stream
> >> >> Sink(1/1) switched to SCHEDULED
> >> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map ->
> >>Stream
> >> >> Sink(1/1) switched to DEPLOYING
> >> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map ->
> >>Stream
> >> >> Sink(1/1) switched to RUNNING
> >> >>
> >> >>
> >> >> And here’s what I saw with the big file:
> >> >>
> >> >> # flink run analytics-flink.jar 19001 subs.csv output.csv
> >> >> loaded 1173547 subscribers from csv file
> >> >>
> >> >>
> >> >> I’m already using the streaming mode. I’m running a single Flink node
> >> >> right now on Centos 7 using the ‘start-local-streaming.sh’ script.
> >> >>
> >> >> Thanks,
> >> >> Ali
> >> >>
> >> >> On 2015-11-05, 10:22 AM, "Robert Metzger" <rmetz...@apache.org>
> >>wrote:
> >> >>
> >> >> >Okay.
> >> >> >
> >> >> >you should be able to implement it as you described initially. I
> >>would
> >> >>do
> >> >> >the transformation in a map() operator of Flink. The RichMapFunction
> >> >> >provides you with an open() method which is called before the first
> >> >>record
> >> >> >arrives.
> >> >> >In the open() method, I would read the csv file(s) from HDFS or
> >>another
> >> >> >file system accessible by all nodes.
> >> >> >
> >> >> >Then, you can access the data from the files in the map operator.
> >> >> >
> >> >> >In order to utilize the memory best, I would recommend to start
> >>Flink
> >> >>in
> >> >> >the "streaming" mode. (-st argument on YARN). With that enabled, we
> >> >> >provide
> >> >> >more memory to streaming operators.
> >> >> >Also, I would only expose one processing slot per TaskManager, this
> >> >>way we
> >> >> >ensure that the files are only read once per TaskManager. (make sure
> >> >>you
> >> >> >have only one TaskManager per machine).
> >> >> >
> >> >> >Why did your previous approach fail? Do you still have the error
> >> >>message?
> >> >> >
> >> >> >Regards,
> >> >> >Robert
> >> >> >
> >> >> >On Thu, Nov 5, 2015 at 3:02 PM, Kashmar, Ali <ali.kash...@emc.com>
> >> >>wrote:
> >> >> >
> >> >> >> Hi Robert,
> >> >> >>
> >> >> >> The CSV file (or files as there will definitely be more than one)
> >> >>can be
> >> >> >> large (let’s say 1 GB). Memory is not an issue though. Each node
> >>has
> >> >>at
> >> >> >> least 64 GB RAM mounted. The CSV files should easily fit in the
> >> >>memory
> >> >> >>of
> >> >> >> each node.
> >> >> >>
> >> >> >> Regards,
> >> >> >> Ali
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> On 2015-11-05, 6:30 AM, "Robert Metzger" <rmetz...@apache.org>
> >> wrote:
> >> >> >>
> >> >> >> >Hi Ali,
> >> >> >> >
> >> >> >> >I'm excited to hear that EMC is looking into Apache Flink. I
> >>think
> >> >>the
> >> >> >> >solution to this problem depends on one question: What is the
> >>size
> >> >>of
> >> >> >>the
> >> >> >> >data in the CSV file compared to the memory you have available in
> >> >>the
> >> >> >> >cluster?
> >> >> >> >Would the mapping table from the file fit into the memory of all
> >> >>nodes
> >> >> >> >running Flink?
> >> >> >> >
> >> >> >> >Regards,
> >> >> >> >Robert
> >> >> >> >
> >> >> >> >PS: Did you subscribe to the mailing list? I've CCed you in case
> >> >>you're
> >> >> >> >not
> >> >> >> >subscribed yet
> >> >> >> >
> >> >> >> >On Wed, Nov 4, 2015 at 4:54 PM, Kashmar, Ali
> >><ali.kash...@emc.com>
> >> >> >>wrote:
> >> >> >> >
> >> >> >> >> Hi there,
> >> >> >> >>
> >> >> >> >> I’m trying to design and implement a use case in Flink where
> >>I’m
> >> >> >> >>receiving
> >> >> >> >> protocol packets over a socket. Each packet has the subscriber
> >> >>IMSI
> >> >> >>in
> >> >> >> >>it
> >> >> >> >> and a bunch of more data. At the same time, I have a csv file
> >> >>with a
> >> >> >> >> mapping from IMSI -> subscriber group. I need to inject the
> >>group
> >> >> >>into
> >> >> >> >> packet and then send it to the sink.
> >> >> >> >>
> >> >> >> >> I’ve tried loading the CSV into a memory map and then accessing
> >> >>the
> >> >> >>map
> >> >> >> >> from within the Flink operators but that only works when the
> >>CSV
> >> >>is
> >> >> >>very
> >> >> >> >> small (a few hundred subscribers). I’ve tried creating another
> >> >>stream
> >> >> >> >>for
> >> >> >> >> the CSV and connecting the streams but that doesn’t yield
> >>anything
> >> >> >>as I
> >> >> >> >> can’t have access to objects from both streams at the same
> >>time.
> >> >> >> >>
> >> >> >> >> How would you guys approach this?
> >> >> >> >>
> >> >> >> >> Thanks,
> >> >> >> >> Ali
> >> >> >> >>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>
>
>
