[ https://issues.apache.org/jira/browse/DRILL-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17649374#comment-17649374 ]

ASF GitHub Bot commented on DRILL-8371:
---------------------------------------

jnturton commented on code in PR #2722:
URL: https://github.com/apache/drill/pull/2722#discussion_r1052337380


##########
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+
+import com.splunk.Args;
+import com.splunk.Index;
+import com.splunk.IndexCollection;
+import com.splunk.Service;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class SplunkBatchWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(SplunkBatchWriter.class);
+  private static final String DEFAULT_SOURCETYPE = "drill";
+  private final UserCredentials userCredentials;
+  private final List<String> tableIdentifier;
+  private final SplunkWriter config;
+  private final Args eventArgs;
+  protected final Service splunkService;
+  private JSONObject splunkEvent;
+  protected Index destinationIndex;
+
+
+  public SplunkBatchWriter(UserCredentials userCredentials, List<String> tableIdentifier, SplunkWriter config) {
+    this.config = config;
+    this.tableIdentifier = tableIdentifier;
+    this.userCredentials = userCredentials;
+
+    SplunkConnection connection = new SplunkConnection(config.getPluginConfig(), userCredentials.getUserName());
+    this.splunkService = connection.connect();
+
+    // Populate event arguments
+    this.eventArgs = new Args();
+    eventArgs.put("sourcetype", DEFAULT_SOURCETYPE);
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) throws IOException {
+    // No op
+  }
+
+  /**
+   * Update the schema in RecordWriter. Called at least once before starting writing the records. In this case,
+   * we add the index to Splunk here. Splunk's API is a little sparse and doesn't really do much in the way
+   * of error checking or providing feedback if the operation fails.
+   *
+   * @param batch {@link VectorAccessible} The incoming batch
+   */
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    logger.debug("Updating schema for Splunk");
+
+    //Get the collection of indexes
+    IndexCollection indexes = splunkService.getIndexes();
+    try {
+      String indexName = tableIdentifier.get(0);
+      indexes.create(indexName);
+      destinationIndex = splunkService.getIndexes().get(indexName);
+    } catch (Exception e) {
+      // We have to catch a generic exception here, as Splunk's SDK does not really provide any kind of
+      // failure messaging.
+      throw UserException.systemError(e)
+        .message("Error creating new index in Splunk plugin: " + 
e.getMessage())
+        .build(logger);
+    }
+  }
+
+
+  @Override
+  public void startRecord() {
+    logger.debug("Starting record");
+    splunkEvent = new JSONObject();
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+    // Write the event to the Splunk index
+    destinationIndex.submit(eventArgs, splunkEvent.toJSONString());
+    // Clear out the splunk event.
+    splunkEvent = new JSONObject();
+  }
+
+  @Override
+  public void abort() {
+    // No op
+  }
+
+  @Override
+  public void cleanup() {
+    // No op
+  }
+
+
+  @Override
+  public FieldConverter getNewNullableIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewBigIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewSmallIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewTinyIntConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewFloat4Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewFloat8Converter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewVarDecimalConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableDateConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewDateConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableTimeConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewTimeConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new VarCharSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewTimeStampConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewVarCharConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new VarCharSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewNullableBitConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewBitConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ScalarSplunkConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewUnionConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewDictConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedDictConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new ComplexFieldConverter(fieldId, fieldName, reader);
+  }
+
+  public class VarCharSplunkConverter extends FieldConverter {
+
+    public VarCharSplunkConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      byte[] bytes = reader.readText().copyBytes();
+      splunkEvent.put(fieldName, new String(bytes));
+    }
+  }
+
+  public class ScalarSplunkConverter extends FieldConverter {
+    public ScalarSplunkConverter(int fieldID, String fieldName, FieldReader reader) {
+      super(fieldID, fieldName, reader);
+    }
+
+    @Override
+    public void writeField() {
+      splunkEvent.put(fieldName, String.valueOf(reader.readObject()));

Review Comment:
   Does Splunk have any numeric or temporal types? Or does it only work with 
strings?
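
If Splunk's event payloads can carry JSON numbers as well as strings (an assumption, not something confirmed by the SDK docs referenced in this review), one possible answer is to let the scalar converter keep numeric values typed instead of stringifying every field. A minimal, hypothetical variant of `ScalarSplunkConverter.writeField()`:

```java
// Hypothetical sketch (not part of the PR): keep numeric values as JSON numbers
// rather than converting every field to a string. Assumes the destination
// Splunk index accepts JSON numbers in the event body.
@Override
public void writeField() {
  Object value = reader.readObject();
  if (value instanceof Number) {
    splunkEvent.put(fieldName, value);
  } else {
    splunkEvent.put(fieldName, String.valueOf(value));
  }
}
```

Temporal values would presumably still need a string or epoch representation that Splunk recognizes.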



##########
contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.MethodSorters;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@FixMethodOrder(MethodSorters.JVM)
+@Category({SlowTest.class})
+public class SplunkWriterTest extends SplunkBaseTest {
+
+  @Test
+  public void testBasicCTAS() throws Exception {
+
+    // Verify that there is no index called t1 in Splunk
+    String sql = "SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_SCHEMA 
= 'splunk' AND TABLE_NAME LIKE 't1'";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(0, results.rowCount());
+    results.clear();
+
+    // Now create the table
+    sql = "CREATE TABLE `splunk`.`t1` AS SELECT * FROM cp.`test_data.csvh`";
+    QuerySummary summary = client.queryBuilder().sql(sql).run();
+    assertTrue(summary.succeeded());
+
+    // Verify that an index was created called t1 in Splunk
+    sql = "SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_SCHEMA = 
'splunk' AND TABLE_NAME LIKE 't1'";
+    results = client.queryBuilder().sql(sql).rowSet();
+    assertEquals(1, results.rowCount());
+    results.clear();
+
+    // There seems to be some delay between the Drill query writing the data and the data being made
+    // accessible.
+    Thread.sleep(30000);

Review Comment:
   It's always unfortunate to be forced to put lines like this in a unit test. 
I'm trying to think of some way around it but I haven't got anything yet.
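
One way to soften the fixed delay, assuming the only requirement is that the written data eventually becomes queryable: poll the verification query against a deadline instead of always sleeping for 30 seconds. The helper below is a hypothetical sketch that reuses the test's existing client calls.

```java
// Hypothetical test helper: re-run the verification query until rows appear or
// the deadline passes, instead of an unconditional 30 second sleep.
private RowSet waitForRows(String sql, long timeoutMillis) throws Exception {
  long deadline = System.currentTimeMillis() + timeoutMillis;
  while (true) {
    RowSet results = client.queryBuilder().sql(sql).rowSet();
    if (results.rowCount() > 0 || System.currentTimeMillis() >= deadline) {
      return results;  // caller still asserts on the contents and clears the set
    }
    results.clear();
    Thread.sleep(1000);  // short poll interval
  }
}
```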



##########
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java:
##########
@@ -0,0 +1,308 @@
+  /**
+   * Update the schema in RecordWriter. Called at least once before starting writing the records. In this case,

Review Comment:
   > Called at least once
   
   If this might be called more than once, might the init method not be a better place for the index creation call?
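
For reference, a rough sketch of what moving the index creation into init() could look like; the containsKey guard assumes the SDK's IndexCollection exposes Map-style lookups by index name, which has not been verified here.

```java
// Hypothetical rearrangement (not the PR's code): create the index once in
// init() and skip creation if it already exists.
@Override
public void init(Map<String, String> writerOptions) throws IOException {
  String indexName = tableIdentifier.get(0);
  IndexCollection indexes = splunkService.getIndexes();
  try {
    if (!indexes.containsKey(indexName)) {
      indexes.create(indexName);
    }
    destinationIndex = indexes.get(indexName);
  } catch (Exception e) {
    throw UserException.systemError(e)
      .message("Error creating new index in Splunk plugin: " + e.getMessage())
      .build(logger);
  }
}
```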



##########
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java:
##########
@@ -0,0 +1,308 @@
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+    // Write the event to the Splunk index
+    destinationIndex.submit(eventArgs, splunkEvent.toJSONString());

Review Comment:
   I found the following on [this Splunk doc page](https://dev.splunk.com/enterprise/docs/devtools/java/sdk-java/howtousesdkjava/howtogetdatasdkjava). Can we take advantage of the 'attach' method that they suggest?
   
   > The submit method opens a new connection for each event, so if you have many events to send, this method may not perform well. In this case, use the attach method to get an open socket to Splunk Enterprise that you can write your events to. Here is an example of sending an event (with a timestamp) to a socket opened by the attach method.
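
For illustration, a rough sketch of how an attach()-based path could be structured, adapted from the SDK documentation quoted above; the field names and lifecycle hooks are assumptions, not code from the PR.

```java
// Hypothetical streaming variant based on the quoted SDK docs. Requires
// java.net.Socket, java.io.Writer/OutputStreamWriter and
// java.nio.charset.StandardCharsets in addition to the existing imports.
private Socket eventSocket;   // opened once per writer, not per event
private Writer eventWriter;

// e.g. called once after updateSchema() has resolved destinationIndex
private void openEventStream() throws IOException {
  eventSocket = destinationIndex.attach(eventArgs);
  eventWriter = new OutputStreamWriter(eventSocket.getOutputStream(), StandardCharsets.UTF_8);
}

// e.g. called from endRecord() in place of destinationIndex.submit(...)
private void writeEvent(String jsonEvent) throws IOException {
  eventWriter.write(jsonEvent + "\r\n");  // Splunk splits events on the line terminator
  eventWriter.flush();
}

// e.g. called from cleanup() to release the connection
private void closeEventStream() throws IOException {
  eventWriter.close();
  eventSocket.close();
}
```

The idea is to open the socket once per writer and close it in cleanup(), rather than opening a new connection per event as submit() does.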



##########
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java:
##########
@@ -0,0 +1,308 @@
+  @Override
+  public void startRecord() {
+    logger.debug("Starting record");
+    splunkEvent = new JSONObject();
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+    // Write the event to the Splunk index
+    destinationIndex.submit(eventArgs, splunkEvent.toJSONString());
+    // Clear out the splunk event.
+    splunkEvent = new JSONObject();

Review Comment:
   Is this line required?





> Add Write/Append Capability to Splunk Plugin
> --------------------------------------------
>
>                 Key: DRILL-8371
>                 URL: https://issues.apache.org/jira/browse/DRILL-8371
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Storage - Splunk
>    Affects Versions: 1.20.2
>            Reporter: Charles Givre
>            Assignee: Charles Givre
>            Priority: Major
>             Fix For: 2.0.0
>
>
> While Drill can currently read from Splunk indexes, it cannot write to them 
> or create them.  This proposed PR adds support for CTAS queries for Splunk as 
> well as INSERT and DROP TABLE. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
