The last thing I can add to clarify: the 3-node cluster is a centralized cluster, and the CSV loader is a thick client running on its own machine.
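For reference, the client-side knobs that control how the streamer batches writes per node look roughly like this; the values below are illustrative guesses, not a verified fix for the one-hot-node behaviour:

    // Hedged sketch: client-side streamer tuning (values are guesses to
    // experiment with). Uses the same igniteDest/cacheNameDest as the
    // loader code quoted further down.
    try (IgniteDataStreamer<BinaryObject, BinaryObject> streamer =
             igniteDest.dataStreamer(cacheNameDest)) {
        streamer.perNodeBufferSize(1024);       // entries buffered per node before a batch is sent
        streamer.perNodeParallelOperations(8);  // concurrent batches in flight per node
        streamer.autoFlushFrequency(1_000);     // flush buffered entries at least once a second
        // ... same addData() loop as below ...
    }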
On Tue, Feb 28, 2023 at 2:52 PM John Smith <java.dev....@gmail.com> wrote:

> Btw, when I run a query like SELECT COLUMN_2, COUNT(COLUMN_1) FROM MY_TABLE
> GROUP BY COLUMN_2; the query runs full tilt at 100% on all 3 nodes and
> returns in a respectable manner.
>
> So I'm not sure what's going on, but with the data streamer I guess most of
> the writes are pushed to THE ONE node and the others are busy making the
> backups, or the network to push/back up can't keep up? The same behaviour
> happens with a replicated table when using the streamer: one node seems to
> be running at almost 100% while the others hover at 40-50%.
> The fastest I could get the streamer to work is to turn off backups, but
> same thing: one node runs full tilt while the others are "slowish".
>
> Queries are OK; all nodes are fully utilized.
>
> On Tue, Feb 28, 2023 at 12:54 PM John Smith <java.dev....@gmail.com> wrote:
>
>> Hi, so I'm using it in a pretty straightforward kind of way, at least I
>> think...
>>
>> I'm loading 35 million lines from CSV into an SQL table. I decided to use
>> the streamer as I figured it would still be a lot faster than batching SQL
>> INSERTs. I tried with backups=0 and backups=1 (I'd prefer to have backups on):
>> 1- With 0 backups: 6 minutes to load
>> 2- With 1 backup: 15 minutes to load
>>
>> In both cases I still see the same behaviour: the one machine seems to be
>> taking the brunt of the work...
>>
>> I'm reading the CSV file line by line and doing streamer.addData().
>>
>> The table definition is as follows:
>>
>> CREATE TABLE PUBLIC.MY_TABLE (
>>     COLUMN_1 VARCHAR(32) NOT NULL,
>>     COLUMN_2 VARCHAR(64) NOT NULL,
>>     CONSTRAINT PHONE_CARRIER_IDS_PK PRIMARY KEY (COLUMN_1)
>> ) WITH "template=parallelTpl, backups=0, key_type=String, value_type=MyObject";
>>
>> CREATE INDEX MY_TABLE_COLUMN_2_IDX ON PUBLIC.MY_TABLE (COLUMN_2);
>>
>> String fileName = "my_file";
>>
>> final String cacheNameDest = "MY_TABLE";
>>
>> try (
>>     Ignite igniteDest = configIgnite(Arrays.asList("...:47500..47509",
>>             "...:47500..47509", "...:47500..47509"), "ignite-dest");
>>     IgniteCache<BinaryObject, BinaryObject> cacheDest =
>>             igniteDest.getOrCreateCache(cacheNameDest).withKeepBinary();
>>     IgniteDataStreamer<BinaryObject, BinaryObject> streamer =
>>             igniteDest.dataStreamer(cacheNameDest);
>> ) {
>>     System.out.println("Ignite started.");
>>     long start = System.currentTimeMillis();
>>
>>     System.out.println("Cache size: " + cacheDest.size(CachePeekMode.PRIMARY));
>>
>>     IgniteBinary binaryDest = igniteDest.binary();
>>
>>     try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
>>         int count = 0;
>>
>>         String line;
>>         while ((line = br.readLine()) != null) {
>>             String[] parts = line.split("\\|");
>>
>>             BinaryObjectBuilder keyBuilder = binaryDest.builder("String");
>>             keyBuilder.setField("COLUMN_1", parts[1], String.class);
>>
>>             BinaryObjectBuilder valueBuilder = binaryDest.builder("PhoneCarrier");
>>             valueBuilder.setField("COLUMN_2", parts[3], String.class);
>>
>>             streamer.addData(keyBuilder.build(), valueBuilder.build());
>>
>>             count++;
>>
>>             if ((count % 10000) == 0)
>>                 System.out.println(count);
>>         }
>>
>>         streamer.flush();
>>
>>         long end = System.currentTimeMillis();
>>         System.out.println("Ms: " + (end - start));
>>     } catch (IOException e) {
>>         e.printStackTrace();
>>     }
>> }
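One way to check whether the primary keys themselves are skewed is to map each key to its owning node with the cache's affinity function and count. A sketch, reusing igniteDest, cacheNameDest, and fileName from the snippet above (hypothetical, not something run in this thread):

    // Build the same keys the loader builds and count how many map to each node.
    // If the counts come out roughly equal, key skew does not explain the hot node.
    Affinity<BinaryObject> aff = igniteDest.affinity(cacheNameDest);
    IgniteBinary binary = igniteDest.binary();
    Map<ClusterNode, Long> perNode = new HashMap<>();

    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
        String line;
        while ((line = br.readLine()) != null) {
            BinaryObject key = binary.builder("String")
                    .setField("COLUMN_1", line.split("\\|")[1], String.class)
                    .build();
            perNode.merge(aff.mapKeyToNode(key), 1L, Long::sum);
        }
    }

    perNode.forEach((node, n) -> System.out.println(node.consistentId() + " -> " + n));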
>> On Tue, Feb 28, 2023 at 11:00 AM Jeremy McMillan <jeremy.mcmil...@gridgain.com> wrote:
>>
>>> Have you tried tracing the workload on the 100% and 40% nodes for
>>> comparison? There just isn't enough detail in your question to help
>>> predict what should be happening with the cluster workload. For a
>>> starting point, please identify your design goals. It's easy to get
>>> confused by advice that seeks to help you do something you don't want
>>> to do.
>>>
>>> Some things to think about include how the stream workload is composed.
>>> How should/would this work if there were only one node? How should
>>> behavior change as nodes are added to the topology and the test is
>>> repeated?
>>>
>>> Gedanken: what if the data streamer is doing some really expensive
>>> operations as it feeds the data into the stream, but the nodes can very
>>> cheaply put the processed data into their cache partitions? In that
>>> case, the expensive operations should be refactored into a stream
>>> transformer, which moves the workload from the stream sender to the
>>> stream receivers:
>>> https://ignite.apache.org/docs/latest/data-streaming#stream-transformer
>>>
>>> Also gedanken: what if the data distribution is skewed such that,
>>> because of affinity, one node gets more than 2x the data sent to the
>>> other partitions? In that case, changes to the affinity/colocation
>>> design or to the cluster topology (more nodes with a greater CPU-to-RAM
>>> ratio?) can help distribute the load so that no single node becomes a
>>> bottleneck.
>>>
>>> On Tue, Feb 28, 2023 at 9:27 AM John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Hi, I'm using the data streamer to insert into a 3-node cluster. I have
>>>> noticed that 1 node is pegged at 100% CPU while the others are at
>>>> 40ish %.
>>>>
>>>> Is that normal?
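For reference, the stream-transformer pattern behind the docs link Jeremy gives above looks roughly like this. It is the word-count shape from the Ignite docs; adapting it so the CSV parsing runs on the receiving nodes instead of the loader client is an idea to test, not a fix confirmed in this thread (ignite and "wordCache" are illustrative names):

    // StreamTransformer (org.apache.ignite.stream.StreamTransformer) runs the
    // entry processor on the node that owns the key, so per-entry work shifts
    // from the loading client to the cluster nodes.
    IgniteCache<String, Long> cache = ignite.getOrCreateCache("wordCache");

    try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(cache.getName())) {
        stmr.allowOverwrite(true); // custom receivers require allowOverwrite(true)

        stmr.receiver(StreamTransformer.from((e, arg) -> {
            Long cnt = e.getValue();
            e.setValue(cnt == null ? 1L : cnt + 1); // executed cluster-side
            return null;
        }));

        for (String word : "so many words so many".split(" "))
            stmr.addData(word, 1L);
    }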