T1B0 commented on code in PR #2165:
URL: https://github.com/apache/iggy/pull/2165#discussion_r2344512378


##########
examples/nodejs/src/basic-js/producer.js:
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Client, Partitioning } from 'apache-iggy';
+import debug from 'debug';
+
+const log = debug('iggy:basic:producer');
+
+const STREAM_ID = 1;
+const TOPIC_ID = 1;
+const PARTITION_ID = 1;
+const BATCHES_LIMIT = 5;
+const MESSAGES_PER_BATCH = 10;
+
+function parseArgs() {
+  const args = process.argv.slice(2);
+  const connectionString = args[0] || 'iggy+tcp://iggy:[email protected]:8090';
+  
+  if (args.length > 0 && (args[0] === '-h' || args[0] === '--help')) {
+    console.log('Usage: node producer.js [connection_string]');
+    console.log('Example: node producer.js 
iggy+tcp://iggy:[email protected]:8090');
+    process.exit(0);
+  }
+  
+  return { connectionString };
+}
+
+async function initSystem(client) {
+  try {
+    log('Creating stream with ID %d...', STREAM_ID);
+    await client.stream.create({ streamId: STREAM_ID, name: 'sample-stream' });
+    log('Stream was created successfully.');
+  } catch (error) {
+    log('Stream already exists or error creating stream: %o', error);
+  }
+
+  try {
+    log('Creating topic with ID %d in stream %d...', TOPIC_ID, STREAM_ID);
+    await client.topic.create({
+      streamId: STREAM_ID,
+      topicId: TOPIC_ID,
+      name: 'sample-topic',
+      partitionCount: 1,
+      compressionAlgorithm: 1, // None
+      replicationFactor: 1
+    });
+    log('Topic was created successfully.');
+  } catch (error) {
+    log('Topic already exists or error creating topic: %o', error);
+  }
+
+  // Wait a moment for the topic to be fully created
+  await new Promise(resolve => setTimeout(resolve, 100));

Review Comment:
   "sleep" not needed - server has already ACK operation when await 
topic.create() resolved. 



##########
examples/nodejs/src/basic-js/consumer.js:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Client, PollingStrategy, Consumer } from 'apache-iggy';
+import debug from 'debug';
+
+const log = debug('iggy:basic:consumer');
+
+const STREAM_ID = 1;
+const TOPIC_ID = 1;
+const PARTITION_ID = 1;
+const BATCHES_LIMIT = 5;
+const MESSAGES_PER_BATCH = 10;
+
+function parseArgs() {
+  const args = process.argv.slice(2);
+  const connectionString = args[0] || 'iggy+tcp://iggy:[email protected]:8090';
+  
+  if (args.length > 0 && (args[0] === '-h' || args[0] === '--help')) {
+    console.log('Usage: node consumer.js [connection_string]');
+    console.log('Example: node consumer.js 
iggy+tcp://iggy:[email protected]:8090');
+    process.exit(0);
+  }
+  
+  return { connectionString };
+}
+
+function handleMessage(message) {
+  const payload = message.payload.toString();
+  const offset = message.headers.offset;
+  const timestamp = message.headers.timestamp;
+  
+  log('Received message: %s (offset: %d, timestamp: %d)', payload, offset, 
timestamp);
+}
+
+async function consumeMessages(client) {
+  const interval = 500; // 500 milliseconds
+  log(
+    'Messages will be consumed from stream: %d, topic: %d, partition: %d with 
interval %d ms.',
+    STREAM_ID,
+    TOPIC_ID,
+    PARTITION_ID,
+    interval
+  );
+
+  let offset = 0;
+  let consumedBatches = 0;
+
+  while (consumedBatches < BATCHES_LIMIT) {
+    try {
+      log('Polling for messages...');
+      const polledMessages = await client.message.poll({
+        streamId: STREAM_ID,
+        topicId: TOPIC_ID,
+        consumer: Consumer.Single,
+        partitionId: PARTITION_ID,
+        pollingStrategy: PollingStrategy.Offset(BigInt(offset)),
+        count: MESSAGES_PER_BATCH,
+        autocommit: false
+      });
+
+      if (!polledMessages || polledMessages.messages.length === 0) {
+        log('No messages found in current poll - this is expected if the 
producer had issues sending messages');
+        consumedBatches++; // Increment even when no messages found
+        log('Completed poll attempt %d (no messages).', consumedBatches);
+        await new Promise(resolve => setTimeout(resolve, interval));
+        continue;
+      }
+
+      offset += polledMessages.messages.length;
+      
+      for (const message of polledMessages.messages) {
+        handleMessage(message);
+      }
+      
+      consumedBatches++;
+      log('Consumed %d message(s) in batch %d.', 
polledMessages.messages.length, consumedBatches);
+      
+      await new Promise(resolve => setTimeout(resolve, interval));
+    } catch (error) {
+      log('Error consuming messages: %o', error);
+      consumedBatches++;
+      await new Promise(resolve => setTimeout(resolve, interval));
+    }
+  }
+
+  log('Consumed %d batches of messages, exiting.', consumedBatches);
+}
+
+async function main() {
+  const args = parseArgs();
+  
+  log('Using connection string: %s', args.connectionString);
+  
+  // Parse connection string (simplified parsing for this example)
+  const url = new URL(args.connectionString.replace('iggy+tcp://', 'http://'));
+  const host = url.hostname;
+  const port = parseInt(url.port) || 8090;
+  const username = url.username || 'iggy';
+  const password = url.password || 'iggy';
+  
+  const client = new Client({
+    transport: 'TCP',
+    options: { port, host },
+    credentials: { username, password }
+  });
+
+  try {
+    log('Basic consumer has started, selected transport: TCP');
+    log('Connecting to Iggy server...');
+    // Client connects automatically when first command is called
+    log('Connected successfully.');
+
+    log('Logging in user...');
+    await client.session.login({ username, password });

Review Comment:
   client will handle login() on its own, you do not need to call it explicitly



##########
examples/nodejs/src/basic-js/producer.js:
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Client, Partitioning } from 'apache-iggy';
+import debug from 'debug';
+
+const log = debug('iggy:basic:producer');
+
+const STREAM_ID = 1;
+const TOPIC_ID = 1;
+const PARTITION_ID = 1;
+const BATCHES_LIMIT = 5;
+const MESSAGES_PER_BATCH = 10;
+
+function parseArgs() {
+  const args = process.argv.slice(2);
+  const connectionString = args[0] || 'iggy+tcp://iggy:[email protected]:8090';
+  
+  if (args.length > 0 && (args[0] === '-h' || args[0] === '--help')) {
+    console.log('Usage: node producer.js [connection_string]');
+    console.log('Example: node producer.js 
iggy+tcp://iggy:[email protected]:8090');
+    process.exit(0);
+  }
+  
+  return { connectionString };
+}
+
+async function initSystem(client) {
+  try {
+    log('Creating stream with ID %d...', STREAM_ID);
+    await client.stream.create({ streamId: STREAM_ID, name: 'sample-stream' });
+    log('Stream was created successfully.');
+  } catch (error) {
+    log('Stream already exists or error creating stream: %o', error);
+  }
+
+  try {
+    log('Creating topic with ID %d in stream %d...', TOPIC_ID, STREAM_ID);
+    await client.topic.create({
+      streamId: STREAM_ID,
+      topicId: TOPIC_ID,
+      name: 'sample-topic',
+      partitionCount: 1,
+      compressionAlgorithm: 1, // None
+      replicationFactor: 1
+    });
+    log('Topic was created successfully.');
+  } catch (error) {
+    log('Topic already exists or error creating topic: %o', error);
+  }
+
+  // Wait a moment for the topic to be fully created
+  await new Promise(resolve => setTimeout(resolve, 100));
+}
+
+async function produceMessages(client) {
+  const interval = 500; // 500 milliseconds
+  log(
+    'Messages will be sent to stream: %d, topic: %d, partition: %d with 
interval %d ms.',
+    STREAM_ID,
+    TOPIC_ID,
+    PARTITION_ID,
+    interval
+  );
+
+  let currentId = 0;
+  let sentBatches = 0;
+
+  while (sentBatches < BATCHES_LIMIT) {
+    const messages = [];
+    const sentMessages = [];
+    
+    for (let i = 0; i < MESSAGES_PER_BATCH; i++) {
+      currentId++;
+      const payload = `message-${currentId}`;
+      messages.push({
+        payload: Buffer.from(payload, 'utf8')
+      });
+      sentMessages.push(payload);
+    }
+
+    try {
+      await client.message.send({
+        streamId: STREAM_ID,
+        topicId: TOPIC_ID,
+        messages,
+        partition: Partitioning.PartitionId(PARTITION_ID)
+      });
+      
+      sentBatches++;
+      log('Sent messages: %o', sentMessages);
+    } catch (error) {
+      log('Error sending messages: %o', error);
+      log('This might be due to server version compatibility. The stream and 
topic creation worked successfully.');
+      log('Please check the Iggy server version and ensure it supports the 
SendMessages command.');
+      // Don't throw error, just log and continue to show that other parts work
+      sentBatches++;
+      log('Simulated sending messages: %o', sentMessages);
+    }
+
+    // Wait for the interval
+    await new Promise(resolve => setTimeout(resolve, interval));
+  }
+
+  log('Sent %d batches of messages, exiting.', sentBatches);
+}
+
+async function main() {
+  const args = parseArgs();
+  
+  log('Using connection string: %s', args.connectionString);
+  
+  // Parse connection string (simplified parsing for this example)
+  const url = new URL(args.connectionString.replace('iggy+tcp://', 'http://'));
+  const host = url.hostname;
+  const port = parseInt(url.port) || 8090;
+  const username = url.username || 'iggy';
+  const password = url.password || 'iggy';
+  
+  const client = new Client({
+    transport: 'TCP',
+    options: { port, host },
+    credentials: { username, password }
+  });
+
+  try {
+    log('Basic producer has started, selected transport: TCP');
+    log('Connecting to Iggy server...');
+    // Client connects automatically when first command is called
+    log('Connected successfully.');
+
+    log('Logging in user...');
+    await client.session.login({ username, password });

Review Comment:
   login is not needed



##########
examples/nodejs/src/basic-js/producer.js:
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Client, Partitioning } from 'apache-iggy';
+import debug from 'debug';
+
+const log = debug('iggy:basic:producer');
+
+const STREAM_ID = 1;
+const TOPIC_ID = 1;
+const PARTITION_ID = 1;
+const BATCHES_LIMIT = 5;
+const MESSAGES_PER_BATCH = 10;
+
+function parseArgs() {
+  const args = process.argv.slice(2);
+  const connectionString = args[0] || 'iggy+tcp://iggy:[email protected]:8090';
+  
+  if (args.length > 0 && (args[0] === '-h' || args[0] === '--help')) {
+    console.log('Usage: node producer.js [connection_string]');
+    console.log('Example: node producer.js 
iggy+tcp://iggy:[email protected]:8090');
+    process.exit(0);
+  }
+  
+  return { connectionString };
+}
+
+async function initSystem(client) {
+  try {
+    log('Creating stream with ID %d...', STREAM_ID);
+    await client.stream.create({ streamId: STREAM_ID, name: 'sample-stream' });
+    log('Stream was created successfully.');
+  } catch (error) {
+    log('Stream already exists or error creating stream: %o', error);
+  }
+
+  try {
+    log('Creating topic with ID %d in stream %d...', TOPIC_ID, STREAM_ID);
+    await client.topic.create({
+      streamId: STREAM_ID,
+      topicId: TOPIC_ID,
+      name: 'sample-topic',
+      partitionCount: 1,
+      compressionAlgorithm: 1, // None
+      replicationFactor: 1
+    });
+    log('Topic was created successfully.');
+  } catch (error) {
+    log('Topic already exists or error creating topic: %o', error);
+  }
+
+  // Wait a moment for the topic to be fully created
+  await new Promise(resolve => setTimeout(resolve, 100));
+}
+
+async function produceMessages(client) {
+  const interval = 500; // 500 milliseconds
+  log(
+    'Messages will be sent to stream: %d, topic: %d, partition: %d with 
interval %d ms.',
+    STREAM_ID,
+    TOPIC_ID,
+    PARTITION_ID,
+    interval
+  );
+
+  let currentId = 0;
+  let sentBatches = 0;
+
+  while (sentBatches < BATCHES_LIMIT) {
+    const messages = [];
+    const sentMessages = [];
+    
+    for (let i = 0; i < MESSAGES_PER_BATCH; i++) {
+      currentId++;
+      const payload = `message-${currentId}`;
+      messages.push({
+        payload: Buffer.from(payload, 'utf8')
+      });
+      sentMessages.push(payload);
+    }
+
+    try {
+      await client.message.send({
+        streamId: STREAM_ID,
+        topicId: TOPIC_ID,
+        messages,
+        partition: Partitioning.PartitionId(PARTITION_ID)
+      });
+      
+      sentBatches++;
+      log('Sent messages: %o', sentMessages);
+    } catch (error) {
+      log('Error sending messages: %o', error);
+      log('This might be due to server version compatibility. The stream and 
topic creation worked successfully.');
+      log('Please check the Iggy server version and ensure it supports the 
SendMessages command.');
+      // Don't throw error, just log and continue to show that other parts work
+      sentBatches++;
+      log('Simulated sending messages: %o', sentMessages);
+    }
+
+    // Wait for the interval
+    await new Promise(resolve => setTimeout(resolve, interval));
+  }
+
+  log('Sent %d batches of messages, exiting.', sentBatches);
+}
+
+async function main() {
+  const args = parseArgs();
+  
+  log('Using connection string: %s', args.connectionString);
+  
+  // Parse connection string (simplified parsing for this example)
+  const url = new URL(args.connectionString.replace('iggy+tcp://', 'http://'));
+  const host = url.hostname;
+  const port = parseInt(url.port) || 8090;
+  const username = url.username || 'iggy';
+  const password = url.password || 'iggy';
+  
+  const client = new Client({
+    transport: 'TCP',
+    options: { port, host },
+    credentials: { username, password }
+  });
+
+  try {
+    log('Basic producer has started, selected transport: TCP');
+    log('Connecting to Iggy server...');
+    // Client connects automatically when first command is called
+    log('Connected successfully.');
+
+    log('Logging in user...');
+    await client.session.login({ username, password });
+    log('Logged in successfully.');
+
+    await initSystem(client);
+    await produceMessages(client);
+  } catch (error) {
+    log('Error in main: %o', error);
+    process.exit(1);

Review Comment:
   if you exit in catch block i don't think subsequent finally will be executed



##########
examples/nodejs/src/basic-js/consumer.js:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Client, PollingStrategy, Consumer } from 'apache-iggy';
+import debug from 'debug';
+
+const log = debug('iggy:basic:consumer');

Review Comment:
   debug is for debugging purpose, a simple console.log / console.error would 
have been better for those examples 



##########
examples/nodejs/src/basic-js/consumer.js:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Client, PollingStrategy, Consumer } from 'apache-iggy';
+import debug from 'debug';
+
+const log = debug('iggy:basic:consumer');
+
+const STREAM_ID = 1;
+const TOPIC_ID = 1;
+const PARTITION_ID = 1;
+const BATCHES_LIMIT = 5;
+const MESSAGES_PER_BATCH = 10;
+
+function parseArgs() {
+  const args = process.argv.slice(2);
+  const connectionString = args[0] || 'iggy+tcp://iggy:[email protected]:8090';
+  
+  if (args.length > 0 && (args[0] === '-h' || args[0] === '--help')) {
+    console.log('Usage: node consumer.js [connection_string]');
+    console.log('Example: node consumer.js 
iggy+tcp://iggy:[email protected]:8090');
+    process.exit(0);
+  }
+  
+  return { connectionString };
+}
+
+function handleMessage(message) {
+  const payload = message.payload.toString();
+  const offset = message.headers.offset;
+  const timestamp = message.headers.timestamp;
+  
+  log('Received message: %s (offset: %d, timestamp: %d)', payload, offset, 
timestamp);
+}
+
+async function consumeMessages(client) {
+  const interval = 500; // 500 milliseconds
+  log(
+    'Messages will be consumed from stream: %d, topic: %d, partition: %d with 
interval %d ms.',
+    STREAM_ID,
+    TOPIC_ID,
+    PARTITION_ID,
+    interval
+  );
+
+  let offset = 0;
+  let consumedBatches = 0;
+
+  while (consumedBatches < BATCHES_LIMIT) {
+    try {
+      log('Polling for messages...');
+      const polledMessages = await client.message.poll({
+        streamId: STREAM_ID,
+        topicId: TOPIC_ID,
+        consumer: Consumer.Single,
+        partitionId: PARTITION_ID,
+        pollingStrategy: PollingStrategy.Offset(BigInt(offset)),
+        count: MESSAGES_PER_BATCH,
+        autocommit: false
+      });
+
+      if (!polledMessages || polledMessages.messages.length === 0) {
+        log('No messages found in current poll - this is expected if the 
producer had issues sending messages');
+        consumedBatches++; // Increment even when no messages found
+        log('Completed poll attempt %d (no messages).', consumedBatches);
+        await new Promise(resolve => setTimeout(resolve, interval));
+        continue;
+      }
+
+      offset += polledMessages.messages.length;
+      
+      for (const message of polledMessages.messages) {
+        handleMessage(message);
+      }
+      
+      consumedBatches++;
+      log('Consumed %d message(s) in batch %d.', 
polledMessages.messages.length, consumedBatches);
+      
+      await new Promise(resolve => setTimeout(resolve, interval));
+    } catch (error) {
+      log('Error consuming messages: %o', error);
+      consumedBatches++;
+      await new Promise(resolve => setTimeout(resolve, interval));
+    }
+  }
+
+  log('Consumed %d batches of messages, exiting.', consumedBatches);
+}
+
+async function main() {
+  const args = parseArgs();
+  
+  log('Using connection string: %s', args.connectionString);
+  
+  // Parse connection string (simplified parsing for this example)
+  const url = new URL(args.connectionString.replace('iggy+tcp://', 'http://'));
+  const host = url.hostname;
+  const port = parseInt(url.port) || 8090;
+  const username = url.username || 'iggy';
+  const password = url.password || 'iggy';
+  
+  const client = new Client({
+    transport: 'TCP',
+    options: { port, host },
+    credentials: { username, password }
+  });
+
+  try {
+    log('Basic consumer has started, selected transport: TCP');
+    log('Connecting to Iggy server...');
+    // Client connects automatically when first command is called
+    log('Connected successfully.');
+
+    log('Logging in user...');
+    await client.session.login({ username, password });
+    log('Logged in successfully.');
+
+    await consumeMessages(client);
+  } catch (error) {
+    log('Error in main: %o', error);
+    process.exit(1);
+  } finally {
+    await client.destroy();
+    log('Disconnected from server.');
+  }
+}
+
+// Handle uncaught exceptions
+process.on('uncaughtException', (error) => {

Review Comment:
   this should not be a concern - especially for an example script - see 
https://nodejs.org/api/process.html#process_warning_using_uncaughtexception_correctly



##########
examples/nodejs/src/basic-js/consumer.js:
##########
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Client, PollingStrategy, Consumer } from 'apache-iggy';
+import debug from 'debug';
+
+const log = debug('iggy:basic:consumer');
+
+const STREAM_ID = 1;
+const TOPIC_ID = 1;
+const PARTITION_ID = 1;
+const BATCHES_LIMIT = 5;
+const MESSAGES_PER_BATCH = 10;
+
+function parseArgs() {
+  const args = process.argv.slice(2);
+  const connectionString = args[0] || 'iggy+tcp://iggy:[email protected]:8090';
+  
+  if (args.length > 0 && (args[0] === '-h' || args[0] === '--help')) {
+    console.log('Usage: node consumer.js [connection_string]');
+    console.log('Example: node consumer.js 
iggy+tcp://iggy:[email protected]:8090');
+    process.exit(0);
+  }
+  
+  return { connectionString };
+}
+
+function handleMessage(message) {
+  const payload = message.payload.toString();
+  const offset = message.headers.offset;
+  const timestamp = message.headers.timestamp;
+  
+  log('Received message: %s (offset: %d, timestamp: %d)', payload, offset, 
timestamp);
+}
+
+async function consumeMessages(client) {
+  const interval = 500; // 500 milliseconds
+  log(
+    'Messages will be consumed from stream: %d, topic: %d, partition: %d with 
interval %d ms.',
+    STREAM_ID,
+    TOPIC_ID,
+    PARTITION_ID,
+    interval
+  );
+
+  let offset = 0;
+  let consumedBatches = 0;
+
+  while (consumedBatches < BATCHES_LIMIT) {
+    try {
+      log('Polling for messages...');
+      const polledMessages = await client.message.poll({
+        streamId: STREAM_ID,
+        topicId: TOPIC_ID,
+        consumer: Consumer.Single,
+        partitionId: PARTITION_ID,
+        pollingStrategy: PollingStrategy.Offset(BigInt(offset)),
+        count: MESSAGES_PER_BATCH,
+        autocommit: false
+      });
+
+      if (!polledMessages || polledMessages.messages.length === 0) {
+        log('No messages found in current poll - this is expected if the 
producer had issues sending messages');
+        consumedBatches++; // Increment even when no messages found
+        log('Completed poll attempt %d (no messages).', consumedBatches);
+        await new Promise(resolve => setTimeout(resolve, interval));
+        continue;
+      }
+
+      offset += polledMessages.messages.length;
+      
+      for (const message of polledMessages.messages) {
+        handleMessage(message);
+      }
+      
+      consumedBatches++;
+      log('Consumed %d message(s) in batch %d.', 
polledMessages.messages.length, consumedBatches);
+      
+      await new Promise(resolve => setTimeout(resolve, interval));
+    } catch (error) {
+      log('Error consuming messages: %o', error);
+      consumedBatches++;
+      await new Promise(resolve => setTimeout(resolve, interval));
+    }
+  }
+
+  log('Consumed %d batches of messages, exiting.', consumedBatches);
+}
+
+async function main() {
+  const args = parseArgs();
+  
+  log('Using connection string: %s', args.connectionString);
+  
+  // Parse connection string (simplified parsing for this example)
+  const url = new URL(args.connectionString.replace('iggy+tcp://', 'http://'));
+  const host = url.hostname;
+  const port = parseInt(url.port) || 8090;
+  const username = url.username || 'iggy';
+  const password = url.password || 'iggy';
+  
+  const client = new Client({
+    transport: 'TCP',
+    options: { port, host },
+    credentials: { username, password }
+  });
+
+  try {
+    log('Basic consumer has started, selected transport: TCP');
+    log('Connecting to Iggy server...');
+    // Client connects automatically when first command is called
+    log('Connected successfully.');
+
+    log('Logging in user...');
+    await client.session.login({ username, password });
+    log('Logged in successfully.');
+
+    await consumeMessages(client);
+  } catch (error) {
+    log('Error in main: %o', error);
+    process.exit(1);
+  } finally {
+    await client.destroy();
+    log('Disconnected from server.');
+  }
+}
+
+// Handle uncaught exceptions
+process.on('uncaughtException', (error) => {
+  log('Uncaught Exception: %o', error);
+  process.exit(1);
+});
+
+process.on('unhandledRejection', (reason, promise) => {
+  log('Unhandled Rejection at: %o, reason: %o', promise, reason);
+  process.exit(1);
+});
+
+main().catch((error) => {

Review Comment:
   try { await main() } catch (err) {...} 
   would be more coherent with the rest of the code (es5 promise style vs es6 
async/await)



##########
examples/nodejs/src/basic-js/producer.js:
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Client, Partitioning } from 'apache-iggy';
+import debug from 'debug';
+
+const log = debug('iggy:basic:producer');
+
+const STREAM_ID = 1;
+const TOPIC_ID = 1;
+const PARTITION_ID = 1;
+const BATCHES_LIMIT = 5;
+const MESSAGES_PER_BATCH = 10;
+
+function parseArgs() {
+  const args = process.argv.slice(2);
+  const connectionString = args[0] || 'iggy+tcp://iggy:[email protected]:8090';
+  
+  if (args.length > 0 && (args[0] === '-h' || args[0] === '--help')) {
+    console.log('Usage: node producer.js [connection_string]');
+    console.log('Example: node producer.js 
iggy+tcp://iggy:[email protected]:8090');
+    process.exit(0);
+  }
+  
+  return { connectionString };
+}
+
+async function initSystem(client) {
+  try {
+    log('Creating stream with ID %d...', STREAM_ID);
+    await client.stream.create({ streamId: STREAM_ID, name: 'sample-stream' });
+    log('Stream was created successfully.');
+  } catch (error) {
+    log('Stream already exists or error creating stream: %o', error);
+  }
+
+  try {
+    log('Creating topic with ID %d in stream %d...', TOPIC_ID, STREAM_ID);
+    await client.topic.create({
+      streamId: STREAM_ID,
+      topicId: TOPIC_ID,
+      name: 'sample-topic',
+      partitionCount: 1,
+      compressionAlgorithm: 1, // None
+      replicationFactor: 1
+    });
+    log('Topic was created successfully.');
+  } catch (error) {
+    log('Topic already exists or error creating topic: %o', error);
+  }
+
+  // Wait a moment for the topic to be fully created
+  await new Promise(resolve => setTimeout(resolve, 100));
+}
+
+async function produceMessages(client) {
+  const interval = 500; // 500 milliseconds
+  log(
+    'Messages will be sent to stream: %d, topic: %d, partition: %d with 
interval %d ms.',
+    STREAM_ID,
+    TOPIC_ID,
+    PARTITION_ID,
+    interval
+  );
+
+  let currentId = 0;
+  let sentBatches = 0;
+
+  while (sentBatches < BATCHES_LIMIT) {
+    const messages = [];
+    const sentMessages = [];
+    
+    for (let i = 0; i < MESSAGES_PER_BATCH; i++) {
+      currentId++;
+      const payload = `message-${currentId}`;
+      messages.push({
+        payload: Buffer.from(payload, 'utf8')

Review Comment:
   payload can also be a string



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to