Merging fixes and addressing serialization problems to get the Twitter streams-examples working.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7293594e Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7293594e Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7293594e Branch: refs/heads/master Commit: 7293594eac98535e4b2206dca68da9b73e6cdf24 Parents: ae27541 522096e Author: sblackmon <[email protected]> Authored: Thu May 8 12:56:36 2014 -0500 Committer: sblackmon <[email protected]> Committed: Thu May 8 12:56:36 2014 -0500 ---------------------------------------------------------------------- .../streams-persist-elasticsearch/pom.xml | 1 - .../ElasticsearchConfigurator.java | 7 ++ .../ElasticsearchPersistWriter.java | 80 +++++++++++++++++++- .../ElasticsearchWriterConfiguration.json | 10 +++ .../twitter/processor/TwitterTypeConverter.java | 2 +- .../twitter/provider/TwitterStreamProvider.java | 68 ++++++++--------- .../provider/TwitterTimelineProvider.java | 50 ++++++++---- .../local/tasks/StreamsProviderTask.java | 2 +- 8 files changed, 161 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --cc streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index ff7e0f5,405d1b8..80d2775 --- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@@ -59,96 -50,68 +59,109 @@@ public class ElasticsearchPersistWrite private String parentID = null; private BulkRequestBuilder bulkRequest; private OutputStreamWriter currentWriter = null; - private int batchSize = 50; - private int totalRecordsWritten = 0; - private boolean veryLargeBulk = false; // by default this setting is set to false + private long batchSize; + private boolean veryLargeBulk; // by default this setting is set to false ++ private int totalRecordsWritten = 0; ++ + protected Thread task; - private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l; - private static final long WAITING_DOCS_LIMIT = 10000; - - public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD; + protected volatile Queue<StreamsDatum> persistQueue; + private volatile int currentItems = 0; private volatile int totalSent = 0; private volatile int totalSeconds = 0; private volatile int totalAttempted = 0; private volatile int totalOk = 0; private volatile int totalFailed = 0; private volatile int totalBatchCount = 0; + private volatile int totalRecordsWritten = 0; private volatile long totalSizeInBytes = 0; - private volatile long batchSizeInBytes = 0; private volatile int batchItemsSent = 0; + private volatile int totalByteCount = 0; + private volatile int byteCount = 0; + + public ElasticsearchPersistWriter() { + Config config = StreamsConfigurator.config.getConfig("elasticsearch"); + this.config = (ElasticsearchWriterConfiguration) ElasticsearchConfigurator.detectConfiguration(config); + } + + public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) { + this.config = config; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public 
void setVeryLargeBulk(boolean veryLargeBulk) { + this.veryLargeBulk = veryLargeBulk; + } + private final List<String> affectedIndexes = new ArrayList<String>(); - public int getTotalOutstanding() { return this.totalSent - (this.totalFailed + this.totalOk); } - public long getFlushThresholdSizeInBytes() { return flushThresholdSizeInBytes; } - public int getTotalSent() { return totalSent; } - public int getTotalSeconds() { return totalSeconds; } - public int getTotalOk() { return totalOk; } - public int getTotalFailed() { return totalFailed; } - public int getTotalBatchCount() { return totalBatchCount; } - public long getTotalSizeInBytes() { return totalSizeInBytes; } - public long getBatchSizeInBytes() { return batchSizeInBytes; } - public int getBatchItemsSent() { return batchItemsSent; } - public List<String> getAffectedIndexes() { return this.affectedIndexes; } + public int getTotalOutstanding() { + return this.totalSent - (this.totalFailed + this.totalOk); + } - public void setFlushThresholdSizeInBytes(long sizeInBytes) { this.flushThresholdSizeInBytes = sizeInBytes; } + public long getFlushThresholdSizeInBytes() { + return flushThresholdSizeInBytes; + } - Thread task; + public int getTotalSent() { + return totalSent; + } - protected volatile Queue<StreamsDatum> persistQueue; + public int getTotalSeconds() { + return totalSeconds; + } - private ObjectMapper mapper; + public int getTotalOk() { + return totalOk; + } ++ ++ private ObjectMapper mapper = new StreamsJacksonMapper(); - private ElasticsearchWriterConfiguration config; + public int getTotalFailed() { + return totalFailed; + } - public ElasticsearchPersistWriter() { - Config config = StreamsConfigurator.config.getConfig("elasticsearch"); - this.config = (ElasticsearchWriterConfiguration) ElasticsearchConfigurator.detectConfiguration(config); + public int getTotalBatchCount() { + return totalBatchCount; } - public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) { - this.config = config; 
+ public long getTotalSizeInBytes() { + return totalSizeInBytes; + } + + public long getBatchSizeInBytes() { + return batchSizeInBytes; + } + + public int getBatchItemsSent() { + return batchItemsSent; + } + + public List<String> getAffectedIndexes() { + return this.affectedIndexes; + } + + public void setFlushThresholdSizeInBytes(long sizeInBytes) { + this.flushThresholdSizeInBytes = sizeInBytes; } + public boolean isConnected() { + return (client != null); + } + ++ private ElasticsearchWriterConfiguration config; ++ + private static final int BYTES_IN_MB = 1024*1024; + private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB; + private volatile int totalByteCount = 0; + private volatile int byteCount = 0; - - public boolean isConnected() { return (client != null); } - ++ @Override public void write(StreamsDatum streamsDatum) { @@@ -280,10 -238,13 +293,11 @@@ // reset the current batch statistics this.batchSizeInBytes = 0; this.batchItemsSent = 0; + this.currentItems = 0; - try - { + try { int count = 0; - if(this.getTotalOutstanding() > WAITING_DOCS_LIMIT) - { + if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) { /**************************************************************************** * Author: * Smashew @@@ -381,8 -402,60 +395,60 @@@ } } + private void trackItemAndBytesWritten(long sizeInBytes) + { + currentItems++; + batchItemsSent++; + batchSizeInBytes += sizeInBytes; + + // If our queue is larger than our flush threashold, then we should flush the queue. + if( (batchSizeInBytes > flushThresholdSizeInBytes) || + (currentItems >= batchSize) ) + flushInternal(); + } + + private void checkAndCreateBulkRequest() + { + // Synchronize to ensure that we don't lose any records + synchronized (this) + { + if(bulkRequest == null) + bulkRequest = this.manager.getClient().prepareBulk(); + } + } + + private void checkIndexImplications(String indexName) + { + + // check to see if we have seen this index before. 
+ if(this.affectedIndexes.contains(indexName)) + return; + + // we haven't log this index. + this.affectedIndexes.add(indexName); + + // Check to see if we are in 'veryLargeBulk' mode + // if we aren't, exit early + if(!this.veryLargeBulk) + return; + + + // They are in 'very large bulk' mode we want to turn off refreshing the index. + + // Create a request then add the setting to tell it to stop refreshing the interval + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName); + updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1)); + + // submit to ElasticSearch + this.manager.getClient() + .admin() + .indices() + .updateSettings(updateSettingsRequest) + .actionGet(); + } + public void createIndexIfMissing(String indexName) { - if(!this.manager.getClient() + if (!this.manager.getClient() .admin() .indices() .exists(new IndicesExistsRequest(indexName)) @@@ -446,91 -524,20 +512,99 @@@ return toReturn; } + @Override + public void prepare(Object configurationObject) { + mapper = StreamsJacksonMapper.getInstance(); + veryLargeBulk = this.config.getBulk(); + batchSize = this.config.getBatchSize(); + start(); + } ++ + private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) { + bulkRequest.execute().addListener(new ActionListener<BulkResponse>() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (bulkItemResponses.hasFailures()) + LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage()); - @Override - public DatumStatusCounter getDatumStatusCounter() { - DatumStatusCounter counters = new DatumStatusCounter(); - counters.incrementAttempt(this.batchItemsSent); - counters.incrementStatus(DatumStatus.SUCCESS, this.totalOk); - counters.incrementStatus(DatumStatus.FAIL, this.totalFailed); - return counters; + long thisFailed = 0; + long thisOk = 0; + long thisMillis = 
bulkItemResponses.getTookInMillis(); + + // keep track of the number of totalFailed and items that we have totalOk. + for (BulkItemResponse resp : bulkItemResponses.getItems()) { + if (resp.isFailed()) + thisFailed++; + else + thisOk++; + } + + totalAttempted += thisSent; + totalOk += thisOk; + totalFailed += thisFailed; + totalSeconds += (thisMillis / 1000); + + if (thisSent != (thisOk + thisFailed)) + LOGGER.error("We sent more items than this"); + + LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]", + MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis), + MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding())); + } + + @Override + public void onFailure(Throwable e) { + LOGGER.error("Error bulk loading: {}", e.getMessage()); + e.printStackTrace(); + } + }); + + this.notify(); + } + + private void trackItemAndBytesWritten(long sizeInBytes) { + batchItemsSent++; + batchSizeInBytes += sizeInBytes; + + // If our queue is larger than our flush threashold, then we should flush the queue. + if (batchSizeInBytes > flushThresholdSizeInBytes) + flushInternal(); + } + + private void checkAndCreateBulkRequest() { + // Synchronize to ensure that we don't lose any records + synchronized (this) { + if (bulkRequest == null) + bulkRequest = this.manager.getClient().prepareBulk(); + } + } + + private void checkIndexImplications(String indexName) { + + // check to see if we have seen this index before. + if (this.affectedIndexes.contains(indexName)) + return; + + // we haven't log this index. 
+ this.affectedIndexes.add(indexName); + + // Check to see if we are in 'veryLargeBulk' mode + // if we aren't, exit early + if (!this.veryLargeBulk) + return; + + + // They are in 'very large bulk' mode we want to turn off refreshing the index. + + // Create a request then add the setting to tell it to stop refreshing the interval + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName); + updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1)); + + // submit to ElasticSearch + this.manager.getClient() + .admin() + .indices() + .updateSettings(updateSettingsRequest) + .actionGet(); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---------------------------------------------------------------------- diff --cc streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index b456fa4,20ee951..db1ec76 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@@ -1,7 -1,8 +1,9 @@@ package org.apache.streams.twitter.provider; + import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.Queues; + import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; @@@ -20,11 -21,10 +23,13 @@@ import twitter4j.json.DataObjectFactory import java.io.Serializable; import java.math.BigInteger; - import java.util.*; + import java.util.Iterator; + import java.util.List; + import java.util.Queue; -import 
java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Created by sblackmon on 12/10/13. @@@ -47,16 -47,22 +52,20 @@@ public class TwitterTimelineProvider im this.config = config; } - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); + protected final Queue<StreamsDatum> providerQueue = Queues.synchronizedQueue(new ArrayBlockingQueue<StreamsDatum>(500)); + protected int idsCount; + protected Twitter client; protected Iterator<Long> ids; - ListenableFuture providerTaskComplete; -// -// public BlockingQueue<Object> getInQueue() { -// return inQueue; -// } - protected ListeningExecutorService executor; protected DateTime start; protected DateTime end; ++ Boolean jsonStoreEnabled; ++ Boolean includeEntitiesEnabled; ++ private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, @@@ -88,12 -93,36 +96,13 @@@ return this.providerQueue; } -// public void run() { -// -// LOGGER.info("{} Running", STREAMS_ID); -// -// while( ids.hasNext() ) { -// Long currentId = ids.next(); -// LOGGER.info("Provider Task Starting: {}", currentId); -// captureTimeline(currentId); -// } -// -// LOGGER.info("{} Finished. 
Cleaning up...", STREAMS_ID); -// -// client.shutdown(); -// -// LOGGER.info("{} Exiting", STREAMS_ID); -// -// while(!providerTaskComplete.isDone() && !providerTaskComplete.isCancelled() ) { -// try { -// Thread.sleep(100); -// } catch (InterruptedException e) {} -// } -// } - @Override public void startStream() { - // no op + LOGGER.debug("{} startStream", STREAMS_ID); + throw new org.apache.commons.lang.NotImplementedException(); } - private void captureTimeline(long currentId) { + protected void captureTimeline(long currentId) { Paging paging = new Paging(1, 200); List<Status> statuses = null; @@@ -223,14 -251,14 +235,19 @@@ Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); Preconditions.checkNotNull(config.getOauth().getAccessToken()); Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); - Preconditions.checkNotNull(config.getFollow()); - Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true); - Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true); - + idsCount = config.getFollow().size(); ids = config.getFollow().iterator(); + ++ jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true); ++ includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true); ++ + } + ++ + protected Twitter getTwitterClient() + { String baseUrl = "https://api.twitter.com:443/1.1/"; ConfigurationBuilder builder = new ConfigurationBuilder() @@@ -243,9 -271,11 +260,10 @@@ .setAsyncNumThreads(3) .setRestBaseURL(baseUrl) .setIncludeMyRetweetEnabled(Boolean.TRUE) + .setIncludeRTsEnabled(Boolean.TRUE) .setPrettyDebugEnabled(Boolean.TRUE); - client = new TwitterFactory(builder.build()).getInstance(); - + return new TwitterFactory(builder.build()).getInstance(); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java ----------------------------------------------------------------------
