jnturton commented on code in PR #2722: URL: https://github.com/apache/drill/pull/2722#discussion_r1055192926
########## contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java: ########## @@ -98,27 +100,69 @@ public void updateSchema(VectorAccessible batch) { @Override public void startRecord() { logger.debug("Starting record"); - // Ensure that the new record is empty. This is not strictly necessary, but it is a belt and suspenders approach. - splunkEvent.clear(); + // Ensure that the new record is empty. + splunkEvent = new JSONObject(); } @Override - public void endRecord() throws IOException { + public void endRecord() { logger.debug("Ending record"); + recordCount++; + + // Put event in buffer + eventBuffer.add(splunkEvent); + // Write the event to the Splunk index - destinationIndex.submit(eventArgs, splunkEvent.toJSONString()); - // Clear out the splunk event. - splunkEvent.clear(); + if (recordCount >= config.getPluginConfig().getWriterBatchSize()) { + try { + writeEvents(); + } catch (IOException e) { + throw UserException.dataWriteError(e) + .message("Error writing data to Splunk: " + e.getMessage()) + .build(logger); + } + + // Reset record count + recordCount = 0; + } } + + /* + args – Optional arguments for this stream. Valid parameters are: "host", "host_regex", "source", and "sourcetype". + */ @Override public void abort() { + logger.debug("Aborting writing records to Splunk."); // No op } @Override public void cleanup() { - // No op + try { + writeEvents(); + } catch (IOException e) { + throw UserException.dataWriteError(e) + .message("Error writing data to Splunk: " + e.getMessage()) + .build(logger); + } + } + + private void writeEvents() throws IOException { + // Open the socket and stream, set up a timestamp + destinationIndex.attachWith(new ReceiverBehavior() { Review Comment: This results in a dedicated TCP socket being opened and closed for every writer batch. ########## contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java: ########## @@ -98,27 +100,69 @@ public void updateSchema(VectorAccessible batch) { @Override public void startRecord() { logger.debug("Starting record"); - // Ensure that the new record is empty. This is not strictly necessary, but it is a belt and suspenders approach. - splunkEvent.clear(); + // Ensure that the new record is empty. + splunkEvent = new JSONObject(); } @Override - public void endRecord() throws IOException { + public void endRecord() { logger.debug("Ending record"); + recordCount++; + + // Put event in buffer + eventBuffer.add(splunkEvent); + // Write the event to the Splunk index - destinationIndex.submit(eventArgs, splunkEvent.toJSONString()); - // Clear out the splunk event. - splunkEvent.clear(); + if (recordCount >= config.getPluginConfig().getWriterBatchSize()) { + try { + writeEvents(); + } catch (IOException e) { + throw UserException.dataWriteError(e) + .message("Error writing data to Splunk: " + e.getMessage()) + .build(logger); + } + + // Reset record count + recordCount = 0; + } } + + /* + args – Optional arguments for this stream. Valid parameters are: "host", "host_regex", "source", and "sourcetype". + */ @Override public void abort() { + logger.debug("Aborting writing records to Splunk."); Review Comment: Would there be any use in clearing eventBuffer here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@drill.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org