Hi, so I'm using it in a pretty straightforward way, at least I think...
I'm loading 35 million lines from a CSV file into an SQL table. I decided to use the data streamer as I figured it would still be a lot faster than batching SQL INSERTs. I tried with backups=0 and backups=1 (I'd prefer to have backups on):

1. With 0 backups: 6 minutes to load.
2. With 1 backup: 15 minutes to load.

In both cases I see the same behaviour: one machine seems to be taking the brunt of the work. I'm reading the CSV file line by line and calling streamer.addData().

The table definition is as follows:

    CREATE TABLE PUBLIC.MY_TABLE (
        COLUMN_1 VARCHAR(32) NOT NULL,
        COLUMN_2 VARCHAR(64) NOT NULL,
        CONSTRAINT PHONE_CARRIER_IDS_PK PRIMARY KEY (COLUMN_1)
    ) WITH "template=parallelTpl, backups=0, key_type=String, value_type=MyObject";

    CREATE INDEX MY_TABLE_COLUMN_2_IDX ON PUBLIC.MY_TABLE (COLUMN_2);

And the loader code (configIgnite is my own helper):

    // Imports used:
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteBinary;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.IgniteDataStreamer;
    import org.apache.ignite.binary.BinaryObject;
    import org.apache.ignite.binary.BinaryObjectBuilder;
    import org.apache.ignite.cache.CachePeekMode;

    String fileName = "my_file";
    final String cacheNameDest = "MY_TABLE";

    try (Ignite igniteDest = configIgnite(
             Arrays.asList("...:47500..47509", "...:47500..47509", "...:47500..47509"),
             "ignite-dest");
         IgniteCache<BinaryObject, BinaryObject> cacheDest =
             igniteDest.getOrCreateCache(cacheNameDest).withKeepBinary();
         IgniteDataStreamer<BinaryObject, BinaryObject> streamer =
             igniteDest.dataStreamer(cacheNameDest)) {

        System.out.println("Ignite started.");

        long start = System.currentTimeMillis();
        System.out.println("Cache size: " + cacheDest.size(CachePeekMode.PRIMARY));

        IgniteBinary binaryDest = igniteDest.binary();

        try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
            int count = 0;
            String line;

            while ((line = br.readLine()) != null) {
                // Each line is pipe-delimited; the key comes from field 1, the value from field 3.
                String[] parts = line.split("\\|");

                BinaryObjectBuilder keyBuilder = binaryDest.builder("String");
                keyBuilder.setField("COLUMN_1", parts[1], String.class);

                BinaryObjectBuilder valueBuilder = binaryDest.builder("PhoneCarrier");
                valueBuilder.setField("COLUMN_2", parts[3], String.class);

                streamer.addData(keyBuilder.build(), valueBuilder.build());

                count++;
                if (count % 10000 == 0)
                    System.out.println(count);
            }

            streamer.flush();

            long end = System.currentTimeMillis();
            System.out.println("Ms: " + (end - start));
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }
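In case it's relevant, these are the streamer knobs I know of that control how batches are built and shipped to the server nodes. A minimal sketch with illustrative values (not recommendations, I haven't tuned any of these yet):

    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteDataStreamer;
    import org.apache.ignite.binary.BinaryObject;

    static IgniteDataStreamer<BinaryObject, BinaryObject> openTunedStreamer(Ignite ignite, String cacheName) {
        IgniteDataStreamer<BinaryObject, BinaryObject> streamer = ignite.dataStreamer(cacheName);

        streamer.allowOverwrite(false);        // plain initial load; stream receivers are not invoked
        streamer.perNodeBufferSize(1024);      // entries buffered per node before a batch is sent
        streamer.perNodeParallelOperations(8); // concurrent batches in flight per node
        streamer.autoFlushFrequency(5_000L);   // also flush on a timer, every 5 s

        return streamer;
    }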
On Tue, Feb 28, 2023 at 11:00 AM Jeremy McMillan <jeremy.mcmil...@gridgain.com> wrote:

> Have you tried tracing the workload on the 100% and 40% nodes for
> comparison? There just isn't enough detail in your question to help
> predict what should be happening with the cluster workload. For a
> starting point, please identify your design goals. It's easy to get
> confused by advice that seeks to help you do something you don't want
> to do.
>
> Some things to think about include how the stream workload is composed.
> How should/would this work if there were only one node? How should
> behavior change as nodes are added to the topology and the test is
> repeated?
>
> Gedanken: what if the data streamer is doing some really expensive
> operations as it feeds the data into the stream, but the nodes can very
> cheaply put the processed data into their cache partitions? In this
> case, for example, the expensive operations should be refactored into a
> stream transformer that will move the workload from the stream sender
> to the stream receivers.
> https://ignite.apache.org/docs/latest/data-streaming#stream-transformer
>
> Also gedanken: what if the data distribution is skewed such that one
> node gets more than 2x the data sent to other partitions because of
> affinity? In this case, for example, changes to affinity/colocation
> design or changes to cluster topology (more nodes with a greater
> CPU-to-RAM ratio?) can help distribute the load so that no single node
> becomes a bottleneck.
>
> On Tue, Feb 28, 2023 at 9:27 AM John Smith <java.dev....@gmail.com> wrote:
>
>> Hi, I'm using the data streamer to insert into a 3-node cluster. I have
>> noticed that 1 node is pegging at 100% CPU while the others are at
>> 40ish %.
>>
>> Is that normal?
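Following up on the stream-transformer link in the reply above, here is a minimal sketch of how I read it, assuming the expensive part is the per-line parsing. parseColumn2() is a hypothetical stand-in for the real work, the value is kept as a plain String for brevity, and note that receivers only run when allowOverwrite(true) is set:

    import org.apache.ignite.IgniteDataStreamer;
    import org.apache.ignite.stream.StreamTransformer;

    static void moveParsingToReceivers(IgniteDataStreamer<String, String> streamer) {
        streamer.allowOverwrite(true); // required, otherwise the receiver is never invoked

        // The transformer runs on the node that owns the key, not on the sender.
        streamer.receiver(StreamTransformer.from((e, args) -> {
            e.setValue(parseColumn2(e.getValue()));
            return null;
        }));
    }

    static String parseColumn2(String rawLine) { // hypothetical stand-in for the expensive parsing
        return rawLine.split("\\|")[3];
    }

The sender would then call streamer.addData(key, line) with the raw line as the value, so it still extracts the key, but the rest of the work happens on the owning nodes.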
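And to test the skew gedanken, one way is to sample keys and count how many each node is primary for. A sketch, assuming plain String keys and the cache name from my loader above:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.cache.affinity.Affinity;
    import org.apache.ignite.cluster.ClusterNode;

    static Map<ClusterNode, Integer> primaryCounts(Ignite ignite, Iterable<String> sampleKeys) {
        Affinity<String> aff = ignite.affinity("MY_TABLE");
        Map<ClusterNode, Integer> counts = new HashMap<>();

        for (String key : sampleKeys)
            counts.merge(aff.mapKeyToNode(key), 1, Integer::sum); // primary owner for this key

        return counts;
    }

If one node's count comes back far above the others, the affinity/colocation design would be the place to look.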