You can give it a Map<K,V> as well, but that will not be substantially faster, because the data streamer routes by affinity key: it has to process each record individually and route it to the buffer for the node that owns that key. The data streamer collects 8192 records on the client node for each server node before sending. If you write a custom StreamReceiver, what you get on the receiving side is a batch of up to 8192 entries. Note that the batch contains the records whose affinity keys map to that node; it is not broken up by partition.
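For reference, a skeleton of such a receiver might look like the following (a minimal sketch; the class name and key/value types are illustrative, and note that the receive() callback is declared to take a Collection of Map.Entry objects holding the buffered batch):

```java
import java.util.Collection;
import java.util.Map;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.stream.StreamReceiver;

// Keep the receiver in its own small class so that its serialized form
// stays tiny -- see the serialization note below.
public class AssociatedPartiesReceiver implements StreamReceiver<String, Object> {
    @Override
    public void receive(IgniteCache<String, Object> cache,
                        Collection<Map.Entry<String, Object>> batch) throws IgniteException {
        // 'batch' holds up to perNodeBufferSize (8192 by default) entries
        // whose affinity keys all map to this node, in no particular
        // partition order.
        for (Map.Entry<String, Object> e : batch)
            cache.put(e.getKey(), e.getValue());
    }
}
```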
Considerations:
- Use a separate DataStreamer per client thread; otherwise flushing gets complicated.
- If you use a custom StreamReceiver, the receiver is serialized once when the stream is set up, but the serialized form is sent with every buffer. Put it in its own class and limit the amount of data that gets serialized with it. I once had an IgniteCache reference in the class, which caused the instance to serialize to hundreds of KB that were sent each time.
- The default deployment mode for peer class loading is SHARED. If you change the StreamReceiver code, you have to shut down all of the "master" (i.e., client) nodes to get it to replace the class, apparently even if the other clients did not load that specific class.

-DH

On Wed, Mar 7, 2018 at 8:48 AM, Naveen <[email protected]> wrote:
> Hi,
>
> We are using Ignite 2.3.
> We have a requirement to migrate data from an existing in-memory solution
> to Ignite; it is a one-time migration.
>
> We have the data available in CSV files with a delimiter. We have split the
> source files into multiple chunks, and each thread processes one file.
> Here is the code, which reads the file line by line and calls
> dataStreamer.addData for each record:
>
> while (sc.hasNextLine()) {
>     ct++;
>     String line = sc.nextLine();
>     String[] tokens = line.split(Constants.Delimter, -1);
>
>     // PARTY_ID~ASSOCIATED_PARTY_ID~UPDATEDDATETIME~UPDATEDBY
>     ASSOCIATED_PARTIES aASSOCIATED_PARTIES = new ASSOCIATED_PARTIES();
>     aASSOCIATED_PARTIES.setPARTY_ID(tokens[0]);
>     aASSOCIATED_PARTIES.setASSOCIATED_PARTY_ID(tokens[1]);
>     aASSOCIATED_PARTIES.setUPDATEDBY(tokens[3]);
>     aASSOCIATED_PARTIES.setUPDATEDDATETIME(new Timestamp(System.currentTimeMillis()));
>
>     streamer.perNodeBufferSize(Constants.perNodeBufferSize);
>     streamer.perNodeParallelOperations(Constants.perNodeParallelOperations);
>
>     streamer.addData(tokens[0], aASSOCIATED_PARTIES);
>
>     if (ct > 0 && ct % 10000 == 0)
>         System.out.println("Done: " + ct);
> }
>
> My question is: how do I call streamer.addData(tokens[0],
> aASSOCIATED_PARTIES) for a batch instead of calling it for every record?
> The way I have it, it is called for every line read from the file.
> It should be called, say, once every 1000 records: accumulate 1000 records
> and then add them to the cache as a batch.
> How could we do this, batch-wise or time-wise (e.g., write every minute)?
>
> Thanks,
> Naveen
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
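On the batching question above: streamer.addData() already accumulates records into per-node buffers (perNodeBufferSize entries each) before sending anything over the wire, so batching by hand rarely speeds things up, and for time-based flushing there is IgniteDataStreamer.autoFlushFrequency(long millis). If you still want to hand the streamer a Map every N records via the addData(Map) overload, a small accumulator along these lines works (this BatchBuffer helper is hypothetical, not part of Ignite; the flush action is pluggable, so the sketch is plain Java):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Accumulates entries and hands them off in fixed-size batches. */
public class BatchBuffer<K, V> {
    private final int batchSize;
    private final Consumer<Map<K, V>> flusher;
    private Map<K, V> buf = new LinkedHashMap<>();

    public BatchBuffer(int batchSize, Consumer<Map<K, V>> flusher) {
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    /** Adds one entry; flushes automatically when the batch fills up. */
    public void add(K key, V val) {
        buf.put(key, val);
        if (buf.size() >= batchSize)
            flush();
    }

    /** Hands the current (possibly partial) batch to the flush action. */
    public void flush() {
        if (!buf.isEmpty()) {
            flusher.accept(buf);
            buf = new LinkedHashMap<>();
        }
    }
}
```

With a real streamer the flush action would be `m -> streamer.addData(m)`, and you would call buffer.flush() once more after the file is exhausted, followed by streamer.flush().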
