Raman Grover has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/354

Change subject: Fix for Issue 929
......................................................................

Fix for Issue 929

commit 58317b37a3a7b2546b4780f4427ae1ed21a4ece9
Author: Ubuntu <[email protected]>
Date:   Fri Aug 14 10:47:06 2015 +0000

    Fix for Issue 929:
      a) Added documentation for the use of OAuth keys and tokens when using the
built-in Twitter adaptor
      b) Modified RSS feed adaptor and added documentation

Change-Id: I5521287a4fa1818c78a4f83b1a3cabeea8e6096d
---
M 
asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
M asterix-doc/src/site/markdown/feeds/tutorial.md
M 
asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
M 
asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
M 
asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
M 
asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
6 files changed, 155 insertions(+), 16 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/54/354/1

diff --git 
a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
 
b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index dad5975..08af842 100644
--- 
a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ 
b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -187,6 +187,7 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -2032,6 +2033,7 @@
 
             FeedId feedId = new FeedId(dataverseName, feedName);
             List<FeedConnectionId> activeConnections = 
FeedLifecycleListener.INSTANCE.getActiveFeedConnections(feedId);
+            System.out.println("numnber of active connections " + 
activeConnections.size());
             if (activeConnections != null && !activeConnections.isEmpty()) {
                 StringBuilder builder = new StringBuilder();
                 for (FeedConnectionId connectionId : activeConnections) {
@@ -2041,6 +2043,7 @@
                 throw new AlgebricksException("Feed " + feedId
                         + " is currently active and connected to the following 
dataset(s) \n" + builder.toString());
             } else {
+                System.out.println("Attempt to drop feed");
                 MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, 
feedName);
             }
 
@@ -2129,10 +2132,10 @@
             // All Metadata checks have passed. Feed connect request is valid. 
//
 
             FeedPolicyAccessor policyAccessor = new 
FeedPolicyAccessor(feedPolicy.getProperties());
-            Pair<FeedConnectionRequest, Boolean> p = 
getFeedConnectionRequest(dataverseName, feed,
+            Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = 
getFeedConnectionRequest(dataverseName, feed,
                     cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
-            FeedConnectionRequest connectionRequest = p.first;
-            boolean createFeedIntakeJob = p.second;
+            FeedConnectionRequest connectionRequest = triple.first;
+            boolean createFeedIntakeJob = triple.second;
 
             
FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(feedConnId, 
eventSubscriber);
             subscriberRegistered = true;
@@ -2142,6 +2145,11 @@
                         feedId.getDataverse(), feedId.getFeedName());
                 Pair<JobSpecification, IFeedAdapterFactory> pair = 
FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
                         metadataProvider, policyAccessor);
+                // adapter configuration are valid at this stage
+                // register the feed joints (these are auto-de-registered)
+                for (IFeedJoint fj : triple.third){
+                    FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);   
+                }
                 runJob(hcc, pair.first, false);
                 IFeedAdapterFactory adapterFactory = pair.second;
                 if (adapterFactory.isRecordTrackingEnabled()) {
@@ -2149,6 +2157,10 @@
                             adapterFactory.createIntakeProgressTracker());
                 }
                 
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+            } else {
+                for (IFeedJoint fj : triple.third){
+                    FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);   
+                }
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2193,7 +2205,7 @@
      * @return
      * @throws MetadataException
      */
-    private Pair<FeedConnectionRequest, Boolean> 
getFeedConnectionRequest(String dataverse, Feed feed, String dataset,
+    private Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> 
getFeedConnectionRequest(String dataverse, Feed feed, String dataset,
             FeedPolicy feedPolicy, MetadataTransactionContext mdTxnCtx) throws 
MetadataException {
         IFeedJoint sourceFeedJoint = null;
         FeedConnectionRequest request = null;
@@ -2235,9 +2247,6 @@
                         ConnectionLocation.SOURCE_FEED_COMPUTE_STAGE, 
FeedJointType.COMPUTE, connectionId);
                 jointsToRegister.add(computeFeedJoint);
             }
-            for (IFeedJoint joint : jointsToRegister) {
-                FeedLifecycleListener.INSTANCE.registerFeedJoint(joint);
-            }
         } else {
             sourceFeedJoint = 
FeedLifecycleListener.INSTANCE.getFeedJoint(feedJointKey);
             connectionLocation = sourceFeedJoint.getConnectionLocation();
@@ -2250,7 +2259,7 @@
                 dataset, feedPolicy.getPolicyName(), 
feedPolicy.getProperties(), feed.getFeedId());
 
         sourceFeedJoint.addConnectionRequest(request);
-        return new Pair<FeedConnectionRequest, Boolean>(request, 
needIntakeJob);
+        return new Triple<FeedConnectionRequest, Boolean, 
List<IFeedJoint>>(request, needIntakeJob, jointsToRegister);
     }
     
     /*
diff --git a/asterix-doc/src/site/markdown/feeds/tutorial.md 
b/asterix-doc/src/site/markdown/feeds/tutorial.md
index 886d29d..047e482 100644
--- a/asterix-doc/src/site/markdown/feeds/tutorial.md
+++ b/asterix-doc/src/site/markdown/feeds/tutorial.md
@@ -6,7 +6,7 @@
   * [Data Feed Basics](#DataFeedBasics)
     * [Collecting Data: Feed Adaptors](#FeedAdaptors)
     * [Preprocessing Collected Data](#PreprocessingCollectedData)
-  * [Creating an External Dataset](#IntroductionCreatingAnExternalDataset)
+  * [Creating an External UDF](#IntroductionCreatingAnExternalUDF)
 
 ## <a id="Introduction">Introduction</a> <font size="4"><a href="#toc">[Back 
to TOC]</a></font> ##
 In this document, we describe the support for data ingestion in AsterixDB, an 
open-source Big Data Management System (BDMS) that provides a platform for 
storage and analysis of large volumes of semi-structured data. Data feeds are a 
new mechanism for having
@@ -62,7 +62,7 @@
 Next we make use of the create feed AQL statement to define our example data 
feed. 
 
         create feed TwitterFeed if not exists using "push_twitter"
-        (("type-name"="Tweet"),("location"="US"));
+        (("type-name"="Tweet"));
 
 Note that the create feed statement does not initiate the flow of data from 
Twitter into our AsterixDB instance. Instead, the create feed statement only 
results in registering the feed with AsterixDB. The flow of data along a feed 
is initiated when it is connected
 to a target dataset using the connect feed statement (which we shall revisit 
later).
@@ -106,7 +106,7 @@
 
         create feed ProcessedTwitterFeed if not exists
         using "push_twitter"
-        (("type-name"="Tweet"),("location"="US"));
+        (("type-name"="Tweet"));
         apply function testlib#processRawTweet;
 
 Note that a feed adaptor and a UDF act as pluggable components. These
@@ -216,7 +216,7 @@
 excess.records.throttle  | Set to true if rate of arrival of records is 
required to be reduced in an adaptive manner to prevent having any excess 
records.                                                               | false  
       |
 excess.records.elastic   | Set to true if the system should attempt to resolve 
resource bottlenecks by re-structuring and/or rescheduling the feed ingestion 
pipeline.                                                   | false         |
 recover.soft.failure     | Set to true if the feed must attempt to survive any 
runtime exception. A false value permits an early termination of a feed in such 
an event.                                                 | true          |
-recover.hard.failure     | Set to true if the feed must attempt to survive a 
hardware failures (loss of AsterixDB node(s)). A false value permits the early 
termination of a feed in the event of a hardware failure     |               |
+recover.hard.failure     | Set to true if the feed must attempt to survive 
hardware failures (loss of AsterixDB node(s)). A false value permits the early 
termination of a feed in the event of a hardware failure.    | false         |
 
 Note that the end user may choose to form a custom policy. E.g.
 it is possible in AsterixDB to create a custom policy that spills excess
@@ -284,5 +284,88 @@
         functionHelper.setResult(result);
     }
 
+#### Installing an External Library ####
 
+**Creating an AsterixDB Library**<br/>We need to install our Java UDF 
so that we may use it in AQL statements/queries.<br/>An AsterixDB library has 
a pre-defined structure, as follows.
 
+- **jar file** containing all class files.<br/>This is the jar file 
that contains the class files for your UDF source code. In the case of our 
application, it will include the class files for the function and its associated 
factory.
+
+- **library descriptor (XML file)**<br/>This is a descriptor that provides 
meta-information about the library.
+
+```
+<externalLibrary xmlns="library">
+       <language>JAVA</language>
+       <libraryFunctions>
+               <libraryFunction>
+                       <function_type>SCALAR</function_type>
+                       <name>hashTags</name>
+                       <arguments>Tweet</arguments>
+                       <return_type>ProcessedTweet</return_type>
+                       
<definition>edu.uci.ics.asterix.external.udf.HashTagsFunctionFactory
+                       </definition>
+               </libraryFunction>
+       </libraryFunctions>
+</externalLibrary>
+
+```
+- `lib/<other dependency jars>`
+
+If the Java UDF requires additional dependency jars, you may add them under a 
"lib" folder. The UDF in our application does not have any 
dependency jars, so we need not include the lib directory in our library bundle.
+
+We create a zip bundle that contains the jar file and the library descriptor 
XML file. The zip has the following structure:
+
+```
+$ unzip -l ./tweetlib.zip 
+Archive:  ./tweetlib.zip
+  Length     Date   Time    Name
+ --------    ----   ----    ----
+   760817  04-23-14 17:16   hash-tags.jar
+      405  04-23-14 17:16   tweet.xml
+ --------                   -------
+   761222                   2 files
+```
+**Installing an AsterixDB Library**<br/>We assume you have followed the 
[instructions](http://asterixdb.ics.uci.edu/documentation/install.html) to set 
up a running AsterixDB instance. Let us refer to your AsterixDB instance by the 
name "my_asterix".
+
+**_Step 1_:** Stop the AsterixDB instance if it is in the ACTIVE state.
+
+```
+$ managix stop  -n my_asterix
+```
+**_Step 2_:** Install the library using the Managix install command. To 
illustrate, we first use the help command to look up its syntax:
+
+```
+$ managix help  -cmd install
+Installs a library to an asterix instance.
+Arguments/Options
+-n  Name of Asterix Instance
+-d  Name of the dataverse under which the library will be installed
+-l  Name of the library
+-p  Path to library zip bundle
+```
+The above sample output explains the usage and the required parameters. 
Each library has a name and is installed under a dataverse. Recall that we had 
created a dataverse named "feeds" prior to creating our datatypes and 
dataset. We shall name our library "tweetlib", but of course, you may choose 
another name.
+
+You may download the pre-packaged library [attachment:tweetlib.zip here] and 
place the downloaded library (a zip bundle) at a convenient location on your 
disk. To install the library, use the Managix install command. An example is 
shown below.
+
+```
+$ managix install -n my_asterix -d feeds -l tweetlib -p <put the absolute path 
of the library zip bundle here> 
+```
+You should see the following message:
+
+```
+INFO: Installed library tweetlib
+```
+We shall next start our AsterixDB instance using the start command as shown 
below.
+
+```
+$ managix start -n my_asterix
+```
+You may now use the AsterixDB library in AQL statements and queries. To look 
at the installed artifacts, you may execute the following queries at the 
AsterixDB web-console.
+
+```
+for $x in dataset Metadata.Function 
+return $x
+
+for $x in dataset Metadata.Library
+return $x
+```
+Our library is now installed and is ready to be used. So far we have done the 
following:<br/>a) Created a dataverse and defined the required datatypes 
<br/>b) Created a dataset to persist the ingested tweets<br/>c) Created a 
Java UDF that provides the pre-processing logic<br/>d) Packaged the Java 
UDF into an AsterixDB library and installed the library.
diff --git 
a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
 
b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
index 8af49fb..f77efbf 100644
--- 
a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
+++ 
b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
@@ -20,6 +20,7 @@
 import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
 import edu.uci.ics.asterix.external.dataset.adapter.PushBasedTwitterAdapter;
 import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.external.util.TwitterUtil.AuthenticationConstants;
 import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import 
edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
@@ -62,6 +63,16 @@
         this.outputType = outputType;
         this.configuration = configuration;
         TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+        boolean requiredParamsSpecified = validateConfiguration(configuration);
+        if(!requiredParamsSpecified){
+           StringBuilder builder = new StringBuilder();
+           builder.append("One or more parameters are missing from adapter 
configuration\n");
+           builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
+           builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + 
"\n");
+           builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
+           builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + 
"\n");
+           throw new Exception(builder.toString());
+        }
     }
 
     @Override
@@ -80,4 +91,16 @@
         return null;
     }
 
+    private boolean validateConfiguration(Map<String, String> configuration) {
+        String consumerKey  = 
configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
+        String consumerSecret  = 
configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
+        String accessToken  = 
configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
+        String tokenSecret  = 
configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
+        
+        if(consumerKey == null  || consumerSecret == null || accessToken == 
null || tokenSecret == null){
+            return false;
+        }
+        return true;
+    }
+
 }
diff --git 
a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
 
b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 4ab7d22..baab239 100644
--- 
a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ 
b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -32,7 +32,7 @@
 
     private static final long serialVersionUID = 1L;
 
-    private static final String KEY_RSS_URL = "rss_url";
+    private static final String KEY_RSS_URL = "url";
 
     private List<String> feedURLs = new ArrayList<String>();
     private String id_prefix = "";
@@ -46,6 +46,7 @@
         super(configuration, ctx);
         id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
         this.recordType = recordType;
+        reconfigure(configuration);
     }
 
     private void initializeFeedURLs(String rssURLProperty) {
diff --git 
a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
 
b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
index 8a74963..2737582 100644
--- 
a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
+++ 
b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
@@ -18,6 +18,8 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.logging.Logger;
+import java.util.logging.Level;
 
 import twitter4j.FilterQuery;
 import twitter4j.Twitter;
@@ -28,6 +30,9 @@
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 
 public class TwitterUtil {
+
+
+    private static Logger LOGGER = 
Logger.getLogger(TwitterUtil.class.getName());
 
     public static class ConfigurationConstants {
         public static final String KEY_LOCATION = "location";
@@ -77,7 +82,22 @@
 
     public static Twitter getTwitterService(Map<String, String> configuration) 
{
         ConfigurationBuilder cb = getAuthConfiguration(configuration);
-        TwitterFactory tf = new TwitterFactory(cb.build());
+        TwitterFactory tf = null;
+        try{
+          tf = new TwitterFactory(cb.build());
+        } catch (Exception e){
+         if (LOGGER.isLoggable(Level.WARNING)){
+            StringBuilder builder = new StringBuilder();
+            builder.append("Twitter Adapter requires the following config 
parameters\n");
+            builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
+            builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + 
"\n");
+            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
+            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + 
"\n");
+            LOGGER.warning(builder.toString()); 
+            LOGGER.warning("Unable to configure Twitter adapter due to 
incomplete/incorrect authentication credentials");
+            LOGGER.warning("For details on how to obtain OAuth authentication 
token, visit 
https://dev.twitter.com/oauth/overview/application-owner-access-tokens";);
+         }  
+        }
         Twitter twitter = tf.getInstance();
         return twitter;
     }
@@ -132,8 +152,10 @@
                     break;
             }
         } catch (Exception e) {
-            throw new AsterixException("Incorrect configuration! unable to 
load authentication credentials "
-                    + e.getMessage());
+            if(LOGGER.isLoggable(Level.WARNING)){
+                LOGGER.warning("unable to load authentication credentials from 
auth.properties file" + 
+             "credential information will be obtained from adapter's 
configuration");
+            }
         }
     }
 
diff --git 
a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
 
b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 97c90f8..62688de 100644
--- 
a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ 
b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -323,6 +323,7 @@
                 
"edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory",
                 
"edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory",
                 
"edu.uci.ics.asterix.external.adapter.factory.PullBasedTwitterAdapterFactory",
+                
"edu.uci.ics.asterix.external.adapter.factory.PushBasedTwitterAdapterFactory",
                 
"edu.uci.ics.asterix.external.adapter.factory.RSSFeedAdapterFactory",
                 
"edu.uci.ics.asterix.external.adapter.factory.CNNFeedAdapterFactory",
                 
"edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory",

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/354
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I5521287a4fa1818c78a4f83b1a3cabeea8e6096d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Raman Grover <[email protected]>

Reply via email to