Github user mmiklavc commented on a diff in the pull request: https://github.com/apache/metron/pull/1254#discussion_r236530989 --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java --- @@ -56,90 +58,107 @@ */ private transient ElasticsearchClient client; + /** + * Responsible for writing documents. + * + * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between + * a {@link Tuple} and the document created from the contents of that tuple. If + * a document cannot be written, the associated tuple needs to be failed. + */ + private transient BulkDocumentWriter<TupleBasedDocument> documentWriter; + /** * A simple data formatter used to build the appropriate Elasticsearch index name. */ private SimpleDateFormat dateFormat; - @Override public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) { - Map<String, Object> globalConfiguration = configurations.getGlobalConfig(); - client = ElasticsearchClientFactory.create(globalConfiguration); dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration); + + // only create the document writer, if one does not already exist. useful for testing. 
+ if(documentWriter == null) { + client = ElasticsearchClientFactory.create(globalConfiguration); + documentWriter = new ElasticsearchBulkDocumentWriter<>(client); + } } @Override - public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception { + public BulkWriterResponse write(String sensorType, + WriterConfiguration configurations, + Iterable<Tuple> tuplesIter, + List<JSONObject> messages) { // fetch the field name converter for this sensor type FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations); + String indexPostfix = dateFormat.format(new Date()); + String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); + + // the number of tuples must match the number of messages + List<Tuple> tuples = Lists.newArrayList(tuplesIter); + int batchSize = tuples.size(); + if(messages.size() != batchSize) { + throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d", + tuples.size(), messages.size())); + } - final String indexPostfix = dateFormat.format(new Date()); - BulkRequest bulkRequest = new BulkRequest(); - for(JSONObject message: messages) { + // create a document from each message + List<TupleBasedDocument> documents = new ArrayList<>(); + for(int i=0; i<tuples.size(); i++) { + JSONObject message = messages.get(i); + Tuple tuple = tuples.get(i); - JSONObject esDoc = new JSONObject(); + // transform the message fields to the source fields of the indexed document + JSONObject source = new JSONObject(); for(Object k : message.keySet()){ - copyField(k.toString(), message, esDoc, fieldNameConverter); + copyField(k.toString(), message, source, fieldNameConverter); } - String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); - IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc"); - 
indexRequest.source(esDoc.toJSONString()); - String guid = (String)esDoc.get(Constants.GUID); - if(guid != null) { - indexRequest.id(guid); + // define the document id + String guid = String.class.cast(source.get(Constants.GUID)); + if(guid == null) { + LOG.info("Missing '{}' field; document ID will be auto-generated.", Constants.GUID); } - Object ts = esDoc.get("timestamp"); - if(ts != null) { - indexRequest.timestamp(ts.toString()); + // define the document timestamp + Long timestamp = Long.class.cast(source.get(Constants.Fields.TIMESTAMP.getName())); + if(timestamp == null) { + LOG.info("Missing '{}' field; timestamp will be set to system time.", Constants.Fields.TIMESTAMP.getName()); } - bulkRequest.add(indexRequest); + + TupleBasedDocument document = new TupleBasedDocument(source, guid, sensorType, timestamp, tuple); + documentWriter.addDocument(document, indexName); } - BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest); - return buildWriteReponse(tuples, bulkResponse); + // add successful tuples to the response + BulkWriterResponse response = new BulkWriterResponse(); + documentWriter.onSuccess(docs -> { + List<Tuple> successfulTuples = docs.stream().map(doc -> doc.getTuple()).collect(Collectors.toList()); --- End diff -- The same comment about the use of streams applies here as in my earlier comment above.
---