Hi Surya, Good to know the DB reads are working as expected.
Here's a list of operators you can use/refer for the next use-case, HDFS input - for reading multiple input files in parallel you can set partitionCount on the AbstractFileInputOperator for parallel reads.LineByLineFileInputOperator is a concrete implementation for reading one line at a time. xml parsing - there is a XmlParser in Malhar that takes in a xml string and emits a POJO. Combining multiple files into one - could you please give us a sense of the volume and the frequency of writes you expect so we can recommend something appropriate ? SFTP push - need to check on this one.Will revert. @Community, please feel free to chip in. Thanks, Dev On Fri, May 20, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < suryavamshivardhan.mukkam...@rbc.com> wrote: > Hi Devendra, > > > > Thank you , It is working now and I also could read the properties from > xml file. I could also set the batch size and time gap for next database > hit. > > > > Now , my another requirement is to read 50 different files from HDFS , > parse them using xml mapping and sftp as a single file to a UNIX box. Can > you please suggest me the best practice like using parallel processing or > partitioning? > > > > Do you have any sample code for parallel processing or partitioning and > also how would I run the batch Job is there any batch scheduler that data > torrent provides? > > > > Regards, > > Surya Vamshi > > > > *From:* Devendra Tagare [mailto:devend...@datatorrent.com] > *Sent:* 2016, May, 18 4:19 PM > > *To:* users@apex.incubator.apache.org > *Subject:* Re: Information Needed > > > > Hi, > > > > Can you try something like this, > > > > JdbcPOJOInputOperator opr = dag.addOperator("JdbcPojo", new > JdbcPOJOInputOperator()); > > JdbcStore store = new JdbcStore(); > > opr.setStore(store); > > The properties would then be set on this store object. > > From the code snippet provided earlier, the store was not being set on the > JdbcInputOperator2 > > Thanks, > > Dev > > > > On Wed, May 18, 2016 at 12:50 PM, Mukkamula, Suryavamshivardhan (CWM-NR) < > suryavamshivardhan.mukkam...@rbc.com> wrote: > > Hi , > > > > Hello, > > > > When I have tried using below property as you suggested, my launch itself > is failing. When I don’t use store and directly assign > (‘dt.application.CountryNameScan.operator.<operatorName>.prop*.*databaseUrl’) > launch is successful but application run is failing with null pointer > exception since ‘store’ object is null. > > > > I see that in AbstractStoreInputOperator.java there is ‘store’ variable > and I am not clear how the value is set to it. > > > > Regards, > > Surya Vamshi > > > > *From:* Devendra Tagare [mailto:devend...@datatorrent.com] > *Sent:* 2016, May, 18 12:57 PM > > > *To:* users@apex.incubator.apache.org > *Subject:* Re: Information Needed > > > > Hi, > > > > The property on the store is not getting set since ".store." qualifier is > missing.Try the below for all store level properties. > > > > > > <property> > > <name>dt.application.CountryNameScan.operator.<operatorName>.prop > *.store.*databaseUrl</name> > > <value>{databaseUrl}</value> > > </property> > > > > Thanks, > > Dev > > > > On Wed, May 18, 2016 at 8:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < > suryavamshivardhan.mukkam...@rbc.com> wrote: > > Hello, > > > > Thank you Shubam. > > > > I have tried using AbstractJdbcInputOperator. Below is the Operator and > the error that I am getting. My observation is ‘*store’* and ‘*context’* > objects are null. Please help to solve this issue. > > > > *Error Logs:* > > > > java.lang.NullPointerException > > at > com.datatorrent.lib.db.AbstractStoreInputOperator.setup(AbstractStoreInputOperator.java:77) > > at > com.rbc.aml.cnscan.operator.AbstractJdbcInputOperator.setup(AbstractJdbcInputOperator.java:99) > > at > com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:29) > > at > com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:13) > > at com.datatorrent.stram.engine.Node.setup(Node.java:161) > > at > com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1287) > > at > com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:92) > > at > com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1361) > > > > > > *Operator Class:* > > > > import java.sql.ResultSet; > > import java.sql.SQLException; > > > > import com.datatorrent.api.Context; > > import com.datatorrent.api.Context.OperatorContext; > > import com.datatorrent.api.DefaultOutputPort; > > import java.sql.PreparedStatement; > > import com.datatorrent.api.Operator; > > > > > > public class JdbcInputOperator2 extends AbstractJdbcInputOperator<Object> > > implements > Operator.ActivationListener<Context.OperatorContext> { > > > > private transient PreparedStatement preparedStatement; > > > > private String query; > > > > // @OutputPortFieldAnnotation(schemaRequired = true) > > public final transient DefaultOutputPort<Object> > outputPort = new DefaultOutputPort<Object>(); > > > > public JdbcInputOperator2() { > > super(); > > } > > > > @Override > > public void setup(Context.OperatorContext context) { > > super.setup(context); > > > > try { > > preparedStatement = > store.connection.prepareStatement(queryToRetrieveData()); > > System.out.println("store > value is"+store); > > } catch (Exception e) { > > > > } > > > > } > > > > @Override > > public Object getTuple(ResultSet result) { > > // TODO Auto-generated method stub > > StringBuilder sb = new StringBuilder(); > > try { > > System.out.println("result > set"+result); > > while (result.next()) { > > > sb.append(result.getString("CLNT_NO")); > > > sb.append(","); > > > sb.append(result.getString("TR_NO")); > > > System.out.println("tuple value"+sb.toString()); > > } > > } catch (SQLException e) { > > // TODO Auto-generated > catch block > > e.printStackTrace(); > > } > > > > return sb.toString(); > > } > > > > @Override > > public String queryToRetrieveData() { > > // TODO Auto-generated method stub > > return query; > > } > > > > @Override > > public void activate(OperatorContext arg0) { > > // TODO Auto-generated method stub > > > > } > > > > @Override > > public void deactivate() { > > // TODO Auto-generated method stub > > > > } > > > > public String getQuery() { > > return query; > > } > > > > public void setQuery(String query) { > > this.query = query; > > } > > > > } > > > > *Properties.xml* > > > > <property> > > > <name>dt.application.CountryNameScan.operator.<operatorName>.prop.databaseUrl</name> > > <value>{databaseUrl}</value> > > </property> > > <property> > > > <name>dt.application.CountryNameScan.operator.<operatorName>.prop.databaseDriver</name> > > <value>com.ibm.db2.jcc.DB2Driver</value> > > </property> > > <property> > > > <name>dt.application.CountryNameScan.operator.<operatorName>.prop.connectionProperties</name> > > <value>user:{uname},password:{psswrd}</value> > > </property> > > <property> > > > <name>dt.application.CountryNameScan.operator.<operatorName>.prop.query</name> > > <value> {Query}</value> > > </property> > > > > > > > > Regards, > > Surya Vamshi > > > > *From:* Shubham Pathak [mailto:shub...@datatorrent.com] > *Sent:* 2016, May, 18 1:51 AM > > > *To:* users@apex.incubator.apache.org > *Subject:* Re: Information Needed > > > > Hi Surya, > > > > If you are running into a Null Pointer exception in one of the operators > after the application is launched ( application in Running state ), you may > follow this link to view the logs > > http://docs.datatorrent.com/troubleshooting/#log-analysis > > > > However, if you are running into a Null Pointer exception before the app > is launched, this would be because DAG is failing during verification step > as some of the mandatory properties ( having @NotNull annotation ) might > not have been initialized. For JdbcPOJOInputOperator, there are 3 such > properties databaseUrl, databaseDriver and List<FieldInfo>. Could you > verify if these have been set ? > > > > Thanks, > > Shubham > > > > On Wed, May 18, 2016 at 3:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < > suryavamshivardhan.mukkam...@rbc.com> wrote: > > Hello Aswin/Ram, > > > > I am trying to use the JdbcPOJOInputOperator operators from Malhar JDBC, > when I launch the application I am running into Null pointer exception. I > am passing the DB connection details from properties XML. > > > > May I know where to look for the launch logs , I am running the DT > application on my sandbox. > > > > > > Regards, > > Surya Vamshi > > > > *From:* Mukkamula, Suryavamshivardhan (CWM-NR) [mailto: > suryavamshivardhan.mukkam...@rbc.com] > *Sent:* 2016, May, 17 10:59 AM > *To:* users@apex.incubator.apache.org > *Subject:* RE: Information Needed > > > > Hi, > > > > Thank you for the inputs Ram. Understood how the properties are read. > > > > Regards, > > Surya Vamshi > > > > *From:* Munagala Ramanath [mailto:r...@datatorrent.com > <r...@datatorrent.com>] > *Sent:* 2016, May, 17 10:53 AM > *To:* users@apex.incubator.apache.org > *Subject:* Re: Information Needed > > > > Declare fields in your operators with appropriate setters and getters, for > example: > > > > *private String foo;* > > *public void setFoo(String v) {foo = v;}* > > *public String getFoo(){ return foo;}* > > > > Then in the properties file, include a stanza like this: > > * <property>* > > * <name>dt.application.{appName}.operator.{opName}.prop.foo</name>* > > * <value>Hello World</value>* > > * </property>* > > > > where {appName} is the name of your application provided in the parameter > of > > *@ApplicationAnnotation*, {opName} is the operator name given in the > addOperator() call. > > > > The platform will automatically inject this value into your operator so > that it is ready to use in > > all of the operator callbacks like setup(), beginWindow(), endWindow(), > emitTuples(), etc. > > > > You can also use "*" wildcards for application and operator names. In such > cases all operators > > that have the named field will have the value injected. > > > > In *g...@github.com:datatorrent/examples > <g...@github.com:datatorrent/examples>* there are several sample > applications. For example, > > the fileIO application has *src/main/resources/META-INF/properties.xml* > which defines several > > properties for operators. > > > > You can also use annotations like @NotNull on your fields to ensure that > values have been > > injected; if for some reason the value was not set (e.g. typo in the > property name in the > > properties file), the application will fail validation and will not even > launch. > > > > Ram > > > > On Tue, May 17, 2016 at 7:24 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < > suryavamshivardhan.mukkam...@rbc.com> wrote: > > Hi Ram, > > > > How would I read the custom properties in my operators? , are these > properties available to all the operators which run on different nodes of > the cluster? > > > > Regards, > > Surya Vamshi > > > > *From:* Munagala Ramanath [mailto:r...@datatorrent.com] > *Sent:* 2016, May, 17 9:58 AM > *To:* users@apex.incubator.apache.org > *Subject:* Re: Information Needed > > > > You can add multiple XML configuration files at *src/site/conf* in your > project and select one of them at launch time. > > This is described briefly at > http://docs.datatorrent.com/application_packages/ under the section > entitled > > *Adding pre-set configurations* > > > > Ram > > > > On Tue, May 17, 2016 at 6:43 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < > suryavamshivardhan.mukkam...@rbc.com> wrote: > > Hi Ashwin, > > > > Thank you. > > > > I see that result set is read once and tuples are emitted tuple by tuple. > I will explore these JDBC operators. > > > > One more requirement, I would like to read custom properties-JDBC and set > my dbconnection and query in the properties itself, do you have sample code > to read custom properties from the class path. > > > > Regards, > > Surya Vamshi > > > > *From:* Ashwin Chandra Putta [mailto:ashwinchand...@gmail.com] > *Sent:* 2016, May, 16 5:21 PM > *To:* users@apex.incubator.apache.org > *Subject:* Re: Information Needed > > > > Surya, > > > > You can use the jdbc input and output operators to fetch from origin > database and destination database. > > > > Check > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/db/jdbc > > > > They should have configurable batch sizes. In general batching is faster > than tuple by tuple. > > > > Regards, > > Ashwin. > > > > On Mon, May 16, 2016 at 1:20 PM, Mukkamula, Suryavamshivardhan (CWM-NR) < > suryavamshivardhan.mukkam...@rbc.com> wrote: > > Hi Ashwin, > > > > It is to get all the 10k rows as a single batch and process them and > insert into destination database. > > > > Which one is suggested , to get single row from database or batch of 10k > rows and process and then insert into destination database? > > > > Regards, > > Surya Vamshi > > > > *From:* Ashwin Chandra Putta [mailto:ashwinchand...@gmail.com] > *Sent:* 2016, May, 16 1:50 PM > *To:* users@apex.incubator.apache.org > *Subject:* Re: Information Needed > > > > Surya, > > > > By single row tuple, are you looking for a way to enrich/join your tuples > with a single tuple from another table? Or simply get all rows from the > origin database table as a single batch of 10000 tuples? > > > > Regards, > > Ashwin. > > > > On Mon, May 16, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) < > suryavamshivardhan.mukkam...@rbc.com> wrote: > > Hello Team, > > > > Use Case : Batch ingestion of transaction data (rows = 10000) from DB2 to > Vertica database. > > > > Question : How Can I make DB2 Connection in such a way that I get a single > row tuple only ? Do you have any sample code which reads from one database > and writes into another database ? > > > > Regards, > > Surya Vamshi > > > > _______________________________________________________________________ > > This [email] may be privileged and/or confidential, and the sender does > not waive any related rights and obligations. Any distribution, use or > copying of this [email] or the information it contains by other than an > intended recipient is unauthorized. If you received this [email] in error, > please advise the sender (by return [email] or otherwise) immediately. You > have consented to receive the attached electronically at the above-noted > address; please retain a copy of this confirmation for future reference. > > > > > > -- > > > > Regards, > > Ashwin. > > _______________________________________________________________________ > > This [email] may be privileged and/or confidential, and the sender does > not waive any related rights and obligations. Any distribution, use or > copying of this [email] or the information it contains by other than an > intended recipient is unauthorized. If you received this [email] in error, > please advise the sender (by return [email] or otherwise) immediately. You > have consented to receive the attached electronically at the above-noted > address; please retain a copy of this confirmation for future reference. > > > > > > -- > > > > Regards, > > Ashwin. > > _______________________________________________________________________ > > If you received this email in error, please advise the sender (by return > email or otherwise) immediately. You have consented to receive the attached > electronically at the above-noted email address; please retain a copy of > this confirmation for future reference. > > Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur > immédiatement, par retour de courriel ou par un autre moyen. Vous avez > accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à > l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de > cette confirmation pour les fins de reference future. > > > > _______________________________________________________________________ > > If you received this email in error, please advise the sender (by return > email or otherwise) immediately. You have consented to receive the attached > electronically at the above-noted email address; please retain a copy of > this confirmation for future reference. > > Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur > immédiatement, par retour de courriel ou par un autre moyen. Vous avez > accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à > l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de > cette confirmation pour les fins de reference future. > > > > _______________________________________________________________________ > > If you received this email in error, please advise the sender (by return > email or otherwise) immediately. You have consented to receive the attached > electronically at the above-noted email address; please retain a copy of > this confirmation for future reference. > > Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur > immédiatement, par retour de courriel ou par un autre moyen. Vous avez > accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à > l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de > cette confirmation pour les fins de reference future. > > _______________________________________________________________________ > > This [email] may be privileged and/or confidential, and the sender does > not waive any related rights and obligations. Any distribution, use or > copying of this [email] or the information it contains by other than an > intended recipient is unauthorized. If you received this [email] in error, > please advise the sender (by return [email] or otherwise) immediately. You > have consented to receive the attached electronically at the above-noted > address; please retain a copy of this confirmation for future reference. > > > > _______________________________________________________________________ > > This [email] may be privileged and/or confidential, and the sender does > not waive any related rights and obligations. Any distribution, use or > copying of this [email] or the information it contains by other than an > intended recipient is unauthorized. If you received this [email] in error, > please advise the sender (by return [email] or otherwise) immediately. You > have consented to receive the attached electronically at the above-noted > address; please retain a copy of this confirmation for future reference. > > > > _______________________________________________________________________ > > This [email] may be privileged and/or confidential, and the sender does > not waive any related rights and obligations. Any distribution, use or > copying of this [email] or the information it contains by other than an > intended recipient is unauthorized. If you received this [email] in error, > please advise the sender (by return [email] or otherwise) immediately. You > have consented to receive the attached electronically at the above-noted > address; please retain a copy of this confirmation for future reference. > > > > _______________________________________________________________________ > > This [email] may be privileged and/or confidential, and the sender does > not waive any related rights and obligations. Any distribution, use or > copying of this [email] or the information it contains by other than an > intended recipient is unauthorized. If you received this [email] in error, > please advise the sender (by return [email] or otherwise) immediately. You > have consented to receive the attached electronically at the above-noted > address; please retain a copy of this confirmation for future reference. > >