Hi, so I'm using it in a pretty straightforward way, at least I think...
I'm loading 35 million lines from a CSV file into an SQL table. I decided to
use the data streamer as I figured it would still be a lot faster than
batching SQL INSERTs.
I tried with backups=0 and backups=1 (I'd prefer to have backups on):
1- With 0 backups: 6 minutes to load.
2- With 1 backup: 15 minutes to load.
In both cases I see the same behaviour: one machine seems to be taking the
brunt of the work...
I'm reading the CSV file line by line and calling streamer.addData().
The table definition is as follows...
CREATE TABLE PUBLIC.MY_TABLE (
    COLUMN_1 VARCHAR(32) NOT NULL,
    COLUMN_2 VARCHAR(64) NOT NULL,
    CONSTRAINT PHONE_CARRIER_IDS_PK PRIMARY KEY (COLUMN_1)
) WITH "template=parallelTpl, backups=0, key_type=String, value_type=MyObject";

CREATE INDEX MY_TABLE_COLUMN_2_IDX ON PUBLIC.MY_TABLE (COLUMN_2);
String fileName = "my_file";
final String cacheNameDest = "MY_TABLE";

try (
    Ignite igniteDest = configIgnite(
        Arrays.asList("...:47500..47509", "...:47500..47509", "...:47500..47509"),
        "ignite-dest");
    IgniteCache<BinaryObject, BinaryObject> cacheDest =
        igniteDest.getOrCreateCache(cacheNameDest).withKeepBinary();
    IgniteDataStreamer<BinaryObject, BinaryObject> streamer =
        igniteDest.dataStreamer(cacheNameDest);
) {
    System.out.println("Ignite started.");

    long start = System.currentTimeMillis();

    System.out.println("Cache size: " + cacheDest.size(CachePeekMode.PRIMARY));
    System.out.println("Default");
    System.out.println("1d");

    IgniteBinary binaryDest = igniteDest.binary();

    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
        int count = 0;
        String line;

        while ((line = br.readLine()) != null) {
            String[] parts = line.split("\\|");

            // Key: binary object carrying the primary-key column.
            BinaryObjectBuilder keyBuilder = binaryDest.builder("String");
            keyBuilder.setField("COLUMN_1", parts[1], String.class);

            // Value: binary object carrying the remaining column.
            BinaryObjectBuilder valueBuilder = binaryDest.builder("PhoneCarrier");
            valueBuilder.setField("COLUMN_2", parts[3], String.class);

            streamer.addData(keyBuilder.build(), valueBuilder.build());

            count++;
            if ((count % 10000) == 0) {
                System.out.println(count);
            }
        }

        streamer.flush();

        long end = System.currentTimeMillis();
        System.out.println("Ms: " + (end - start));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
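
For what it's worth, I haven't touched any of the streamer tuning knobs; everything
is at defaults. If tuning is part of the problem, this is roughly what I'd experiment
with (the numbers below are guesses to illustrate, not values I've benchmarked):

// All of these were left at defaults in my runs; values here are just placeholders.
streamer.perNodeBufferSize(1024);         // entries buffered per node before a batch is sent
streamer.perNodeParallelOperations(8);    // concurrent batches in flight per node
streamer.autoFlushFrequency(10_000);      // force a flush at least every 10 seconds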
On Tue, Feb 28, 2023 at 11:00 AM Jeremy McMillan <
[email protected]> wrote:
> Have you tried tracing the workload on the 100% and 40% nodes for
> comparison? There just isn't enough detail in your question to help predict
> what should be happening with the cluster workload. For a starting point,
> please identify your design goals. It's easy to get confused by advice that
> seeks to help you do something you don't want to do.
>
> Some things to think about include how the stream workload is composed.
> How should/would this work if there were only one node? How should behavior
> change as nodes are added to the topology and the test is repeated?
>
> Gedanken: what if the data streamer is doing some really expensive
> operations as it feeds the data into the stream, but the nodes can very
> cheaply put the processed data into their cache partitions? In this case,
> for example, the expensive operations should be refactored into a stream
> transformer that will move the workload from the stream sender to the
> stream receivers.
> https://ignite.apache.org/docs/latest/data-streaming#stream-transformer
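
Just so I understand the stream transformer suggestion: is this roughly the pattern
you mean? This is essentially the word-count example from that docs page, not my
actual code (the cache name, the "text" variable and the increment logic are the
docs' placeholders):

// The transformer runs on the node that owns each key, so the per-entry work
// happens on the receivers instead of on the sender.
try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer("wordCountCache")) {
    stmr.allowOverwrite(true); // receivers/transformers need overwrite enabled

    stmr.receiver(StreamTransformer.from((e, arg) -> {
        Long val = e.getValue();                 // current value on the owning node
        e.setValue(val == null ? 1L : val + 1);  // do the work where the data lives
        return null;
    }));

    for (String word : text.split(" "))
        stmr.addData(word, 1L);
}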
>
> Also gedanken: what if the data distribution is skewed such that one node
> gets more data than 2x the data sent to other partitions because of
> affinity? In this case, for example, changes to affinity/colocation design
> or changes to cluster topology (more nodes with greater CPU to RAM ratio?)
> can help distribute the load so that no single node becomes a bottleneck.
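
To check the skew idea, would something like this be a reasonable way to see how my
keys map to nodes? Just a sketch: it maps plain String keys through Affinity, while
my loader actually builds BinaryObject keys, so it may only be approximate.

// Count how many CSV keys each node is primary for, to spot an uneven split.
Affinity<String> aff = igniteDest.affinity("MY_TABLE");
Map<ClusterNode, Long> perNode = new HashMap<>();

try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
    String line;
    while ((line = br.readLine()) != null) {
        String key = line.split("\\|")[1];
        perNode.merge(aff.mapKeyToNode(key), 1L, Long::sum);
    }
} catch (IOException e) {
    e.printStackTrace();
}

perNode.forEach((node, cnt) ->
    System.out.println(node.consistentId() + " -> " + cnt));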
>
> On Tue, Feb 28, 2023 at 9:27 AM John Smith <[email protected]> wrote:
>
>> Hi, I'm using the data streamer to insert into a 3 node cluster. I have
>> noticed that one node is pegged at 100% CPU while the others are at around 40%.
>>
>> Is that normal?
>>