The last thing I can add to clarify: the 3-node cluster is a centralized cluster,
and the CSV loader is a thick client running on its own machine.
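
For reference, configIgnite(...) isn't shown in this thread, so the details below are
assumptions; this is just a minimal sketch of how a thick client like that is typically
started (host names and discovery ports are placeholders, not our real addresses):

import java.util.Arrays;

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

public class StartClient {
    public static void main(String[] args) {
        // Static IP finder pointing at the 3 server nodes (addresses are placeholders).
        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
        ipFinder.setAddresses(Arrays.asList(
                "node1:47500..47509", "node2:47500..47509", "node3:47500..47509"));

        IgniteConfiguration cfg = new IgniteConfiguration()
                .setIgniteInstanceName("ignite-dest")
                .setClientMode(true) // thick client: joins the topology but stores no data
                .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));

        try (Ignite client = Ignition.start(cfg)) {
            System.out.println("Server nodes visible: "
                    + client.cluster().forServers().nodes().size());
        }
    }
}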

On Tue, Feb 28, 2023 at 2:52 PM John Smith <java.dev....@gmail.com> wrote:

> Btw, when I run a query like SELECT COLUMN_2, COUNT(COLUMN_1) FROM MY_TABLE
> GROUP BY COLUMN_2; the query runs full tilt at 100% on all 3 nodes and returns
> in a respectable time.
>
> So I'm not sure what's going on, but with the data streamer I guess most of the
> writes are pushed to THE ONE node while the others are mostly busy writing the
> backups, or the network used to push/back up the data can't keep up?
> The same behaviour happens with a replicated table when using the data streamer:
> one node runs at almost 100% while the others hover at 40-50%.
> The fastest I could get the streamer to work was with backups turned off, but
> it's the same thing: one node runs full tilt while the others are "slowish".
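>
> I also plan to try tuning the streamer itself, in case the sender just can't keep
> the receivers busy. Something along these lines (untested sketch; the numbers are
> guesses, not recommendations):
>
> IgniteDataStreamer<BinaryObject, BinaryObject> streamer = igniteDest.dataStreamer("MY_TABLE");
> streamer.perNodeBufferSize(1024);       // entries buffered per node before a batch is sent
> streamer.perNodeParallelOperations(8);  // batches allowed in flight per node at once
> streamer.autoFlushFrequency(1000);      // flush at least once a second even if buffers aren't full
> streamer.allowOverwrite(false);         // keep the fast path that skips existing-value checks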
>
> Queries are ok, all nodes are fully utilized.
>
> On Tue, Feb 28, 2023 at 12:54 PM John Smith <java.dev....@gmail.com>
> wrote:
>
>> Hi, so I'm using it in a pretty straightforward way, at least I think...
>>
>> I'm loading 35 million lines from a CSV into an SQL table. I decided to use the
>> streamer as I figured it would still be a lot faster than batching SQL
>> INSERTs.
>> I tried with backups=0 and backups=1 (I'd prefer to have backups on):
>> 1- With 0 backups: 6 minutes to load
>> 2- With 1 backup: 15 minutes to load
>>
>> In both cases I still see the same behaviour: one machine seems to be
>> taking the brunt of the work...
>>
>> I'm reading the CSV file line by line and calling streamer.addData().
>>
>> The table definition is as follows...
>> CREATE TABLE PUBLIC.MY_TABLE (
>>     COLUMN_1 VARCHAR(32) NOT NULL,
>>     COLUMN_2 VARCHAR(64) NOT NULL,
>>     CONSTRAINT PHONE_CARRIER_IDS_PK PRIMARY KEY (COLUMN_1)
>> ) with "template=parallelTpl, backups=0, key_type=String, value_type=MyObject";
>> CREATE INDEX MY_TABLE_COLUMN_2_IDX ON PUBLIC.MY_TABLE (COLUMN_2);
>>
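>> (The DDL above is the backups=0 variant. To double-check what a cache actually
>> ended up with, I can print its runtime config, roughly like this -- untested sketch:
>>
>>             CacheConfiguration<?, ?> ccfg = igniteDest.cache("MY_TABLE").getConfiguration(CacheConfiguration.class);
>>             System.out.println("backups=" + ccfg.getBackups() + ", mode=" + ccfg.getCacheMode());
>> )
>>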
>>         String fileName = "my_file";
>>
>>         final String cacheNameDest = "MY_TABLE";
>>
>>         try(
>>                 Ignite igniteDest = configIgnite(Arrays.asList("...:47500..47509", "...:47500..47509", "...:47500..47509"), "ignite-dest");
>>                 IgniteCache<BinaryObject, BinaryObject> cacheDest = igniteDest.getOrCreateCache(cacheNameDest).withKeepBinary();
>>                 IgniteDataStreamer<BinaryObject, BinaryObject> streamer = igniteDest.dataStreamer(cacheNameDest);
>>         ) {
>>             System.out.println("Ignite started.");
>>             long start = System.currentTimeMillis();
>>
>>             System.out.println("Cache size: " +
>> cacheDest.size(CachePeekMode.PRIMARY));
>>             System.out.println("Default");
>>             System.out.println("1d");
>>
>>             IgniteBinary binaryDest = igniteDest.binary();
>>
>>             try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
>>                 int count = 0;
>>
>>                 String line;
>>                 while ((line = br.readLine()) != null) {
>>
>>                     String[] parts = line.split("\\|");
>>
>>                     BinaryObjectBuilder keyBuilder = binaryDest.builder("String");
>>                     keyBuilder.setField("COLUMN_1", parts[1], String.class);
>>                     BinaryObjectBuilder valueBuilder = binaryDest.builder("PhoneCarrier");
>>                     valueBuilder.setField("COLUMN_2", parts[3], String.class);
>>
>>                     streamer.addData(keyBuilder.build(), valueBuilder.build());
>>
>>                     count++;
>>
>>                     if ((count % 10000) == 0) {
>>                         System.out.println(count);
>>                     }
>>                 }
>>                 streamer.flush();
>>                 long end = System.currentTimeMillis();
>>                 System.out.println("Ms: " + (end - start));
>>             } catch (IOException e) {
>>                 e.printStackTrace();
>>             }
>>         }
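>>
>> After the load I also want to check how the data ends up spread across the nodes.
>> If I'm reading the Affinity/IgniteCache APIs right, something like this (untested
>> sketch), placed just before the end of the outer try block, should show it:
>>
>>             Affinity<Object> aff = igniteDest.affinity(cacheNameDest);
>>             for (ClusterNode node : igniteDest.cluster().forServers().nodes()) {
>>                 long entries = 0;
>>                 for (int part : aff.primaryPartitions(node))
>>                     entries += cacheDest.sizeLong(part, CachePeekMode.PRIMARY);
>>                 System.out.println(node.consistentId() + ": " + entries
>>                         + " primary entries in " + aff.primaryPartitions(node).length + " partitions");
>>             }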
>>
>> On Tue, Feb 28, 2023 at 11:00 AM Jeremy McMillan <
>> jeremy.mcmil...@gridgain.com> wrote:
>>
>>> Have you tried tracing the workload on the 100% and 40% nodes for
>>> comparison? There just isn't enough detail in your question to help predict
>>> what should be happening with the cluster workload. For a starting point,
>>> please identify your design goals. It's easy to get confused by advice that
>>> seeks to help you do something you don't want to do.
>>>
>>> Some things to think about include how the stream workload is composed.
>>> How should/would this work if there were only one node? How should behavior
>>> change as nodes are added to the topology and the test is repeated?
>>>
>>> Gedanken: what if the data streamer is doing some really expensive
>>> operations as it feeds the data into the stream, but the nodes can very
>>> cheaply put the processed data into their cache partitions? In this case,
>>> for example, the expensive operations should be refactored into a stream
>>> transformer that will move the workload from the stream sender to the
>>> stream receivers.
>>> https://ignite.apache.org/docs/latest/data-streaming#stream-transformer
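>>>
>>> A rough sketch of the idea, adapted from the word-count example in those docs
>>> (the cache name, the input text and the increment logic are illustrations, not
>>> your table):
>>>
>>> IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer("wordCountCache"); // hypothetical cache
>>>
>>> // Receiver code runs on the node that owns each key, so the expensive part
>>> // moves off the sender. allowOverwrite(true) is required for receivers that
>>> // read and modify existing values.
>>> stmr.allowOverwrite(true);
>>> stmr.receiver(StreamTransformer.from((e, arg) -> {
>>>     Long val = e.getValue();
>>>     e.setValue(val == null ? 1L : val + 1);
>>>     return null;
>>> }));
>>>
>>> // "text" stands for whatever input is being streamed.
>>> for (String word : text.split(" "))
>>>     stmr.addData(word, 1L);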
>>>
>>> Also gedanken: what if the data distribution is skewed by affinity, such that
>>> one node gets more than 2x the data sent to the other nodes? In this case, for
>>> example, changes to affinity/colocation design
>>> or changes to cluster topology (more nodes with greater CPU to RAM ratio?)
>>> can help distribute the load so that no single node becomes a bottleneck.
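>>>
>>> A cheap way to check for that kind of skew before loading anything is to map
>>> each key to its primary node and count. An untested sketch that reuses your CSV
>>> parsing (it uses the raw string as the key purely for illustration):
>>>
>>> Affinity<Object> aff = ignite.affinity("MY_TABLE");
>>> Map<Object, Long> keysPerNode = new HashMap<>();
>>>
>>> try (BufferedReader br = new BufferedReader(new FileReader("my_file"))) {
>>>     String line;
>>>     while ((line = br.readLine()) != null) {
>>>         String key = line.split("\\|")[1];
>>>         // consistentId() identifies the node that would own this key's primary copy.
>>>         keysPerNode.merge(aff.mapKeyToNode(key).consistentId(), 1L, Long::sum);
>>>     }
>>> }
>>>
>>> keysPerNode.forEach((node, cnt) -> System.out.println(node + " -> " + cnt + " keys"));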
>>>
>>> On Tue, Feb 28, 2023 at 9:27 AM John Smith <java.dev....@gmail.com>
>>> wrote:
>>>
>>>> Hi, I'm using the data streamer to insert into a 3-node cluster. I have
>>>> noticed that one node is pegged at 100% CPU while the others are at 40-ish %.
>>>>
>>>> Is that normal?
>>>>
>>>>
>>>>
