Hi Kaan, not entirely sure I understand your solution. I gathered that you create a dataset of TCP addresses and then use flatMap to fetch and output the data?
If so, then I think it's a good solution for batch processing (DataSet). It doesn't work in DataStream because it doesn't play well with checkpointing, but you seem to be interested only in batch. It's also not the first time, I have seen this pattern being used in batch. In general, if it works and is fast enough, it's always a good solution ;). No need to make it more complicated if you can solve it with simpler means and you can maintain it more easily. Best, Arvid On Wed, Apr 29, 2020 at 10:19 PM Kaan Sancak <kaans...@gmail.com> wrote: > Hi Arvid, > > I have implemented a zmq listener class without extending any class of > Flink. > The listener has a constructor with the port number. > > Then in the execution I have created a dateset of string which has the > port numbers. > Then I used a flattop function, which returned Tuple2<Long, Long>. I > opened the tcp sockets using localhost, so matching was quite easy. > > This seemed to work for me. What do you think about this implementation. > Do you see any drawback? > > Best > Kaan > > On Apr 29, 2020, at 7:40 AM, Arvid Heise <ar...@ververica.com> wrote: > > Hi Kaan, > > seems like ZMQ is using TCP and not HTTP. So I guess the easiest way would > be to use a ZMQ Java binding to access it [1]. > > But of course, it's much more complicated to write an iterator logic for > that. Not sure how ZMQ signals the end of such a graph? Maybe it closes the > socket and you can just read as much as possible. > > [1] https://zeromq.org/languages/java/ > > On Tue, Apr 28, 2020 at 10:56 PM Kaan Sancak <kaans...@gmail.com> wrote: > >> Hi Arvid, >> >> I am sorry for the late response. I had some deadlines, but I am back to >> work now. >> I have been trying to implement what we have talked. But I am having >> problems on the implementation. >> I have been using ZMQ to open sockets, because that is inheritenly >> supported in my graph generator. But, I couldn’t make the connection using >> input streams. >> Do you have any specific examples, where I can look and have a better >> idea on how to implement this? >> >> Best >> Kaan >> >> On Apr 24, 2020, at 4:37 AM, Arvid Heise <ar...@ververica.com> wrote: >> >> Hm, I confused sockets to work the other way around (so pulling like >> URLInputStream instead of listening). I'd go by providing the data on a >> port on each generator node. And then read from that in multiple sources. >> >> I think the best solution is to implement a custom InputFormat and then >> use readInput. You could implement a subclass of GenericInputFormat. You >> might even use IteratorInputFormat like this: >> >> private static class URLInputIterator implements Iterator<Tuple2<Long, >> Long>>, Serializable { >> private final URL url; >> private Iterator<Tuple2<Long, Long>> inner; >> >> private URLInputIterator(URL url) { >> this.url = url; >> } >> >> private void readObject(ObjectInputStream in) throws IOException, >> ClassNotFoundException { >> InputStream inputStream = url.openStream(); >> inner = new BufferedReader(new InputStreamReader(inputStream, >> StandardCharsets.UTF_8)) >> .lines() >> .map(line -> { >> String[] parts = line.split(";"); >> return new Tuple2<>(Long.parseLong(parts[0]), >> Long.parseLong(parts[1])); >> }) >> .iterator(); >> } >> >> @Override >> public boolean hasNext() { >> return inner.hasNext(); >> } >> >> @Override >> public Tuple2<Long, Long> next() { >> return inner.next(); >> } >> } >> >> env.fromCollection(new URLInputIterator(new URL("gen_node1", 9999)), >> Types.TUPLE(Types.LONG, Types.LONG)); >> >> >> >> >> On Fri, Apr 24, 2020 at 9:42 AM Kaan Sancak <kaans...@gmail.com> wrote: >> >>> Yes, that sounds like a great idea and actually that's what I am trying >>> to do. >>> >>> Then you configure your analysis job to read from each of these sockets >>> with a separate source and union them before feeding them to the actual job? >>> >>> >>> Before trying to open the sockets on the slave nodes, first I have >>> opened just one socket at master node, and I also run the generator with >>> one node as well. I was able to read the graph, and the run my algorithm >>> without any problems. This was a test run to see whatever I can do it. >>> >>> After, I have opened bunch of sockets on my generators, now I am trying >>> to configure Flink to read from those sockets. However, I am having >>> problems while trying to assign each task manager to a separate socket. I >>> am assuming my problems are related to network binding. In my configuration >>> file, jobmanager.rpc.address is set but I have not done >>> similar configurations for slave nodes. >>> >>> Am I on the right track, or is there an easier way to handle this? >>> >>> I think my point is how to do `read from each of these sockets with a >>> separate source` part. >>> >>> Thanks again >>> >>> Best >>> Kaan >>> >>> >>> >>> On Apr 24, 2020, at 3:11 AM, Arvid Heise <ar...@ververica.com> wrote: >>> >>> Hi Kaan, >>> >>> sorry, I haven't considered I/O as the bottleneck. I thought a bit more >>> about your issue and came to a rather simple solution. >>> >>> How about you open a socket on each of your generator nodes? Then you >>> configure your analysis job to read from each of these sockets with a >>> separate source and union them before feeding them to the actual job? >>> >>> You don't need to modify much on the analysis job and each source can be >>> independently read. WDYT? >>> >>> On Fri, Apr 24, 2020 at 8:46 AM Kaan Sancak <kaans...@gmail.com> wrote: >>> >>>> Thanks for the answer! Also thanks for raising some concerns about my >>>> question. >>>> >>>> Some of the graphs I have been using is larger than 1.5 tb, and I am >>>> currently an experiment stage of a project, and I am making modifications >>>> to my code and re-runing the experiments again. Currently, on some of the >>>> largest graphs I have been using, IO became an issue for me and keeps me >>>> wait for couple of hours. >>>> >>>> Moreover, I have a parallel/distributed graph generator, which I can >>>> run on the same set of nodes in my cluster. So what I wanted to do was, to >>>> run my Flink program and graph generator at the same time and feed the >>>> graph through generator, which should be faster than making IO from the >>>> disk. As you said, it is not essential for me to that, but I am trying to >>>> see what I am able to do using Flink and how can I solve such problems. I >>>> was also using another framework, and faced with the similar problem, I was >>>> able to reduce the graph read time from hours to minutes using this method. >>>> >>>> Do you really have more main memory than disk space? >>>> >>>> >>>> My issue is actually not storage related, I am trying to see how can I >>>> reduce the IO time. >>>> >>>> One trick came to my mind is, creating dummy dataset, and using a map >>>> function on the dataset, I can open-up bunch of sockets and listen the >>>> generator, and collect the generated data. I am trying to see how it will >>>> turn out. >>>> >>>> Alternatively, if graph generation is rather cheap, you could also try >>>> to incorporate it directly into the analysis job. >>>> >>>> >>>> I am not familiar with the analysis jobs. I will look into it. >>>> >>>> Again, this is actually not a problem, I am just trying to experiment >>>> with the framework and see what I can do. I am very new to Flink, so my >>>> methods might be wrong. Thanks for the help! >>>> >>>> Best >>>> Kaan >>>> >>>> >>>> On Apr 23, 2020, at 10:51 AM, Arvid Heise <ar...@ververica.com> wrote: >>>> >>>> Hi Kaan, >>>> >>>> afaik there is no (easy) way to switch from streaming back to batch API >>>> while retaining all data in memory (correct me if I misunderstood). >>>> >>>> However, from your description, I also have some severe understanding >>>> problems. Why can't you dump the data to some file? Do you really have more >>>> main memory than disk space? Or do you have no shared memory between your >>>> generating cluster and the flink cluster? >>>> >>>> It almost sounds as if the issue at heart is rather to find a good >>>> serialization format on how to store the edges. The 70 billion edges could >>>> be stored in an array of id pairs, which amount to ~560 GB uncompressed >>>> data if stored in Avro (or any other binary serialization format) when ids >>>> are longs. That's not much by today's standards and could also be easily >>>> offloaded to S3. >>>> >>>> Alternatively, if graph generation is rather cheap, you could also try >>>> to incorporate it directly into the analysis job. >>>> >>>> On Wed, Apr 22, 2020 at 2:58 AM Kaan Sancak <kaans...@gmail.com> wrote: >>>> >>>>> Hi, >>>>> >>>>> I have been running some experiments on large graph data, smallest >>>>> graph I have been using is around ~70 billion edges. I have a graph >>>>> generator, which generates the graph in parallel and feeds to the running >>>>> system. However, it takes a lot of time to read the edges, because even >>>>> though the graph generation process is parallel, in Flink I can only >>>>> listen >>>>> from master node (correct me if I am wrong). Another option is dumping the >>>>> generated data to a file and reading with readFromCsv, however this is not >>>>> feasible in terms of storage management. >>>>> >>>>> What I want to do is, invoking my graph generator, using ipc/tcp >>>>> protocols and reading the generated data from the sockets. Since the >>>>> graph >>>>> data is also generated parallel in each node, I want to make use of ipc, >>>>> and read the data in parallel at each node. I made some online digging >>>>> but >>>>> couldn’t find something similar using dataset api. I would be glad if you >>>>> have some similar use cases or examples. >>>>> >>>>> Is it possible to use streaming environment to create the data in >>>>> parallel and switch to dataset api? >>>>> >>>>> Thanks in advance! >>>>> >>>>> Best >>>>> Kaan >>>> >>>> >>>> >>>> -- >>>> Arvid Heise | Senior Java Developer >>>> <https://www.ververica.com/> >>>> >>>> Follow us @VervericaData >>>> -- >>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>>> Conference >>>> Stream Processing | Event Driven | Real Time >>>> -- >>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>>> -- >>>> Ververica GmbH >>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B >>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji >>>> (Toni) Cheng >>>> >>>> >>>> >>> >>> -- >>> Arvid Heise | Senior Java Developer >>> <https://www.ververica.com/> >>> >>> Follow us @VervericaData >>> -- >>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>> Conference >>> Stream Processing | Event Driven | Real Time >>> -- >>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>> -- >>> Ververica GmbH >>> Registered at Amtsgericht Charlottenburg: HRB 158244 B >>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji >>> (Toni) Cheng >>> >>> >>> >> >> -- >> Arvid Heise | Senior Java Developer >> <https://www.ververica.com/> >> >> Follow us @VervericaData >> -- >> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >> Conference >> Stream Processing | Event Driven | Real Time >> -- >> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >> -- >> Ververica GmbH >> Registered at Amtsgericht Charlottenburg: HRB 158244 B >> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji >> (Toni) Cheng >> >> >> > > -- > Arvid Heise | Senior Java Developer > <https://www.ververica.com/> > > Follow us @VervericaData > -- > Join Flink Forward <https://flink-forward.org/> - The Apache Flink > Conference > Stream Processing | Event Driven | Real Time > -- > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > -- > Ververica GmbH > Registered at Amtsgericht Charlottenburg: HRB 158244 B > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji > (Toni) Cheng > > > -- Arvid Heise | Senior Java Developer <https://www.ververica.com/> Follow us @VervericaData -- Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng