[ https://issues.apache.org/jira/browse/DRILL-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17650079#comment-17650079 ]
ASF GitHub Bot commented on DRILL-8371: --------------------------------------- cgivre commented on code in PR #2722: URL: https://github.com/apache/drill/pull/2722#discussion_r1053981884 ########## contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.splunk; + + +import com.splunk.Args; +import com.splunk.Index; +import com.splunk.IndexCollection; +import com.splunk.Service; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.proto.UserBitShared.UserCredentials; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.store.AbstractRecordWriter; +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class SplunkBatchWriter extends AbstractRecordWriter { + + private static final Logger logger = LoggerFactory.getLogger(SplunkBatchWriter.class); + private static final String DEFAULT_SOURCETYPE = "drill"; + private final UserCredentials userCredentials; + private final List<String> tableIdentifier; + private final SplunkWriter config; + private final Args eventArgs; + protected final Service splunkService; + private final JSONObject splunkEvent; + protected Index destinationIndex; + + + public SplunkBatchWriter(UserCredentials userCredentials, List<String> tableIdentifier, SplunkWriter config) { + this.config = config; + this.tableIdentifier = tableIdentifier; + this.userCredentials = userCredentials; + this.splunkEvent = new JSONObject(); + SplunkConnection connection = new SplunkConnection(config.getPluginConfig(), userCredentials.getUserName()); + this.splunkService = connection.connect(); + + // Populate event arguments + this.eventArgs = new Args(); + eventArgs.put("sourcetype", DEFAULT_SOURCETYPE); + } + + @Override + public void init(Map<String, String> writerOptions) throws IOException { + // No op + } + + /** + * Update the schema in RecordWriter. Called before starting writing the records. In this case, + * we add the index to Splunk here. Splunk's API is a little sparse and doesn't really do much in the way + * of error checking or providing feedback if the operation fails. + * + * @param batch {@link VectorAccessible} The incoming batch + */ + @Override + public void updateSchema(VectorAccessible batch) { + logger.debug("Updating schema for Splunk"); + + //Get the collection of indexes + IndexCollection indexes = splunkService.getIndexes(); + try { + String indexName = tableIdentifier.get(0); + indexes.create(indexName); + destinationIndex = splunkService.getIndexes().get(indexName); + } catch (Exception e) { + // We have to catch a generic exception here, as Splunk's SDK does not really provide any kind of + // failure messaging. + throw UserException.systemError(e) + .message("Error creating new index in Splunk plugin: " + e.getMessage()) + .build(logger); + } + } + + + @Override + public void startRecord() { + logger.debug("Starting record"); + // Ensure that the new record is empty. This is not strictly necessary, but it is a belt and suspenders approach. + splunkEvent.clear(); Review Comment: I removed this from the `endRecord` method. > Add Write/Append Capability to Splunk Plugin > -------------------------------------------- > > Key: DRILL-8371 > URL: https://issues.apache.org/jira/browse/DRILL-8371 > Project: Apache Drill > Issue Type: Improvement > Components: Storage - Splunk > Affects Versions: 1.20.2 > Reporter: Charles Givre > Assignee: Charles Givre > Priority: Major > Fix For: 2.0.0 > > > While Drill can currently read from Splunk indexes, it cannot write to them > or create them. This proposed PR adds support for CTAS queries for Splunk as > well as INSERT and DROP TABLE. -- This message was sent by Atlassian Jira (v8.20.10#820010)