jnturton commented on code in PR #2722:
URL: https://github.com/apache/drill/pull/2722#discussion_r1055192926


##########
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java:
##########
@@ -98,27 +100,69 @@ public void updateSchema(VectorAccessible batch) {
   @Override
   public void startRecord() {
     logger.debug("Starting record");
-    // Ensure that the new record is empty. This is not strictly necessary, but it is a belt and suspenders approach.
-    splunkEvent.clear();
+    // Ensure that the new record is empty.
+    splunkEvent = new JSONObject();
   }
 
   @Override
-  public void endRecord() throws IOException {
+  public void endRecord() {
     logger.debug("Ending record");
+    recordCount++;
+
+    // Put event in buffer
+    eventBuffer.add(splunkEvent);
+
     // Write the event to the Splunk index
-    destinationIndex.submit(eventArgs, splunkEvent.toJSONString());
-    // Clear out the splunk event.
-    splunkEvent.clear();
+    if (recordCount >= config.getPluginConfig().getWriterBatchSize()) {
+      try {
+        writeEvents();
+      } catch (IOException e) {
+        throw  UserException.dataWriteError(e)
+            .message("Error writing data to Splunk: " + e.getMessage())
+            .build(logger);
+      }
+
+      // Reset record count
+      recordCount = 0;
+    }
   }
 
+
+  /*
+  args – Optional arguments for this stream. Valid parameters are: "host", "host_regex", "source", and "sourcetype".
+   */
   @Override
   public void abort() {
+    logger.debug("Aborting writing records to Splunk.");
     // No op
   }
 
   @Override
   public void cleanup() {
-    // No op
+    try {
+      writeEvents();
+    } catch (IOException e) {
+      throw  UserException.dataWriteError(e)
+          .message("Error writing data to Splunk: " + e.getMessage())
+          .build(logger);
+    }
+  }
+
+  private void writeEvents() throws IOException {
+    // Open the socket and stream, set up a timestamp
+    destinationIndex.attachWith(new ReceiverBehavior() {

Review Comment:
   This results in a dedicated TCP socket being opened and closed for every writer batch.
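   One alternative, sketched below without the Splunk SDK (class and method names are illustrative, not the SDK's API), is to hold a single open stream for the writer's lifetime, flush each batch to it, and close only once in `cleanup()`:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the stream is opened once, reused across batches,
// and closed only in close() (the analogue of cleanup()).
public class BatchedStreamWriter {
  private final OutputStream out;   // held open for the writer's lifetime
  private final int batchSize;
  private final List<String> buffer = new ArrayList<>();
  private int flushCount = 0;

  public BatchedStreamWriter(OutputStream out, int batchSize) {
    this.out = out;
    this.batchSize = batchSize;
  }

  public void add(String event) throws IOException {
    buffer.add(event);
    if (buffer.size() >= batchSize) {
      flush();                      // batch boundary: write, but keep the stream open
    }
  }

  private void flush() throws IOException {
    for (String event : buffer) {
      out.write((event + "\n").getBytes(StandardCharsets.UTF_8));
    }
    buffer.clear();
    flushCount++;
  }

  public void close() throws IOException {
    flush();                        // drain any partial final batch
    out.close();                    // the only place the stream is closed
  }

  public int flushCount() {
    return flushCount;
  }
}
```

   With a batch size of 2 and three events, this performs two flushes over one stream rather than opening a socket per batch.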



##########
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java:
##########
@@ -98,27 +100,69 @@ public void updateSchema(VectorAccessible batch) {
   @Override
   public void startRecord() {
     logger.debug("Starting record");
-    // Ensure that the new record is empty. This is not strictly necessary, but it is a belt and suspenders approach.
-    splunkEvent.clear();
+    // Ensure that the new record is empty.
+    splunkEvent = new JSONObject();
   }
 
   @Override
-  public void endRecord() throws IOException {
+  public void endRecord() {
     logger.debug("Ending record");
+    recordCount++;
+
+    // Put event in buffer
+    eventBuffer.add(splunkEvent);
+
     // Write the event to the Splunk index
-    destinationIndex.submit(eventArgs, splunkEvent.toJSONString());
-    // Clear out the splunk event.
-    splunkEvent.clear();
+    if (recordCount >= config.getPluginConfig().getWriterBatchSize()) {
+      try {
+        writeEvents();
+      } catch (IOException e) {
+        throw  UserException.dataWriteError(e)
+            .message("Error writing data to Splunk: " + e.getMessage())
+            .build(logger);
+      }
+
+      // Reset record count
+      recordCount = 0;
+    }
   }
 
+
+  /*
+  args – Optional arguments for this stream. Valid parameters are: "host", "host_regex", "source", and "sourcetype".
+   */
   @Override
   public void abort() {
+    logger.debug("Aborting writing records to Splunk.");

Review Comment:
   Would there be any use in clearing eventBuffer here?
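   Otherwise a later `cleanup()` would still flush records from the aborted query. A minimal sketch of what clearing could look like (field names mirror the PR but the class is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: abort() discards buffered events so a subsequent
// cleanup() cannot write records belonging to an aborted query.
public class AbortingWriter {
  private final List<String> eventBuffer = new ArrayList<>();
  private int recordCount = 0;

  public void endRecord(String event) {
    eventBuffer.add(event);
    recordCount++;
  }

  public void abort() {
    // Drop anything not yet sent and reset the batch counter.
    eventBuffer.clear();
    recordCount = 0;
  }

  public int buffered() {
    return eventBuffer.size();
  }
}
```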



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@drill.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
