Hi, AbstractFileOutputOperator can be used for FTP push. Need to confirm if it works with sftp as well.
-Priyanka On Fri, May 20, 2016 at 11:21 AM, Devendra Tagare <devend...@datatorrent.com > wrote: > Hi Surya, > > Good to know the DB reads are working as expected. > > Here's a list of operators you can use/refer for the next use-case, > > HDFS input - for reading multiple input files in parallel you can set > partitionCount on the AbstractFileInputOperator for parallel > reads.LineByLineFileInputOperator is a concrete implementation for reading > one line at a time. > > xml parsing - there is a XmlParser in Malhar that takes in a xml string > and emits a POJO. > > Combining multiple files into one - could you please give us a sense of > the volume and the frequency of writes you expect so we can recommend > something appropriate ? > > SFTP push - need to check on this one.Will revert. > > @Community, please feel free to chip in. > > Thanks, > Dev > > > On Fri, May 20, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < > suryavamshivardhan.mukkam...@rbc.com> wrote: > >> Hi Devendra, >> >> >> >> Thank you , It is working now and I also could read the properties from >> xml file. I could also set the batch size and time gap for next database >> hit. >> >> >> >> Now , my another requirement is to read 50 different files from HDFS , >> parse them using xml mapping and sftp as a single file to a UNIX box. Can >> you please suggest me the best practice like using parallel processing or >> partitioning? >> >> >> >> Do you have any sample code for parallel processing or partitioning and >> also how would I run the batch Job is there any batch scheduler that data >> torrent provides? >> >> >> >> Regards, >> >> Surya Vamshi >> >> >> >> *From:* Devendra Tagare [mailto:devend...@datatorrent.com] >> *Sent:* 2016, May, 18 4:19 PM >> >> *To:* users@apex.incubator.apache.org >> *Subject:* Re: Information Needed >> >> >> >> Hi, >> >> >> >> Can you try something like this, >> >> >> >> JdbcPOJOInputOperator opr = dag.addOperator("JdbcPojo", new >> JdbcPOJOInputOperator()); >> >> JdbcStore store = new JdbcStore(); >> >> opr.setStore(store); >> >> The properties would then be set on this store object. >> >> From the code snippet provided earlier, the store was not being set on >> the JdbcInputOperator2 >> >> Thanks, >> >> Dev >> >> >> >> On Wed, May 18, 2016 at 12:50 PM, Mukkamula, Suryavamshivardhan (CWM-NR) < >> suryavamshivardhan.mukkam...@rbc.com> wrote: >> >> Hi , >> >> >> >> Hello, >> >> >> >> When I have tried using below property as you suggested, my launch itself >> is failing. When I don’t use store and directly assign >> (‘dt.application.CountryNameScan.operator.<operatorName>.prop*.*databaseUrl’) >> launch is successful but application run is failing with null pointer >> exception since ‘store’ object is null. >> >> >> >> I see that in AbstractStoreInputOperator.java there is ‘store’ variable >> and I am not clear how the value is set to it. >> >> >> >> Regards, >> >> Surya Vamshi >> >> >> >> *From:* Devendra Tagare [mailto:devend...@datatorrent.com] >> *Sent:* 2016, May, 18 12:57 PM >> >> >> *To:* users@apex.incubator.apache.org >> *Subject:* Re: Information Needed >> >> >> >> Hi, >> >> >> >> The property on the store is not getting set since ".store." qualifier is >> missing.Try the below for all store level properties. >> >> >> >> >> >> <property> >> >> <name>dt.application.CountryNameScan.operator.<operatorName>.prop >> *.store.*databaseUrl</name> >> >> <value>{databaseUrl}</value> >> >> </property> >> >> >> >> Thanks, >> >> Dev >> >> >> >> On Wed, May 18, 2016 at 8:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < >> suryavamshivardhan.mukkam...@rbc.com> wrote: >> >> Hello, >> >> >> >> Thank you Shubam. >> >> >> >> I have tried using AbstractJdbcInputOperator. Below is the Operator and >> the error that I am getting. My observation is ‘*store’* and ‘*context’* >> objects are null. Please help to solve this issue. >> >> >> >> *Error Logs:* >> >> >> >> java.lang.NullPointerException >> >> at >> com.datatorrent.lib.db.AbstractStoreInputOperator.setup(AbstractStoreInputOperator.java:77) >> >> at >> com.rbc.aml.cnscan.operator.AbstractJdbcInputOperator.setup(AbstractJdbcInputOperator.java:99) >> >> at >> com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:29) >> >> at >> com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:13) >> >> at com.datatorrent.stram.engine.Node.setup(Node.java:161) >> >> at >> com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1287) >> >> at >> com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:92) >> >> at >> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1361) >> >> >> >> >> >> *Operator Class:* >> >> >> >> import java.sql.ResultSet; >> >> import java.sql.SQLException; >> >> >> >> import com.datatorrent.api.Context; >> >> import com.datatorrent.api.Context.OperatorContext; >> >> import com.datatorrent.api.DefaultOutputPort; >> >> import java.sql.PreparedStatement; >> >> import com.datatorrent.api.Operator; >> >> >> >> >> >> public class JdbcInputOperator2 extends AbstractJdbcInputOperator<Object> >> >> implements >> Operator.ActivationListener<Context.OperatorContext> { >> >> >> >> private transient PreparedStatement preparedStatement; >> >> >> >> private String query; >> >> >> >> // @OutputPortFieldAnnotation(schemaRequired = true) >> >> public final transient DefaultOutputPort<Object> >> outputPort = new DefaultOutputPort<Object>(); >> >> >> >> public JdbcInputOperator2() { >> >> super(); >> >> } >> >> >> >> @Override >> >> public void setup(Context.OperatorContext context) { >> >> super.setup(context); >> >> >> >> try { >> >> preparedStatement = >> store.connection.prepareStatement(queryToRetrieveData()); >> >> System.out.println("store >> value is"+store); >> >> } catch (Exception e) { >> >> >> >> } >> >> >> >> } >> >> >> >> @Override >> >> public Object getTuple(ResultSet result) { >> >> // TODO Auto-generated method stub >> >> StringBuilder sb = new StringBuilder(); >> >> try { >> >> >> System.out.println("result set"+result); >> >> while (result.next()) { >> >> >> sb.append(result.getString("CLNT_NO")); >> >> >> sb.append(","); >> >> >> sb.append(result.getString("TR_NO")); >> >> >> System.out.println("tuple value"+sb.toString()); >> >> } >> >> } catch (SQLException e) { >> >> // TODO Auto-generated >> catch block >> >> e.printStackTrace(); >> >> } >> >> >> >> return sb.toString(); >> >> } >> >> >> >> @Override >> >> public String queryToRetrieveData() { >> >> // TODO Auto-generated method stub >> >> return query; >> >> } >> >> >> >> @Override >> >> public void activate(OperatorContext arg0) { >> >> // TODO Auto-generated method stub >> >> >> >> } >> >> >> >> @Override >> >> public void deactivate() { >> >> // TODO Auto-generated method stub >> >> >> >> } >> >> >> >> public String getQuery() { >> >> return query; >> >> } >> >> >> >> public void setQuery(String query) { >> >> this.query = query; >> >> } >> >> >> >> } >> >> >> >> *Properties.xml* >> >> >> >> <property> >> >> >> <name>dt.application.CountryNameScan.operator.<operatorName>.prop.databaseUrl</name> >> >> <value>{databaseUrl}</value> >> >> </property> >> >> <property> >> >> >> <name>dt.application.CountryNameScan.operator.<operatorName>.prop.databaseDriver</name> >> >> <value>com.ibm.db2.jcc.DB2Driver</value> >> >> </property> >> >> <property> >> >> >> <name>dt.application.CountryNameScan.operator.<operatorName>.prop.connectionProperties</name> >> >> <value>user:{uname},password:{psswrd}</value> >> >> </property> >> >> <property> >> >> >> <name>dt.application.CountryNameScan.operator.<operatorName>.prop.query</name> >> >> <value> {Query}</value> >> >> </property> >> >> >> >> >> >> >> >> Regards, >> >> Surya Vamshi >> >> >> >> *From:* Shubham Pathak [mailto:shub...@datatorrent.com] >> *Sent:* 2016, May, 18 1:51 AM >> >> >> *To:* users@apex.incubator.apache.org >> *Subject:* Re: Information Needed >> >> >> >> Hi Surya, >> >> >> >> If you are running into a Null Pointer exception in one of the operators >> after the application is launched ( application in Running state ), you may >> follow this link to view the logs >> >> http://docs.datatorrent.com/troubleshooting/#log-analysis >> >> >> >> However, if you are running into a Null Pointer exception before the app >> is launched, this would be because DAG is failing during verification step >> as some of the mandatory properties ( having @NotNull annotation ) might >> not have been initialized. For JdbcPOJOInputOperator, there are 3 such >> properties databaseUrl, databaseDriver and List<FieldInfo>. Could you >> verify if these have been set ? >> >> >> >> Thanks, >> >> Shubham >> >> >> >> On Wed, May 18, 2016 at 3:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < >> suryavamshivardhan.mukkam...@rbc.com> wrote: >> >> Hello Aswin/Ram, >> >> >> >> I am trying to use the JdbcPOJOInputOperator operators from Malhar JDBC, >> when I launch the application I am running into Null pointer exception. I >> am passing the DB connection details from properties XML. >> >> >> >> May I know where to look for the launch logs , I am running the DT >> application on my sandbox. >> >> >> >> >> >> Regards, >> >> Surya Vamshi >> >> >> >> *From:* Mukkamula, Suryavamshivardhan (CWM-NR) [mailto: >> suryavamshivardhan.mukkam...@rbc.com] >> *Sent:* 2016, May, 17 10:59 AM >> *To:* users@apex.incubator.apache.org >> *Subject:* RE: Information Needed >> >> >> >> Hi, >> >> >> >> Thank you for the inputs Ram. Understood how the properties are read. >> >> >> >> Regards, >> >> Surya Vamshi >> >> >> >> *From:* Munagala Ramanath [mailto:r...@datatorrent.com >> <r...@datatorrent.com>] >> *Sent:* 2016, May, 17 10:53 AM >> *To:* users@apex.incubator.apache.org >> *Subject:* Re: Information Needed >> >> >> >> Declare fields in your operators with appropriate setters and getters, >> for example: >> >> >> >> *private String foo;* >> >> *public void setFoo(String v) {foo = v;}* >> >> *public String getFoo(){ return foo;}* >> >> >> >> Then in the properties file, include a stanza like this: >> >> * <property>* >> >> * <name>dt.application.{appName}.operator.{opName}.prop.foo</name>* >> >> * <value>Hello World</value>* >> >> * </property>* >> >> >> >> where {appName} is the name of your application provided in the parameter >> of >> >> *@ApplicationAnnotation*, {opName} is the operator name given in the >> addOperator() call. >> >> >> >> The platform will automatically inject this value into your operator so >> that it is ready to use in >> >> all of the operator callbacks like setup(), beginWindow(), endWindow(), >> emitTuples(), etc. >> >> >> >> You can also use "*" wildcards for application and operator names. In >> such cases all operators >> >> that have the named field will have the value injected. >> >> >> >> In *g...@github.com:datatorrent/examples >> <g...@github.com:datatorrent/examples>* there are several sample >> applications. For example, >> >> the fileIO application has *src/main/resources/META-INF/properties.xml* >> which defines several >> >> properties for operators. >> >> >> >> You can also use annotations like @NotNull on your fields to ensure that >> values have been >> >> injected; if for some reason the value was not set (e.g. typo in the >> property name in the >> >> properties file), the application will fail validation and will not even >> launch. >> >> >> >> Ram >> >> >> >> On Tue, May 17, 2016 at 7:24 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < >> suryavamshivardhan.mukkam...@rbc.com> wrote: >> >> Hi Ram, >> >> >> >> How would I read the custom properties in my operators? , are these >> properties available to all the operators which run on different nodes of >> the cluster? >> >> >> >> Regards, >> >> Surya Vamshi >> >> >> >> *From:* Munagala Ramanath [mailto:r...@datatorrent.com] >> *Sent:* 2016, May, 17 9:58 AM >> *To:* users@apex.incubator.apache.org >> *Subject:* Re: Information Needed >> >> >> >> You can add multiple XML configuration files at *src/site/conf* in your >> project and select one of them at launch time. >> >> This is described briefly at >> http://docs.datatorrent.com/application_packages/ under the section >> entitled >> >> *Adding pre-set configurations* >> >> >> >> Ram >> >> >> >> On Tue, May 17, 2016 at 6:43 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < >> suryavamshivardhan.mukkam...@rbc.com> wrote: >> >> Hi Ashwin, >> >> >> >> Thank you. >> >> >> >> I see that result set is read once and tuples are emitted tuple by tuple. >> I will explore these JDBC operators. >> >> >> >> One more requirement, I would like to read custom properties-JDBC and set >> my dbconnection and query in the properties itself, do you have sample code >> to read custom properties from the class path. >> >> >> >> Regards, >> >> Surya Vamshi >> >> >> >> *From:* Ashwin Chandra Putta [mailto:ashwinchand...@gmail.com] >> *Sent:* 2016, May, 16 5:21 PM >> *To:* users@apex.incubator.apache.org >> *Subject:* Re: Information Needed >> >> >> >> Surya, >> >> >> >> You can use the jdbc input and output operators to fetch from origin >> database and destination database. >> >> >> >> Check >> https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/db/jdbc >> >> >> >> They should have configurable batch sizes. In general batching is faster >> than tuple by tuple. >> >> >> >> Regards, >> >> Ashwin. >> >> >> >> On Mon, May 16, 2016 at 1:20 PM, Mukkamula, Suryavamshivardhan (CWM-NR) < >> suryavamshivardhan.mukkam...@rbc.com> wrote: >> >> Hi Ashwin, >> >> >> >> It is to get all the 10k rows as a single batch and process them and >> insert into destination database. >> >> >> >> Which one is suggested , to get single row from database or batch of 10k >> rows and process and then insert into destination database? >> >> >> >> Regards, >> >> Surya Vamshi >> >> >> >> *From:* Ashwin Chandra Putta [mailto:ashwinchand...@gmail.com] >> *Sent:* 2016, May, 16 1:50 PM >> *To:* users@apex.incubator.apache.org >> *Subject:* Re: Information Needed >> >> >> >> Surya, >> >> >> >> By single row tuple, are you looking for a way to enrich/join your tuples >> with a single tuple from another table? Or simply get all rows from the >> origin database table as a single batch of 10000 tuples? >> >> >> >> Regards, >> >> Ashwin. >> >> >> >> On Mon, May 16, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < >> suryavamshivardhan.mukkam...@rbc.com> wrote: >> >> Hello Team, >> >> >> >> Use Case : Batch ingestion of transaction data (rows = 10000) from DB2 to >> Vertica database. >> >> >> >> Question : How Can I make DB2 Connection in such a way that I get a >> single row tuple only ? Do you have any sample code which reads from one >> database and writes into another database ? >> >> >> >> Regards, >> >> Surya Vamshi >> >> >> >> _______________________________________________________________________ >> >> This [email] may be privileged and/or confidential, and the sender does >> not waive any related rights and obligations. Any distribution, use or >> copying of this [email] or the information it contains by other than an >> intended recipient is unauthorized. If you received this [email] in error, >> please advise the sender (by return [email] or otherwise) immediately. You >> have consented to receive the attached electronically at the above-noted >> address; please retain a copy of this confirmation for future reference. >> >> >> >> >> >> -- >> >> >> >> Regards, >> >> Ashwin. >> >> _______________________________________________________________________ >> >> This [email] may be privileged and/or confidential, and the sender does >> not waive any related rights and obligations. Any distribution, use or >> copying of this [email] or the information it contains by other than an >> intended recipient is unauthorized. If you received this [email] in error, >> please advise the sender (by return [email] or otherwise) immediately. You >> have consented to receive the attached electronically at the above-noted >> address; please retain a copy of this confirmation for future reference. >> >> >> >> >> >> -- >> >> >> >> Regards, >> >> Ashwin. >> >> _______________________________________________________________________ >> >> If you received this email in error, please advise the sender (by return >> email or otherwise) immediately. You have consented to receive the attached >> electronically at the above-noted email address; please retain a copy of >> this confirmation for future reference. >> >> Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur >> immédiatement, par retour de courriel ou par un autre moyen. Vous avez >> accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à >> l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de >> cette confirmation pour les fins de reference future. >> >> >> >> _______________________________________________________________________ >> >> If you received this email in error, please advise the sender (by return >> email or otherwise) immediately. You have consented to receive the attached >> electronically at the above-noted email address; please retain a copy of >> this confirmation for future reference. >> >> Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur >> immédiatement, par retour de courriel ou par un autre moyen. Vous avez >> accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à >> l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de >> cette confirmation pour les fins de reference future. >> >> >> >> _______________________________________________________________________ >> >> If you received this email in error, please advise the sender (by return >> email or otherwise) immediately. You have consented to receive the attached >> electronically at the above-noted email address; please retain a copy of >> this confirmation for future reference. >> >> Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur >> immédiatement, par retour de courriel ou par un autre moyen. Vous avez >> accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à >> l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de >> cette confirmation pour les fins de reference future. >> >> _______________________________________________________________________ >> >> This [email] may be privileged and/or confidential, and the sender does >> not waive any related rights and obligations. Any distribution, use or >> copying of this [email] or the information it contains by other than an >> intended recipient is unauthorized. If you received this [email] in error, >> please advise the sender (by return [email] or otherwise) immediately. You >> have consented to receive the attached electronically at the above-noted >> address; please retain a copy of this confirmation for future reference. >> >> >> >> _______________________________________________________________________ >> >> This [email] may be privileged and/or confidential, and the sender does >> not waive any related rights and obligations. Any distribution, use or >> copying of this [email] or the information it contains by other than an >> intended recipient is unauthorized. If you received this [email] in error, >> please advise the sender (by return [email] or otherwise) immediately. You >> have consented to receive the attached electronically at the above-noted >> address; please retain a copy of this confirmation for future reference. >> >> >> >> _______________________________________________________________________ >> >> This [email] may be privileged and/or confidential, and the sender does >> not waive any related rights and obligations. Any distribution, use or >> copying of this [email] or the information it contains by other than an >> intended recipient is unauthorized. If you received this [email] in error, >> please advise the sender (by return [email] or otherwise) immediately. You >> have consented to receive the attached electronically at the above-noted >> address; please retain a copy of this confirmation for future reference. >> >> >> >> _______________________________________________________________________ >> >> This [email] may be privileged and/or confidential, and the sender does >> not waive any related rights and obligations. Any distribution, use or >> copying of this [email] or the information it contains by other than an >> intended recipient is unauthorized. If you received this [email] in error, >> please advise the sender (by return [email] or otherwise) immediately. You >> have consented to receive the attached electronically at the above-noted >> address; please retain a copy of this confirmation for future reference. >> >> >