For scheduling, there is no built-in support, but you can write a simple script
that starts the application at a predetermined time (using, for example, dtcli
commands or the REST API). Then, once you are sure that all data for the day has
been processed and the application is idle, you can shut down the application
(again, using either dtcli or the REST API). I would suggest using a scripting
language like Ruby or Python, since they make many things easier than the
shell does.
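
The start/wait/stop cycle described above can also be sketched in Java, the language used for the operators in this thread. This is only a rough sketch under stated assumptions: launch-app.sh and shutdown-app.sh are hypothetical wrapper scripts around whatever dtcli or REST calls you use, and the fixed processing window stands in for a real check that the application is idle.

```java
import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;

public class DailyLauncher {

    /** Time from 'now' until the next occurrence of 'target' (later today, else tomorrow). */
    public static Duration delayUntilNext(LocalDateTime now, LocalTime target) {
        LocalDateTime next = now.toLocalDate().atTime(target);
        if (!next.isAfter(now)) {
            next = next.plusDays(1); // today's slot has already passed
        }
        return Duration.between(now, next);
    }

    /** Runs an external command, forwarding its output, and returns its exit code. */
    public static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        return process.waitFor();
    }

    /** Launches at 'launchAt' every day and shuts down after 'window'. Never returns. */
    public static void runDailyCycle(List<String> launch, List<String> shutdown,
                                     LocalTime launchAt, Duration window)
            throws IOException, InterruptedException {
        while (true) {
            Thread.sleep(delayUntilNext(LocalDateTime.now(), launchAt).toMillis());
            if (run(launch) != 0) {
                System.err.println("Launch failed; retrying at the next scheduled time");
                continue;
            }
            // Assumption: a fixed window is long enough for the day's data.
            Thread.sleep(window.toMillis());
            run(shutdown);
        }
    }

    public static void main(String[] args) {
        // Only reports the wait for a 02:00 launch; a real scheduler would call, e.g.
        // runDailyCycle(List.of("./launch-app.sh"), List.of("./shutdown-app.sh"),
        //               LocalTime.of(2, 0), Duration.ofHours(3));
        Duration wait = delayUntilNext(LocalDateTime.now(), LocalTime.of(2, 0));
        System.out.println("Next launch in " + wait);
    }
}
```

A cron entry that calls the two wrapper scripts would do the same job with less code; the sketch is mainly useful if you want the launch and shutdown logic in one place.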

Handling multiple directories is a little more involved: you'll need to
override the definePartitions() method of the AbstractFileInputOperator and
possibly the DirectoryScanner as well.
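
One way to think about the override is that each partition gets its own subset of the input directories. The sketch below shows only that assignment step, using plain Java collections and no Apex classes; DirectoryAssigner and the /feeds/... paths are made up for illustration, and round-robin is just one possible policy.

```java
import java.util.ArrayList;
import java.util.List;

public class DirectoryAssigner {

    /**
     * Distributes directories over partitions round-robin, so partition i
     * receives directories i, i + n, i + 2n, ... where n = partitionCount.
     */
    public static List<List<String>> assign(List<String> directories, int partitionCount) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be at least 1");
        }
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < directories.size(); i++) {
            result.get(i % partitionCount).add(directories.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical HDFS feed locations, for illustration only.
        List<String> feeds = List.of("/feeds/a", "/feeds/b", "/feeds/c", "/feeds/d", "/feeds/e");
        List<List<String>> plan = assign(feeds, 2);
        for (int i = 0; i < plan.size(); i++) {
            System.out.println("partition " + i + " -> " + plan.get(i));
        }
        // prints:
        // partition 0 -> [/feeds/a, /feeds/c, /feeds/e]
        // partition 1 -> [/feeds/b, /feeds/d]
    }
}
```

In a real definePartitions() override, each sub-list would be handed to the operator instance (or its scanner) created for that partition.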

Ram

On Tue, May 24, 2016 at 6:16 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
[email protected]> wrote:

> Hello,
>
>
>
> Thank you all for your valuable inputs. My use case is that there will be 100
> feeds on HDFS in different locations (not all in the same location), and I
> have to read them using DT and load them into a database once a day. What is
> the best way to schedule the DataTorrent batch job? And how would I achieve
> parallel processing when my files are in different folders?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:[email protected]]
> *Sent:* 2016, May, 20 2:35 PM
> *To:* [email protected]
> *Subject:* Re: Information Needed
>
>
>
> It appears that SFTP support in the Hadoop file system will not be
> available till 2.8.0:
>
> https://issues.apache.org/jira/browse/HADOOP-5732
>
>
>
> So you might have to write your own SFTP operator or write to HDFS and use
> an external script to write to SFTP.
>
>
>
> Ram
>
>
>
> On Fri, May 20, 2016 at 11:21 AM, Devendra Tagare <
> [email protected]> wrote:
>
> Hi Surya,
>
>
>
> Good to know the DB reads are working as expected.
>
>
>
> Here's a list of operators you can use/refer for the next use-case,
>
>
>
> HDFS input - to read multiple input files in parallel, you can set
> partitionCount on the AbstractFileInputOperator.
> LineByLineFileInputOperator is a concrete implementation that reads
> one line at a time.
>
>
>
> XML parsing - there is an XmlParser in Malhar that takes in an XML string
> and emits a POJO.
>
>
>
> Combining multiple files into one  - could you please give us a sense of
> the volume and the frequency of writes you expect so we can recommend
> something appropriate ?
>
>
>
> SFTP push - need to check on this one. Will get back to you.
>
>
>
> @Community, please feel free to chip in.
>
>
>
> Thanks,
>
> Dev
>
>
>
>
>
> On Fri, May 20, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> [email protected]> wrote:
>
> Hi Devendra,
>
>
>
> Thank you, it is working now, and I could also read the properties from the
> XML file. I could also set the batch size and the time gap before the next
> database hit.
>
>
>
> Now, my other requirement is to read 50 different files from HDFS,
> parse them using XML mapping, and SFTP them as a single file to a UNIX box.
> Can you please suggest the best practice, such as parallel processing or
> partitioning?
>
>
>
> Do you have any sample code for parallel processing or partitioning? Also,
> how would I run the batch job? Is there any batch scheduler that
> DataTorrent provides?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:[email protected]]
> *Sent:* 2016, May, 18 4:19 PM
>
>
> *To:* [email protected]
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> Can you try something like this,
>
>
>
>     JdbcPOJOInputOperator opr = dag.addOperator("JdbcPojo", new JdbcPOJOInputOperator());
>
>     JdbcStore store = new JdbcStore();
>
>     opr.setStore(store);
>
> The properties would then be set on this store object.
>
> From the code snippet provided earlier, the store was not being set on the
> JdbcInputOperator2.
>
> Thanks,
>
> Dev
>
>
>
> On Wed, May 18, 2016 at 12:50 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> [email protected]> wrote:
>
> Hi ,
>
>
>
> Hello,
>
>
>
> When I tried using the property below as you suggested, the launch itself
> fails. When I don’t use the store and assign the property directly
> (‘dt.application.CountryNameScan.operator.<operatorName>.prop.databaseUrl’),
> the launch is successful, but the application run fails with a null pointer
> exception since the ‘store’ object is null.
>
>
>
> I see that in AbstractStoreInputOperator.java there is a ‘store’ variable,
> and I am not clear how its value is set.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:[email protected]]
> *Sent:* 2016, May, 18 12:57 PM
>
>
> *To:* [email protected]
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> The property on the store is not getting set since the ".store." qualifier is
> missing. Try the below for all store-level properties.
>
>
>
>
>
> <property>
>     <name>dt.application.CountryNameScan.operator.<operatorName>.prop.store.databaseUrl</name>
>     <value>{databaseUrl}</value>
> </property>
>
>
>
> Thanks,
>
> Dev
>
>
>
> On Wed, May 18, 2016 at 8:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> [email protected]> wrote:
>
> Hello,
>
>
>
> Thank you Shubam.
>
>
>
> I have tried using AbstractJdbcInputOperator. Below are the operator and
> the error that I am getting. My observation is that the ‘store’ and ‘context’
> objects are null. Please help me solve this issue.
>
>
>
> *Error Logs:*
>
>
>
> java.lang.NullPointerException
>     at com.datatorrent.lib.db.AbstractStoreInputOperator.setup(AbstractStoreInputOperator.java:77)
>     at com.rbc.aml.cnscan.operator.AbstractJdbcInputOperator.setup(AbstractJdbcInputOperator.java:99)
>     at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:29)
>     at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:13)
>     at com.datatorrent.stram.engine.Node.setup(Node.java:161)
>     at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1287)
>     at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:92)
>     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1361)
>
>
>
>
>
> *Operator Class:*
>
>
>
> import java.sql.ResultSet;
> import java.sql.SQLException;
>
> import com.datatorrent.api.Context;
> import com.datatorrent.api.Context.OperatorContext;
> import com.datatorrent.api.DefaultOutputPort;
> import java.sql.PreparedStatement;
> import com.datatorrent.api.Operator;
>
> public class JdbcInputOperator2 extends AbstractJdbcInputOperator<Object>
>         implements Operator.ActivationListener<Context.OperatorContext> {
>
>     private transient PreparedStatement preparedStatement;
>
>     private String query;
>
>     // @OutputPortFieldAnnotation(schemaRequired = true)
>     public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>();
>
>     public JdbcInputOperator2() {
>         super();
>     }
>
>     @Override
>     public void setup(Context.OperatorContext context) {
>         super.setup(context);
>
>         try {
>             preparedStatement = store.connection.prepareStatement(queryToRetrieveData());
>             System.out.println("store value is"+store);
>         } catch (Exception e) {
>
>         }
>     }
>
>     @Override
>     public Object getTuple(ResultSet result) {
>         // TODO Auto-generated method stub
>         StringBuilder sb = new StringBuilder();
>         try {
>             System.out.println("result set"+result);
>             while (result.next()) {
>                 sb.append(result.getString("CLNT_NO"));
>                 sb.append(",");
>                 sb.append(result.getString("TR_NO"));
>                 System.out.println("tuple value"+sb.toString());
>             }
>         } catch (SQLException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>
>         return sb.toString();
>     }
>
>     @Override
>     public String queryToRetrieveData() {
>         // TODO Auto-generated method stub
>         return query;
>     }
>
>     @Override
>     public void activate(OperatorContext arg0) {
>         // TODO Auto-generated method stub
>     }
>
>     @Override
>     public void deactivate() {
>         // TODO Auto-generated method stub
>     }
>
>     public String getQuery() {
>         return query;
>     }
>
>     public void setQuery(String query) {
>         this.query = query;
>     }
>
> }
>
>
>
> *Properties.xml*
>
>
>
> <property>
>     <name>dt.application.CountryNameScan.operator.<operatorName>.prop.databaseUrl</name>
>     <value>{databaseUrl}</value>
> </property>
>
> <property>
>     <name>dt.application.CountryNameScan.operator.<operatorName>.prop.databaseDriver</name>
>     <value>com.ibm.db2.jcc.DB2Driver</value>
> </property>
>
> <property>
>     <name>dt.application.CountryNameScan.operator.<operatorName>.prop.connectionProperties</name>
>     <value>user:{uname},password:{psswrd}</value>
> </property>
>
> <property>
>     <name>dt.application.CountryNameScan.operator.<operatorName>.prop.query</name>
>     <value> {Query}</value>
> </property>
>
>
>
>
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Shubham Pathak [mailto:[email protected]]
> *Sent:* 2016, May, 18 1:51 AM
>
>
> *To:* [email protected]
> *Subject:* Re: Information Needed
>
>
>
> Hi Surya,
>
>
>
> If you are running into a Null Pointer exception in one of the operators
> after the application is launched ( application in Running state ), you may
> follow this link to view the logs
>
> http://docs.datatorrent.com/troubleshooting/#log-analysis
>
>
>
> However, if you are running into a Null Pointer exception before the app
> is launched, this would be because the DAG is failing during the verification
> step, as some of the mandatory properties (those with the @NotNull annotation)
> might not have been initialized. For JdbcPOJOInputOperator, there are 3 such
> properties: databaseUrl, databaseDriver and List<FieldInfo>. Could you
> verify whether these have been set?
>
>
>
> Thanks,
>
> Shubham
>
>
>
> On Wed, May 18, 2016 at 3:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> [email protected]> wrote:
>
> Hello Aswin/Ram,
>
>
>
> I am trying to use the JdbcPOJOInputOperator from Malhar JDBC, but
> when I launch the application I run into a null pointer exception. I
> am passing the DB connection details from the properties XML.
>
>
>
> May I know where to look for the launch logs? I am running the DT
> application on my sandbox.
>
>
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:
> [email protected]]
> *Sent:* 2016, May, 17 10:59 AM
> *To:* [email protected]
> *Subject:* RE: Information Needed
>
>
>
> Hi,
>
>
>
> Thank you for the inputs Ram. Understood how the properties are read.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:[email protected]
> <[email protected]>]
> *Sent:* 2016, May, 17 10:53 AM
> *To:* [email protected]
> *Subject:* Re: Information Needed
>
>
>
> Declare fields in your operators with appropriate setters and getters, for
> example:
>
>
>
> private String foo;
>
> public void setFoo(String v) {foo = v;}
>
> public String getFoo(){ return foo;}
>
>
>
> Then in the properties file, include a stanza like this:
>
>   <property>
>     <name>dt.application.{appName}.operator.{opName}.prop.foo</name>
>     <value>Hello World</value>
>   </property>
>
>
>
> where {appName} is the name of your application provided in the parameter
> of @ApplicationAnnotation, and {opName} is the operator name given in the
> addOperator() call.
>
>
>
> The platform will automatically inject this value into your operator so
> that it is ready to use in all of the operator callbacks like setup(),
> beginWindow(), endWindow(), emitTuples(), etc.
>
>
>
> You can also use "*" wildcards for application and operator names. In such
> cases all operators that have the named field will have the value injected.
>
>
>
> In [email protected]:datatorrent/examples there are several sample
> applications. For example, the fileIO application has
> src/main/resources/META-INF/properties.xml, which defines several
> properties for operators.
>
>
>
> You can also use annotations like @NotNull on your fields to ensure that
> values have been injected; if for some reason the value was not set (e.g. a
> typo in the property name in the properties file), the application will fail
> validation and will not even launch.
>
>
>
> Ram
>
>
>
> On Tue, May 17, 2016 at 7:24 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> [email protected]> wrote:
>
> Hi Ram,
>
>
>
> How would I read the custom properties in my operators? Are these
> properties available to all the operators that run on different nodes of
> the cluster?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:[email protected]]
> *Sent:* 2016, May, 17 9:58 AM
> *To:* [email protected]
> *Subject:* Re: Information Needed
>
>
>
> You can add multiple XML configuration files at src/site/conf in your
> project and select one of them at launch time.
>
> This is described briefly at
> http://docs.datatorrent.com/application_packages/ under the section
> entitled "Adding pre-set configurations".
>
>
>
> Ram
>
>
>
> On Tue, May 17, 2016 at 6:43 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> [email protected]> wrote:
>
> Hi Ashwin,
>
>
>
> Thank you.
>
>
>
> I see that the result set is read once and tuples are emitted one by one.
> I will explore these JDBC operators.
>
>
>
> One more requirement: I would like to read custom JDBC properties and set
> my DB connection and query in the properties file itself. Do you have sample
> code to read custom properties from the classpath?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Ashwin Chandra Putta [mailto:[email protected]]
> *Sent:* 2016, May, 16 5:21 PM
> *To:* [email protected]
> *Subject:* Re: Information Needed
>
>
>
> Surya,
>
>
>
> You can use the JDBC input and output operators to read from the origin
> database and write to the destination database.
>
>
>
> Check
> https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/db/jdbc
>
>
>
> They should have configurable batch sizes. In general batching is faster
> than tuple by tuple.
>
>
>
> Regards,
>
> Ashwin.
>
>
>
> On Mon, May 16, 2016 at 1:20 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> [email protected]> wrote:
>
> Hi Ashwin,
>
>
>
> It is to get all the 10k rows as a single batch, process them, and
> insert them into the destination database.
>
>
>
> Which is recommended: fetching a single row at a time from the database, or
> fetching a batch of 10k rows, processing it, and then inserting it into the
> destination database?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Ashwin Chandra Putta [mailto:[email protected]]
> *Sent:* 2016, May, 16 1:50 PM
> *To:* [email protected]
> *Subject:* Re: Information Needed
>
>
>
> Surya,
>
>
>
> By single row tuple, are you looking for a way to enrich/join your tuples
> with a single tuple from another table? Or simply get all rows from the
> origin database table as a single batch of 10000 tuples?
>
>
>
> Regards,
>
> Ashwin.
>
>
>
> On Mon, May 16, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> [email protected]> wrote:
>
> Hello Team,
>
>
>
> Use case: Batch ingestion of transaction data (rows = 10000) from DB2 to a
> Vertica database.
>
>
>
> Question: How can I make the DB2 connection in such a way that I get only a
> single-row tuple? Do you have any sample code that reads from one database
> and writes into another database?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> _______________________________________________________________________
>
> This [email] may be privileged and/or confidential, and the sender does
> not waive any related rights and obligations. Any distribution, use or
> copying of this [email] or the information it contains by other than an
> intended recipient is unauthorized. If you received this [email] in error,
> please advise the sender (by return [email] or otherwise) immediately. You
> have consented to receive the attached electronically at the above-noted
> address; please retain a copy of this confirmation for future reference.
>
>
>
>
>
> --
>
>
>
> Regards,
>
> Ashwin.
>
> _______________________________________________________________________
>
> If you received this email in error, please advise the sender (by return
> email or otherwise) immediately. You have consented to receive the attached
> electronically at the above-noted email address; please retain a copy of
> this confirmation for future reference.
>
> Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur
> immédiatement, par retour de courriel ou par un autre moyen. Vous avez
> accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à
> l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de
> cette confirmation pour les fins de reference future.
>
>
>