Refactored Elasticsearch components into their own class for better cohesion
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b5fd7e70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b5fd7e70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b5fd7e70 Branch: refs/heads/STREAMS-58 Commit: b5fd7e70aa81515c4278cbaec788fbf3f81b8d58 Parents: af1efe8 Author: mfranklin <[email protected]> Authored: Tue Apr 22 11:34:39 2014 -0400 Committer: mfranklin <[email protected]> Committed: Tue Apr 22 11:34:39 2014 -0400 ---------------------------------------------------------------------- .../ElasticsearchPersistReader.java | 308 ++++--------------- .../ElasticsearchPersistReaderTask.java | 63 ---- .../elasticsearch/ElasticsearchQuery.java | 282 +++++++++++++++++ 3 files changed, 339 insertions(+), 314 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b5fd7e70/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java index 72a9954..fd2a155 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java @@ -1,25 +1,17 @@ package org.apache.streams.elasticsearch; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Objects; -import com.google.common.collect.Lists; +import 
com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Queues; -import com.typesafe.config.Config; -import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.*; import org.apache.streams.jackson.StreamsJacksonMapper; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.index.query.FilterBuilder; -import org.elasticsearch.index.query.FilterBuilders; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.sort.SortBuilders; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; import java.math.BigInteger; import java.util.*; import java.util.concurrent.*; @@ -32,95 +24,23 @@ import java.util.concurrent.*; * ************************************************************************************************************ */ -public class ElasticsearchPersistReader implements StreamsPersistReader, Iterable<SearchHit>, Iterator<SearchHit> { +public class ElasticsearchPersistReader implements StreamsPersistReader, Serializable { public static final String STREAMS_ID = "ElasticsearchPersistReader"; private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class); - private static final int SCROLL_POSITION_NOT_INITIALIZED = -3; - private static final Integer DEFAULT_BATCH_SIZE = 500; - private static final String DEFAULT_SCROLL_TIMEOUT = "5m"; protected volatile Queue<StreamsDatum> persistQueue; - private ElasticsearchClientManager elasticsearchClientManager; - private List<String> indexes = Lists.newArrayList(); - private List<String> types = Lists.newArrayList(); - private String[] withfields; - private String[] withoutfields; - private DateTime startDate; - private DateTime endDate; - private 
int limit = 1000 * 1000 * 1000; // we are going to set the default limit very high to 1bil - private boolean random = false; + private ElasticsearchQuery elasticsearchQuery; + private ElasticsearchReaderConfiguration config; private int threadPoolSize = 10; - private int batchSize = 100; - private String scrollTimeout = null; - - private ObjectMapper mapper; - - private ElasticsearchConfiguration config; - private ExecutorService executor; - private QueryBuilder queryBuilder; - private FilterBuilder filterBuilder; - - // These are private to help us manage the scroll - private SearchRequestBuilder search; - private SearchResponse scrollResp; - private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED; - private SearchHit next = null; - private long totalHits = 0; - private long totalRead = 0; - public ElasticsearchPersistReader() { - Config config = StreamsConfigurator.config.getConfig("elasticsearch"); - this.config = ElasticsearchConfigurator.detectConfiguration(config); - } - - public ElasticsearchPersistReader(ElasticsearchReaderConfiguration elasticsearchConfiguration) { - this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration); - indexes.addAll(elasticsearchConfiguration.getIndexes()); - types.addAll(elasticsearchConfiguration.getTypes()); - } - - public long getHitCount() { - return this.search == null ? 
0 : this.totalHits; - } - - public long getReadCount() { - return this.totalRead; - } - - public double getReadPercent() { - return (double) this.getReadCount() / (double) this.getHitCount(); - } - - public long getRemainingCount() { - return this.totalRead - this.totalHits; - } - - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; } - public void setScrollTimeout(String scrollTimeout) { - this.scrollTimeout = scrollTimeout; - } - - public void setQueryBuilder(QueryBuilder queryBuilder) { - this.queryBuilder = queryBuilder; - } - - public void setFilterBuilder(FilterBuilder filterBuilder) { - this.filterBuilder = filterBuilder; - } - - public void setWithfields(String[] withfields) { - this.withfields = withfields; - } - - public void setWithoutfields(String[] withoutfields) { - this.withoutfields = withoutfields; + public ElasticsearchPersistReader(ElasticsearchReaderConfiguration config) { + this.config = config; } //PersistReader methods @@ -128,57 +48,14 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl public void startStream() { LOGGER.debug("startStream"); executor = Executors.newSingleThreadExecutor(); - executor.submit(new ElasticsearchPersistReaderTask(this)); + executor.submit(new ElasticsearchPersistReaderTask(this, elasticsearchQuery)); } @Override public void prepare(Object o) { - - mapper = StreamsJacksonMapper.getInstance(); - persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); - - // If we haven't already set up the search, then set up the search. 
- if (search == null) { - search = elasticsearchClientManager.getClient() - .prepareSearch(indexes.toArray(new String[0])) - .setSearchType(SearchType.SCAN) - .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue()) - .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT)); - - if (this.queryBuilder != null) - search.setQuery(this.queryBuilder); - - // If the types are null, then don't specify a type - if (this.types != null && this.types.size() > 0) - search = search.setTypes(types.toArray(new String[0])); - - Integer clauses = 0; - if (this.withfields != null || this.withoutfields != null) { - if (this.withfields != null) - clauses += this.withfields.length; - if (this.withoutfields != null) - clauses += this.withoutfields.length; - } - - List<FilterBuilder> filterList = buildFilterList(); - - FilterBuilder allFilters = andFilters(filterList); - - if (clauses > 0) { - // search.setPostFilter(allFilters); - search.setPostFilter(allFilters); - } - - // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll. - if (this.random) - search = search.addSort(SortBuilders.scriptSort("random()", "number")); - } - - // We don't have a scroll, we need to create a scroll - if (scrollResp == null) { - scrollResp = search.execute().actionGet(); - LOGGER.trace(search.toString()); - } + elasticsearchQuery = this.config == null ? 
new ElasticsearchQuery() : new ElasticsearchQuery(config); + elasticsearchQuery.execute(o); + persistQueue = constructQueue(); } @Override @@ -192,12 +69,12 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl StreamsResultSet current; synchronized (ElasticsearchPersistReader.class) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + current = new StreamsResultSet(persistQueue); current.setCounter(new DatumStatusCounter()); // current.getCounter().add(countersCurrent); // countersTotal.add(countersCurrent); // countersCurrent = new DatumStatusCounter(); - persistQueue.clear(); + persistQueue = constructQueue(); } return current; @@ -218,69 +95,20 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl @Override public void cleanUp() { + this.shutdownAndAwaitTermination(executor); LOGGER.info("PersistReader done"); } - //Iterable methods - @Override - public Iterator<SearchHit> iterator() { - return this; - } - - //Iterator methods - @Override - public SearchHit next() { - return this.next; - } - - @Override - public boolean hasNext() { - calcNext(); - return hasRecords(); - } - - - public void calcNext() { - try { - // We have exhausted our scroll create another scroll. - if (scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || scrollPositionInScroll >= scrollResp.getHits().getHits().length) { - // reset the scroll position - scrollPositionInScroll = 0; - - // get the next hits of the scroll - scrollResp = elasticsearchClientManager.getClient() - .prepareSearchScroll(scrollResp.getScrollId()) - .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT)) - .execute() - .actionGet(); - - this.totalHits = scrollResp.getHits().getTotalHits(); - } - - // If this scroll has 0 items then we set the scroll position to -1 - // letting the iterator know that we are done. 
- if (scrollResp.getHits().getTotalHits() == 0 || scrollResp.getHits().getHits().length == 0) - scrollPositionInScroll = -1; - else { - // get the next record - next = scrollResp.getHits().getAt(scrollPositionInScroll); - - // Increment our counters - scrollPositionInScroll += 1; - totalRead += 1; - } - } catch (Exception e) { - e.printStackTrace(); - LOGGER.error("Unexpected scrolling error: {}", e.getMessage()); - scrollPositionInScroll = -1; - next = null; + protected void write(StreamsDatum entry) { + boolean success; + do { + success = persistQueue.offer(entry); + Thread.yield(); } + while (!success); } - public void remove() { - } - - void shutdownAndAwaitTermination(ExecutorService pool) { + protected void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate @@ -288,7 +116,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); + LOGGER.error("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted @@ -298,72 +126,50 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl } } - private boolean isCompleted() { - return totalRead >= this.limit && hasRecords(); - } - - private boolean hasRecords() { - return scrollPositionInScroll != -1 && (!(this.totalRead > this.limit)); - } - - // copied from elasticsearch - // if we need this again we should factor it out into a utility - private FilterBuilder andFilters(List<FilterBuilder> filters) { - if (filters == null || filters.size() == 0) - return null; - - FilterBuilder toReturn = filters.get(0); - - for (int i = 1; i < filters.size(); i++) - toReturn = 
FilterBuilders.andFilter(toReturn, filters.get(i)); - - return toReturn; + private Queue<StreamsDatum> constructQueue() { + return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); } - private FilterBuilder orFilters(List<FilterBuilder> filters) { - if (filters == null || filters.size() == 0) - return null; + public static class ElasticsearchPersistReaderTask implements Runnable { - FilterBuilder toReturn = filters.get(0); + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class); - for (int i = 1; i < filters.size(); i++) - toReturn = FilterBuilders.orFilter(toReturn, filters.get(i)); + private ElasticsearchPersistReader reader; + private ElasticsearchQuery query; + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - return toReturn; - } - - private List<FilterBuilder> buildFilterList() { - - ArrayList<FilterBuilder> filterList = Lists.newArrayList(); + public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader, ElasticsearchQuery query) { + this.reader = reader; + this.query = query; + } - // If any withfields are specified, require that field be present - // There must a value set also for the document to be processed - if (this.withfields != null && this.withfields.length > 0) { - ArrayList<FilterBuilder> withFilterList = Lists.newArrayList(); - for (String withfield : this.withfields) { - FilterBuilder withFilter = FilterBuilders.existsFilter(withfield); - withFilterList.add(withFilter); + @Override + public void run() { + + StreamsDatum item; + while (query.hasNext()) { + SearchHit hit = query.next(); + ObjectNode jsonObject = null; + try { + jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class); + } catch (IOException e) { + e.printStackTrace(); + break; + } + item = new StreamsDatum(jsonObject, hit.getId()); + item.getMetadata().put("id", hit.getId()); + item.getMetadata().put("index", hit.getIndex()); + item.getMetadata().put("type", 
hit.getType()); + reader.write(item); } - //filterList.add(FilterBuilders.orFilter(orFilters(withFilterList))); - filterList.add(withFilterList.get(0)); - } - // If any withoutfields are specified, require that field not be present - // Document will be picked up even if present, if they do not have at least one value - // this is annoying as it majorly impacts runtime - // might be able to change behavior using null_field - if (this.withoutfields != null && this.withoutfields.length > 0) { - ArrayList<FilterBuilder> withoutFilterList = Lists.newArrayList(); - for (String withoutfield : this.withoutfields) { - FilterBuilder withoutFilter = FilterBuilders.missingFilter(withoutfield).existence(true).nullValue(false); - withoutFilterList.add(withoutFilter); + try { + Thread.sleep(new Random().nextInt(100)); + } catch (InterruptedException e) { + LOGGER.warn("Thread interrupted", e); } - //filterList.add(FilterBuilders.orFilter(orFilters(withoutFilterList))); - filterList.add(withoutFilterList.get(0)); - } - return filterList; + } } - } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b5fd7e70/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java deleted file mode 100644 index 2d9c951..0000000 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java +++ /dev/null @@ -1,63 +0,0 @@ -package org.apache.streams.elasticsearch; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import 
org.apache.streams.core.StreamsDatum; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.ComponentUtils; -import org.elasticsearch.search.SearchHit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Queue; -import java.util.Random; - -public class ElasticsearchPersistReaderTask implements Runnable { - - private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class); - - private ElasticsearchPersistReader reader; - - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader) { - this.reader = reader; - } - - @Override - public void run() { - - StreamsDatum item; - while( reader.hasNext()) { - SearchHit hit = reader.next(); - ObjectNode jsonObject = null; - try { - jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class); - } catch (IOException e) { - e.printStackTrace(); - break; - } - item = new StreamsDatum(jsonObject, hit.getId()); - item.getMetadata().put("id", hit.getId()); - item.getMetadata().put("index", hit.getIndex()); - item.getMetadata().put("type", hit.getType()); - write(item); - } - try { - Thread.sleep(new Random().nextInt(100)); - } catch (InterruptedException e) {} - - } - - private void write( StreamsDatum entry ) { - boolean success; - do { - synchronized( ElasticsearchPersistReader.class ) { - success = reader.persistQueue.offer(entry); - } - Thread.yield(); - } - while( !success ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b5fd7e70/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java new file mode 100644 index 0000000..8c9abda --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java @@ -0,0 +1,282 @@ +package org.apache.streams.elasticsearch; + +import com.google.common.collect.Lists; +import com.google.common.base.Objects; +import com.typesafe.config.Config; +import org.apache.streams.config.StreamsConfigurator; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.index.query.FilterBuilder; +import org.elasticsearch.index.query.FilterBuilders; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortBuilders; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchHit>, Serializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchQuery.class); + private static final int SCROLL_POSITION_NOT_INITIALIZED = -3; + private static final Integer DEFAULT_BATCH_SIZE = 500; + private static final String DEFAULT_SCROLL_TIMEOUT = "5m"; + + private ElasticsearchClientManager elasticsearchClientManager; + private ElasticsearchConfiguration config; + private List<String> indexes = Lists.newArrayList(); + private List<String> types = Lists.newArrayList(); + private String[] withfields; + private String[] withoutfields; + private DateTime startDate; + private DateTime endDate; + private int limit = 1000 * 
1000 * 1000; // we are going to set the default limit very high to 1bil + private boolean random = false; + private int batchSize = 100; + private String scrollTimeout = null; + private org.elasticsearch.index.query.QueryBuilder queryBuilder; + private org.elasticsearch.index.query.FilterBuilder filterBuilder;// These are private to help us manage the scroll + private SearchRequestBuilder search; + private SearchResponse scrollResp; + private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED; + private SearchHit next = null; + private long totalHits = 0; + private long totalRead = 0; + + public ElasticsearchQuery() { + Config config = StreamsConfigurator.config.getConfig("elasticsearch"); + this.config = ElasticsearchConfigurator.detectConfiguration(config); + } + + public ElasticsearchQuery(ElasticsearchReaderConfiguration config) { + this.config = config; + this.elasticsearchClientManager = new ElasticsearchClientManager(config); + this.indexes.addAll(config.getIndexes()); + this.types.addAll(config.getTypes()); + } + + public long getHitCount() { + return this.search == null ? 
0 : this.totalHits; + } + + public long getReadCount() { + return this.totalRead; + } + + public double getReadPercent() { + return (double) this.getReadCount() / (double) this.getHitCount(); + } + + public long getRemainingCount() { + return this.totalRead - this.totalHits; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public void setScrollTimeout(String scrollTimeout) { + this.scrollTimeout = scrollTimeout; + } + + public void setQueryBuilder(QueryBuilder queryBuilder) { + this.queryBuilder = queryBuilder; + } + + public void setFilterBuilder(FilterBuilder filterBuilder) { + this.filterBuilder = filterBuilder; + } + + public void setWithfields(String[] withfields) { + this.withfields = withfields; + } + + public void setWithoutfields(String[] withoutfields) { + this.withoutfields = withoutfields; + } + + public void execute(Object o) { + + // If we haven't already set up the search, then set up the search. + if (search == null) { + search = elasticsearchClientManager.getClient() + .prepareSearch(indexes.toArray(new String[0])) + .setSearchType(SearchType.SCAN) + .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue()) + .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT)); + + if (this.queryBuilder != null) + search.setQuery(this.queryBuilder); + + // If the types are null, then don't specify a type + if (this.types != null && this.types.size() > 0) + search = search.setTypes(types.toArray(new String[0])); + + Integer clauses = 0; + if (this.withfields != null || this.withoutfields != null) { + if (this.withfields != null) + clauses += this.withfields.length; + if (this.withoutfields != null) + clauses += this.withoutfields.length; + } + + List<FilterBuilder> filterList = buildFilterList(); + + FilterBuilder allFilters = andFilters(filterList); + + if (clauses > 0) { + // search.setPostFilter(allFilters); + search.setPostFilter(allFilters); + } + + // TODO: Replace when all clusters are 
upgraded past 0.90.4 so we can implement a RANDOM scroll. + if (this.random) + search = search.addSort(SortBuilders.scriptSort("random()", "number")); + } + + // We don't have a scroll, we need to create a scroll + if (scrollResp == null) { + scrollResp = search.execute().actionGet(); + LOGGER.trace(search.toString()); + } + } + + //Iterable methods + @Override + public Iterator<SearchHit> iterator() { + return this; + } + + //Iterator methods + @Override + public SearchHit next() { + return this.next; + } + + @Override + public boolean hasNext() { + calcNext(); + return hasRecords(); + } + + public void calcNext() { + try { + // We have exhausted our scroll create another scroll. + if (scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || scrollPositionInScroll >= scrollResp.getHits().getHits().length) { + // reset the scroll position + scrollPositionInScroll = 0; + + // get the next hits of the scroll + scrollResp = elasticsearchClientManager.getClient() + .prepareSearchScroll(scrollResp.getScrollId()) + .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT)) + .execute() + .actionGet(); + + this.totalHits = scrollResp.getHits().getTotalHits(); + } + + // If this scroll has 0 items then we set the scroll position to -1 + // letting the iterator know that we are done. 
+ if (scrollResp.getHits().getTotalHits() == 0 || scrollResp.getHits().getHits().length == 0) + scrollPositionInScroll = -1; + else { + // get the next record + next = scrollResp.getHits().getAt(scrollPositionInScroll); + + // Increment our counters + scrollPositionInScroll += 1; + totalRead += 1; + } + } catch (Exception e) { + e.printStackTrace(); + LOGGER.error("Unexpected scrolling error: {}", e.getMessage()); + scrollPositionInScroll = -1; + next = null; + } + } + + public void remove() { + } + + protected boolean isCompleted() { + return totalRead >= this.limit && hasRecords(); + } + + protected boolean hasRecords() { + return scrollPositionInScroll != -1 && (!(this.totalRead > this.limit)); + } + + // copied from elasticsearch + // if we need this again we should factor it out into a utility + private FilterBuilder andFilters(List<FilterBuilder> filters) { + if (filters == null || filters.size() == 0) + return null; + + FilterBuilder toReturn = filters.get(0); + + for (int i = 1; i < filters.size(); i++) + toReturn = FilterBuilders.andFilter(toReturn, filters.get(i)); + + return toReturn; + } + + private FilterBuilder orFilters(List<FilterBuilder> filters) { + if (filters == null || filters.size() == 0) + return null; + + FilterBuilder toReturn = filters.get(0); + + for (int i = 1; i < filters.size(); i++) + toReturn = FilterBuilders.orFilter(toReturn, filters.get(i)); + + return toReturn; + } + + private List<FilterBuilder> buildFilterList() { + + // If any withfields are specified, require that field be present + // There must a value set also for the document to be processed + // If any withoutfields are specified, require that field not be present + // Document will be picked up even if present, if they do not have at least one value + // this is annoying as it majorly impacts runtime + // might be able to change behavior using null_field + + ArrayList<FilterBuilder> filterList = Lists.newArrayList(); + + // If any withfields are specified, require that 
field be present + // There must a value set also for the document to be processed + if (this.withfields != null && this.withfields.length > 0) { + ArrayList<FilterBuilder> withFilterList = Lists.newArrayList(); + for (String withfield : this.withfields) { + FilterBuilder withFilter = FilterBuilders.existsFilter(withfield); + withFilterList.add(withFilter); + } + //filterList.add(FilterBuilders.orFilter(orFilters(withFilterList))); + filterList.add(withFilterList.get(0)); + } + // If any withoutfields are specified, require that field not be present + // Document will be picked up even if present, if they do not have at least one value + // this is annoying as it majorly impacts runtime + // might be able to change behavior using null_field + if (this.withoutfields != null && this.withoutfields.length > 0) { + ArrayList<FilterBuilder> withoutFilterList = Lists.newArrayList(); + for (String withoutfield : this.withoutfields) { + FilterBuilder withoutFilter = FilterBuilders.missingFilter(withoutfield).existence(true).nullValue(false); + withoutFilterList.add(withoutFilter); + } + //filterList.add(FilterBuilders.orFilter(orFilters(withoutFilterList))); + filterList.add(withoutFilterList.get(0)); + } + + return filterList; + } +} \ No newline at end of file
